How to handle multiple subprocesses simultaneously

I have two third-party CLI tools which I'm using like this: reader.exe binary_file - | writer.exe -, where reader reads a binary file and pipes its output into writer. They seem to use only one core, so simply looping over all the files one by one and running this pipeline in a subprocess is fairly slow. How would I go about running several instances simultaneously on all of my cores (maybe all minus two, so I don't tank the entire PC)?

After a bit of research and asking, I feel like I’m just as lost as before. Here’s what I’ve got so far:

import subprocess
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from pathlib import Path

files = ... # list of files


# Method: 1
def processor(file: Path) -> None:
    destination = Path.cwd() / "processed" / file
    pipe = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
    subprocess.run(("writer", "-", destination), stdin=pipe.stdout)


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        for file in files:
            executor.submit(processor, file)


# Method: 2
def processor(file: Path) -> None:
    destination = Path.cwd() / "processed" / file
    pipe = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
    subprocess.run(("writer", "-", destination), stdin=pipe.stdout)


if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        for file in files:
            executor.submit(processor, file)


# Method: 3
if __name__ == "__main__":
    for file in files:
        destination = Path.cwd() / "processed" / file
        pipe = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
        subprocess.Popen(("writer", "-", destination), stdin=pipe.stdout)

I’m unsure of the details here and would appreciate some insight. I have never done multi processing (or threading? or parallelization?, I’m not really sure about the distinction here).

Does the example on this page help? concurrent.futures — Launching parallel tasks — Python 3.12.4 documentation

The example creates a set of futures then loops to wait for them to complete.
Your code is not structured like that.

I didn't really need to do anything with the future objects, which is why I wasn't storing them. The three examples I gave already work; my question was more about which one is the appropriate or correct way to do things (or whether there's a better method).

Ah, yeah, asyncio is one area where Python differs from some other languages, such as JavaScript and Pike. If you drop the futures on the ground, the task doesn't just run to completion without you; you need to hang onto a reference to it. That's only the case with asyncio, though; it should be fine with subprocess.
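To illustrate that asyncio point with a toy example (unrelated to the subprocess code above): the tasks are kept in a list precisely so they aren't "dropped on the ground".

```python
import asyncio

async def work(n: int) -> int:
    await asyncio.sleep(0)
    return n * 2

async def main() -> list[int]:
    # Hold references to the tasks: a Task that nobody references can be
    # garbage-collected before it finishes, so keep them in a list or set.
    tasks = [asyncio.create_task(work(n)) for n in range(3)]
    return [await t for t in tasks]

print(asyncio.run(main()))  # → [0, 2, 4]
```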

Since these are subprocesses, you don't also need threads to manage them, so ideally you should be able to spawn each reader/writer pair and then wait() for them all. More or less the third method you have there, but once you've started everything, iterate over all the processes you started and wait until they're done. Which will mean retaining all those Popen objects in a list.
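A sketch of what that could look like. The run_all name is mine, and the reader/writer commands are parameters purely so the sketch stays generic; the real tools would be the defaults:

```python
import subprocess
from pathlib import Path

def run_all(files, reader_cmd=("reader",), writer_cmd=("writer",)):
    """Start every reader/writer pair up front, then wait for them all."""
    out_dir = Path.cwd() / "processed"
    out_dir.mkdir(exist_ok=True)
    procs = []
    for file in files:
        destination = out_dir / Path(file).name
        reader = subprocess.Popen((*reader_cmd, str(file), "-"),
                                  stdout=subprocess.PIPE)
        writer = subprocess.Popen((*writer_cmd, "-", str(destination)),
                                  stdin=reader.stdout)
        # Close our copy of the pipe so the reader sees a broken pipe
        # if the writer exits early.
        reader.stdout.close()
        procs.append((reader, writer))
    # Only start waiting once everything has been launched.
    for reader, writer in procs:
        writer.wait()
        reader.wait()
    return procs
```

Note this launches everything at once with no throttling, which matters once the file count gets large (see below).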

How would I manage the Popens, though? Just launching them all nearly brought my PC to a halt, whereas ThreadPoolExecutor seemed to manage them quite well automatically without bringing everything else to a halt, and it gives me the max_workers control too.

Oh, I didn’t realise you had THAT many! In that case, yes, you want some sort of worker pool. ThreadPoolExecutor is probably the right choice; it adds some overhead (a thread for each worker) which could be avoided if you did the work manually, but it’s most likely going to be simple enough to justify that overhead.
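For reference, a worker-pool sketch along those lines. The max_workers arithmetic (all cores minus two, as the original question suggested) and the empty files placeholder are mine:

```python
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

files: list[Path] = []  # fill in with your input files

def processor(file: Path) -> None:
    destination = Path.cwd() / "processed" / file
    reader = subprocess.Popen(("reader", file, "-"), stdout=subprocess.PIPE)
    subprocess.run(("writer", "-", destination), stdin=reader.stdout)
    reader.stdout.close()  # drop our handle once the writer is done with it
    reader.wait()          # reap the reader so it doesn't linger as a zombie

# Leave a couple of cores free so the rest of the machine stays responsive.
workers = max(1, (os.cpu_count() or 2) - 2)

with ThreadPoolExecutor(max_workers=workers) as executor:
    futures = [executor.submit(processor, f) for f in files]
    for future in futures:
        future.result()  # re-raises any exception a worker hit
```

Calling future.result() is cheap insurance: with bare submit() calls, a failed worker would fail silently.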


Yeah, I'll just use ThreadPoolExecutor then. The added complexity of managing them manually isn't worth the minor performance gain imo. Thank you!
