Running external commands in parallel

This is likely a concurrency beginner question that can be answered with a pointer to the right tool. I’m just having a bit of trouble navigating all the available options for concurrency: threading, multiprocessing, asyncio, concurrent.futures, trio, …

I have a number of files of varying sizes that I want to process using a command, let’s call it processit. I can do that using subprocess.run in a for loop, but this is sequential.

for file in files:
    subprocess.run(['processit', file])

I can also use subprocess.Popen to do this in parallel:

processes = [subprocess.Popen(['processit', file]) for file in files]
for p in processes:
    p.wait()

Am I right that this will use all available CPUs? And am I right that if I have, say, 2000 files, it is better to control the parallel execution myself instead of asking the system to do the scheduling?

If so, what is the easiest tool to do so? Of course I can split the files in chunks, and iterate over these chunks to run processit in parallel within a chunk. But if a chunk gets one very large file and otherwise small files, the CPUs that were assigned the small files will remain idle while waiting for the large file. I’d like to do better and assign a file to each CPU as soon as it is idle.
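For concreteness, that chunked approach would look roughly like this (an untested sketch; the chunk size of 8 is an arbitrary choice):

import subprocess

# Process the files 8 at a time; every chunk has to finish completely
# before the next chunk starts, which is where the idle CPUs come from.
chunk_size = 8
for i in range(0, len(files), chunk_size):
    chunk = files[i:i + chunk_size]
    processes = [subprocess.Popen(['processit', f]) for f in chunk]
    for p in processes:
        p.wait()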

PS: I can answer my question by explicitly managing the processes (using poll() and such), but I guess there is a stdlib tool for this.
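For reference, the explicit version I have in mind would be roughly this (untested; the limit of 8 and the 0.1-second poll interval are arbitrary choices):

import subprocess
import time

max_workers = 8
pending = list(files)
running = []

while pending or running:
    # Drop processes that have finished (poll() returns their exit code then).
    running = [p for p in running if p.poll() is None]
    # Start new processes until we are back up to max_workers.
    while pending and len(running) < max_workers:
        running.append(subprocess.Popen(['processit', pending.pop()]))
    time.sleep(0.1)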

By Jean Abou Samra via Discussions on Python.org at 02Aug2022 23:01:

Am I right that this will use all available CPUs? And am I right that
if I have, say, 2000 files, it is better to control the parallel
execution myself instead of asking the system to do the scheduling?

It is probably better to manage the concurrency yourself, even if you
have to hand-tune the number of parallel tasks. If you hand them all off
to the OS it will try to run them all at once, and at some point, probably
well under 100 processes, aggregate performance will degrade.

Personally I do this with Threads and a Semaphore.

Remark: because all the work is being done in subprocesses, the OS can
use the CPUs as much as it likes. Python spends almost all its time
blocked waiting for subprocesses or Threads, so it does not need much
CPU at all.

So dispatch looks like this (totally untested, but I have similar
real-world code):

from subprocess import run
from threading import Semaphore, Thread

sem = Semaphore(8)   # pick a threshold here

def dispatch(sem, argv, **kw):
    # Hold a semaphore slot for the duration of the subprocess.
    with sem:
        run(argv, **kw)

Ts = []
for file in files:
    # pass kwargs={...} for any extra subprocess.run() options
    T = Thread(target=dispatch, args=(sem, ['processit', file]))
    T.start()
    Ts.append(T)

for T in Ts:
    T.join()

Note that this spawns a Thread per file but caps the OS processes: many,
many Threads, but at most 8 subprocesses running at a time.

You can cap the Threads as well (Python Threads map to real OS-level
threads, so this can be important). Something like:

from subprocess import run
from threading import Semaphore, Thread

sem = Semaphore(8)   # pick a threshold here

def dispatch(sem, argv, **kw):
    # The slot was acquired before this Thread was created; release it
    # when the subprocess finishes, even if run() raises.
    try:
        run(argv, **kw)
    finally:
        sem.release()

Ts = []
for file in files:
    sem.acquire()   # block until fewer than 8 Threads are in flight
    T = Thread(target=dispatch, args=(sem, ['processit', file]))
    T.start()
    Ts.append(T)

for T in Ts:
    T.join()

This prevents making more than 8 Python Threads at a time by blocking
Thread creation on the semaphore: the acquire moves into the loop, and
the release stays in the dispatch function.
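
For comparison, the stdlib's concurrent.futures (which you mention in your list) wraps up the same cap-the-workers pattern; a minimal sketch, assuming the same processit command:

from concurrent.futures import ThreadPoolExecutor
from subprocess import run

# ThreadPoolExecutor keeps at most max_workers threads alive, so at most
# 8 processit subprocesses run at once; the rest of the files queue up
# until a worker is free.
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(run, ['processit', file]) for file in files]
    for future in futures:
        future.result()   # re-raise any exception from run()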

Cheers,
Cameron Simpson cs@cskk.id.au