I have two third-party CLI tools which I’m using like this: reader.exe binary_file - | writer.exe - where reader reads a binary file and pipes it into writer. They seem to only be using one core, so simply looping over all the files one by one and running this in a subprocess is fairly slow. How would I go about running several instances simultaneously on all of my cores (maybe all minus 2, to not tank the entire PC)?
After a bit of research and asking, I feel like I’m just as lost as before. Here’s what I’ve got so far:
import subprocess
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from pathlib import Path

files = ...  # list of files

# Method 1
def processor(file: Path) -> None:
    destination = Path.cwd() / "processed" / file
    pipe = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
    subprocess.run(("writer", "-", destination), stdin=pipe.stdout)

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        for file in files:
            executor.submit(processor, file)

# Method 2
def processor(file: Path) -> None:
    destination = Path.cwd() / "processed" / file
    pipe = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
    subprocess.run(("writer", "-", destination), stdin=pipe.stdout)

if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        for file in files:
            executor.submit(processor, file)

# Method 3
if __name__ == "__main__":
    for file in files:
        destination = Path.cwd() / "processed" / file
        pipe = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
        subprocess.Popen(("writer", "-", destination), stdin=pipe.stdout)
I’m unsure of the details here and would appreciate some insight. I have never done multiprocessing (or threading? or parallelization? I’m not really sure about the distinction).
I didn’t really need to do anything with the future objects, which is why I wasn’t storing them. The three examples I gave already work; my question was more about which one is the appropriate or correct way to do things (or whether there’s a better method).
Ah, yeah, asyncio is one area where Python isn’t the same as some other languages, such as JavaScript and Pike. If you drop the futures on the ground, the task doesn’t just run to completion without you; you need to hang onto a reference to it. That’s only the case with asyncio, though; it should be fine with subprocess.
Since these are subprocesses, you don’t also need threads to manage them, so ideally you should be able to spawn each reader/writer pair and then wait() for them all. More-or-less the third method you have there, but once you’ve started everything, iterate over all the processes you started and wait till they’re done. Which will mean retaining all those Popens in a list.
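A minimal sketch of that approach, wrapped in a function so it can be tried without the real tools (the function name, the out_dir handling, and the default command tuples are my choices; the reader/writer argument order matches the code above):

```python
import subprocess
from pathlib import Path

def process_all(files, reader=("reader",), writer=("writer",), out_dir=None):
    """Start a reader | writer pair per file, then wait for them all."""
    out_dir = Path(out_dir) if out_dir else Path.cwd() / "processed"
    out_dir.mkdir(parents=True, exist_ok=True)
    pairs = []
    for file in files:
        destination = out_dir / Path(file).name
        rd = subprocess.Popen((*reader, str(file), "-"), stdout=subprocess.PIPE)
        wr = subprocess.Popen((*writer, "-", str(destination)), stdin=rd.stdout)
        rd.stdout.close()  # the writer now holds the only handle to the pipe
        pairs.append((rd, wr))
    # retain every Popen and only wait once everything has been started
    for rd, wr in pairs:
        rd.wait()
        wr.wait()
    return [out_dir / Path(f).name for f in files]
```

Note this launches every pair at once, which is exactly what caused the problem described below when the file count is large.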
How would I manage the Popens? Just launching them all nearly brought my PC to a halt, while ThreadPoolExecutor seemed to manage them automatically quite well without bringing everything else to a halt, and gives me the max_workers control too.
Oh, I didn’t realise you had THAT many! In that case, yes, you want some sort of worker pool. ThreadPoolExecutor is probably the right choice; it adds some overhead (a thread for each worker) which could be avoided if you did the work manually, but it’s most likely going to be simple enough to justify that overhead.
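For the record, a sketch of the pool version with the worker count capped at "all cores minus 2" from the original question (the function names, the worker parameter, and the os.cpu_count() fallback are my additions, not anything from the thread):

```python
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def processor(file: Path, out_dir: Path) -> Path:
    """One reader | writer pair; blocks until both processes finish."""
    destination = out_dir / file.name
    rd = subprocess.Popen(("reader", str(file), "-"), stdout=subprocess.PIPE)
    subprocess.run(("writer", "-", str(destination)), stdin=rd.stdout, check=True)
    rd.stdout.close()
    rd.wait()
    return destination

def process_pool(files, worker=processor, out_dir=None, reserve=2):
    """Run worker(file, out_dir) for each file on a capped thread pool."""
    out_dir = Path(out_dir) if out_dir else Path.cwd() / "processed"
    out_dir.mkdir(parents=True, exist_ok=True)
    workers = max(1, (os.cpu_count() or 1) - reserve)  # leave some cores free
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(worker, Path(f), out_dir) for f in files]
        return [f.result() for f in futures]  # re-raises any worker exception
```

The threads themselves do almost no work (they just block on the subprocesses), so the GIL isn’t an issue here and ProcessPoolExecutor buys nothing extra.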