Using time.sleep() to wait on results of a sub-process

Hello all,

I’ve been trying to use the multiprocessing module to get some processes running in parallel, but I’m not sure I’m doing it right. My subprocesses append items to a queue that’s then accessed in the main routine. As far as I can see (may not be the case) it takes any given subprocess longer to append an item to the queue than it does for the main routine to reach the line where it checks that the queue is not empty, so the check always fails.

I’ve worked around this by chucking a time.sleep() in there but I’m not sure if this is a real thing or just some nonsense I’ve conjured up. The example below is a little analog of the actual problem, but it exhibits the same behaviour - remove the sleep() and the subsequent print() statement isn’t executed.

from time import sleep
from multiprocessing import Process, Queue

def proc(q):
    q.put('foo')

q = Queue()
p = Process(target=proc, args=(q,))
p.start()

sleep(0.01)
if not q.empty():
    print('queue is not empty')
    
p.join()
    

Any help is much appreciated!

You can use q.get() to wait for the answer.

Thanks, I’m unsure how I’d use this to the same effect in the actual program, as there are multiple items in the queue and it has to process them all. Something like:

while not q.empty():
    item = q.get()
    # do something with the item

It seems to work if I chuck a small sleep() in front of the loop. What’s interesting is that if the subprocess that puts items in the queue is relatively long-running compared to the loop in which the items are consumed, even a sleep of 1/100th of a second before the loop runs is enough to prevent the queue from being depleted prematurely (i.e. before the subprocess has finished appending all the necessary items to it).

I’m not saying this is meaningful in any way, just interesting. In the real program there are a number of comparisons the sub-process has to make to determine whether an item should be added to the queue. Even if the main routine is simultaneously getting items from the queue and, for example, printing them, a small sleep() seems sufficient to prevent the queue being run down.

The queue is empty any time you’re “ahead of” the process that’s putting stuff onto it. Usually that’s not a problem; you just wait for it. But how do you know when the other process is done? There are two main ways: Either, the subprocess puts a “sentinel” onto the queue to say “I’m done now, you can shut down”; or, any time the queue is empty, you check to see if the process has ended, and if so, you finish. The sentinel is a lot easier to work with, but can’t handle the subprocess crashing.
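As a rough illustration of the sentinel approach described above, here is a minimal sketch with a single producer. The names producer, consume_until_sentinel and SENTINEL are made up for the example, and it assumes None never occurs as a real result.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumption: None is never a genuine result


def producer(q, n):
    """Put n items on the queue, then a sentinel meaning 'I'm done'."""
    for i in range(n):
        q.put(i * i)
    q.put(SENTINEL)


def consume_until_sentinel(n):
    """Start one producer and collect everything it sends."""
    q = Queue()
    p = Process(target=producer, args=(q, n))
    p.start()
    results = []
    while True:
        item = q.get()  # blocks until an item arrives, so no sleep() is needed
        if item is SENTINEL:
            break
        results.append(item)
    p.join()  # safe: everything the producer sent has already been read
    return results
```

Calling consume_until_sentinel(5) from under an `if __name__ == "__main__":` guard should give the five squares in order. If the producer crashes before sending the sentinel, q.get() would block forever, which is the weakness mentioned above.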

Replace the sleep() and the q.empty() check with this:

result = q.get()
print(f'result is {result}')

p.join()

And when you have started more than one process, like this:

for _ in range(num_processes_to_get_results_from):
    print('Results from process:', q.get())

Of course you will need to keep track of all the p objects so that you can join on them.

Thanks for replying. In the real code, I’m spawning four subprocesses, so that would mean four sentinels, no? In that case, I’m guessing the main routine would just need to track how many sentinels it had encountered when pulling from the queue, and carry on upon encountering the fourth? I’ve added the snippet below to my real code and now it works nicely.

for p in procs:
    p.start()

counter = 0
while counter != 4:
    result = queue.get()
    if result is None:
        counter += 1
        continue
    # do something with result here

for p in procs:
    p.join()

Would you say that, if going down the route of checking whether the subprocess has ended, it’s sufficient just to do something like the following?

from queue import Empty  # q.get(block=False) raises queue.Empty
from time import sleep

while True:
    try:
        result = q.get(block=False)
        # do something with result
    except Empty:
        if p.is_alive():
            sleep(1)
            continue
        p.join()
        break
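One thing to watch with a loop like this: the subprocess can put its last item and exit in the gap between the Empty exception and the is_alive() check, in which case that item would be left behind. A hedged sketch of one way around it is to drain the queue once more after the process has ended. The names producer and drain_while_alive, and the poll_interval parameter, are invented for the example.

```python
from multiprocessing import Process, Queue
from queue import Empty


def producer(q, n):
    """Put n items on the queue and exit without sending a sentinel."""
    for i in range(n):
        q.put(i)


def drain_while_alive(n, poll_interval=0.1):
    """Collect items until the producer has exited and the queue is empty."""
    q = Queue()
    p = Process(target=producer, args=(q, n))
    p.start()
    results = []
    while True:
        try:
            # Wait briefly for an item rather than sleeping separately.
            results.append(q.get(timeout=poll_interval))
        except Empty:
            if p.is_alive():
                continue
            # The producer has ended; pick up anything that arrived
            # between the timeout above and the is_alive() check.
            while True:
                try:
                    results.append(q.get(timeout=poll_interval))
                except Empty:
                    break
            break
    p.join()
    return results
```

Unlike the sentinel version, this also terminates if the subprocess dies early, at the cost of polling.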

Another issue I encountered was that it wasn’t possible to simply run the four subprocesses and let them build the queue, and then continue with the main routine - calls to .join() would always hang.

Based on what I’ve read online, I’m making the assumption that this is because the pipe/buffer the OS uses to share the queue between the subprocess and the main process reaches its full capacity, causing the subprocess to hang until some buffer space is freed by the main process .get()ing items from the queue.

So it seems essential that, if the subprocess is able to fill the buffer, the code is written in a way that allows the main routine to relieve the queue of its contents as it is filled.

Here is one possible way to handle this.

You could assign an id to each process, can just be an int you increment for each process.
Keep all the processes in a dict using the id as the key.
Pass the id to each process you start.
Pass back a tuple of (id, result).

Now you can find the process to join after reading the tuple from the queue.
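The steps above might be sketched roughly as follows. The names worker and run_workers are hypothetical, the "work" is just doubling each number, and a result of None is assumed to be the per-process sentinel.

```python
from multiprocessing import Process, Queue


def worker(worker_id, q, items):
    """Send (id, result) tuples, then (id, None) as this worker's sentinel."""
    for item in items:
        q.put((worker_id, item * 2))
    q.put((worker_id, None))


def run_workers(jobs):
    """Start one process per job list and join each as its sentinel arrives."""
    q = Queue()
    procs = {}  # id -> Process, so the right one can be joined later
    for worker_id, items in enumerate(jobs):
        procs[worker_id] = Process(target=worker, args=(worker_id, q, items))
        procs[worker_id].start()
    results = {worker_id: [] for worker_id in procs}
    while procs:
        worker_id, result = q.get()
        if result is None:
            # This worker has nothing more to send, so join() won't hang.
            procs.pop(worker_id).join()
            continue
        results[worker_id].append(result)
    return results
```

Because each process is joined only after its own sentinel has been read, nothing from that process is still waiting in the queue at that point.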

Yep. You may want to have each process incorporate its process ID in the sentinel, and then as soon as you find a sentinel, you join() that process; otherwise, your given structure is fine. I’d make a small inversion though:

for p in procs:
    p.start()

running = len(procs)
while running:
    result = queue.get()
    if result is None:
        running -= 1
        continue
    # do something with result here

for p in procs:
    p.join()

as this can be easily adjusted if you add more processes dynamically, or anything like that. But yes, this is an excellent design.

Bingo. The queue becomes full, so the subprocesses wait for you. You’ll need to process data concurrently.

This sounds like you’re using subprocesses as workers, giving them jobs and getting back results. If so, you could probably do the same management more conveniently using multiprocessing.Pool (or concurrent.futures.ProcessPoolExecutor) to manage the queues and processes. Fundamentally equivalent, but easier to wrap up.
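For comparison, a minimal sketch of the Pool route, assuming the work can be expressed as a function applied to each input. The names work and run_pool are placeholders, not anything from the real program.

```python
from multiprocessing import Pool


def work(x):
    """Stand-in for the real per-item job."""
    return x * x


def run_pool(values, processes=4):
    """Run work() over values in a pool of worker processes."""
    with Pool(processes=processes) as pool:
        # map() blocks until every result is back and keeps input order;
        # the pool handles its own queues, so no sentinels are needed.
        return pool.map(work, values)
```

As with the other examples, run_pool() should be called from under an `if __name__ == "__main__":` guard so it also works where processes are spawned rather than forked.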

Your operating system already does this :) The multiprocessing.Process object has a pid attribute.
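A small sketch of how the pid could stand in for a hand-assigned id, with the sentinel carrying os.getpid() from the child. The names worker and run_and_join_by_pid and the ("result", ...) / ("done", ...) message tags are assumptions made for this example.

```python
import os
from multiprocessing import Process, Queue


def worker(q, items):
    """Send tagged results, then a 'done' message carrying this worker's PID."""
    for item in items:
        q.put(("result", item + 1))
    q.put(("done", os.getpid()))


def run_and_join_by_pid(jobs):
    """Start one process per job list and join each by the PID it reports."""
    q = Queue()
    procs = [Process(target=worker, args=(q, items)) for items in jobs]
    for p in procs:
        p.start()
    by_pid = {p.pid: p for p in procs}  # pid is available once start() returns
    results = []
    while by_pid:
        kind, payload = q.get()
        if kind == "done":
            by_pid.pop(payload).join()
        else:
            results.append(payload)
    return sorted(results)
```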