multiprocessing.Queue put order

I am using multiprocessing.Queue to implement the classic producer-consumer pattern. To let consumers know that all the tasks are done, I use None as a sentinel, and I use a Lock and a Value (counter) to decide whether the current process is the last producer. If it is the last producer, it puts as many Nones onto the queue as there are consumers.

def producer(queue, lock, running_counters, num_consumers):
    while task_not_done:          # pseudocode: loop until this producer's tasks are done
        result = do_something()   # pseudocode: produce one result
        queue.put(result)
        with lock:
            running_counters.value -= 1
            if running_counters.value == 0:  # last one, put sentinel Nones
                for _ in range(num_consumers):
                    queue.put(None)


def consumer(queue):
    while True:
        data = queue.get()
        if data is None:
            break
        do_something_with(data)   # pseudocode: consume one result

def main():
    queue = mp.Queue(maxsize=1_000_000)
    running_counters = mp.Value('i', NUM_PRODUCER)
    lock = mp.Lock()

    producers = []
    for _ in range(NUM_PRODUCER):
        p = mp.Process(target=producer, args=(queue, lock, running_counters, NUM_CONSUMER))
        p.start()
        producers.append(p)

    consumers = []
    for _ in range(NUM_CONSUMER):
        p = mp.Process(target=consumer, args=(queue,))
        p.start()
        consumers.append(p)

    for p in producers:
        p.join()
    for p in consumers:
        p.join()

I run this code on one machine and it works, but on another machine it deadlocks. When it deadlocks, I find that even after all the consumers have finished, the queue is still not empty, so the producers can't finish.

I am very confused about this. Why does using a lock still leave a synchronization problem? It seems that when the last producer puts None into the queue, the other producers can still put normal data into it. I am not familiar with Python multiprocessing, and unlike Java/C++, I can't find clear documentation of a Python memory model that defines happens-before, fences, or related topics. I know Python multiprocessing is not multithreading, where all threads share the memory space of the same process.
Some blog posts say mp.Queue.put() uses a background thread to send the message. Maybe when the last producer puts None into the queue (which is fast), the other producers' puts are still in flight? If that's true, how can I know that all background sending is done?
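
The multiprocessing docs do confirm this feeder thread: put() only places the object in an internal buffer, and a background thread later pickles it into the pipe, so put() returning does not mean the data has been sent. Queue.close() followed by Queue.join_thread() blocks until that flush is done, and joining the child process waits for it too. A minimal sketch, with a hypothetical workload and the sentinels moved into the parent so their ordering no longer depends on a racy counter:

```python
import multiprocessing as mp

NUM_PRODUCERS = 4
NUM_CONSUMERS = 2
ITEMS_PER_PRODUCER = 100   # hypothetical workload size

def producer(queue):
    for i in range(ITEMS_PER_PRODUCER):   # stand-in for the real task loop
        queue.put(i)
    # Block until this process's feeder thread has flushed everything
    # it put into the pipe, then let the process exit.
    queue.close()
    queue.join_thread()

def consumer(queue, consumed):
    while True:
        data = queue.get()
        if data is None:
            break
        with consumed.get_lock():    # stand-in for the real work
            consumed.value += 1

def main():
    queue = mp.Queue()
    consumed = mp.Value('i', 0)
    producers = [mp.Process(target=producer, args=(queue,))
                 for _ in range(NUM_PRODUCERS)]
    consumers = [mp.Process(target=consumer, args=(queue, consumed))
                 for _ in range(NUM_CONSUMERS)]
    for p in producers + consumers:
        p.start()
    # A producer's exit (and hence its join) waits for its feeder thread,
    # so after this loop every real item is already in the queue.
    for p in producers:
        p.join()
    for _ in range(NUM_CONSUMERS):   # sentinels are guaranteed to arrive last
        queue.put(None)
    for c in consumers:
        c.join()
    return consumed.value
```

Because close() forbids further put() calls in that process, the sentinels have to come from somewhere else; here the parent sends them after joining the producers.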

In function producer, you have a while loop.

Does that mean that the producer can put more than one result into the queue?

Note that it decrements running_counters inside the loop, so it’ll decrement for each result it produces.

On another point, mp.Value contains its own lock (lock=True is the default), so you can remove the lock variable and say:

    running_counters = mp.Value('i', NUM_PRODUCER, lock=True)

and:

    with running_counters.get_lock():
        running_counters.value -= 1
        ...
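
Putting those fragments together, a self-contained sketch (with a hypothetical worker) showing that the Value's built-in lock is enough to pick exactly one "last" process:

```python
import multiprocessing as mp

NUM_PRODUCER = 8

def worker(running_counters, last_flag):
    # Hypothetical worker: just decrement and check for zero.
    with running_counters.get_lock():
        running_counters.value -= 1
        if running_counters.value == 0:
            last_flag.value = 1   # exactly one process ever sees zero

def main():
    running_counters = mp.Value('i', NUM_PRODUCER)   # lock=True is the default
    last_flag = mp.Value('i', 0)
    procs = [mp.Process(target=worker, args=(running_counters, last_flag))
             for _ in range(NUM_PRODUCER)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return running_counters.value, last_flag.value
```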

I am sorry, the producer function was pasted incorrectly. The while loop puts many results; the decrement happens after the loop:


def producer(queue, lock, running_counters, num_consumers):
    while task_not_done:          # pseudocode: loop until this producer's tasks are done
        result = do_something()   # pseudocode: produce one result
        queue.put(result)

    # after all tasks done
    with lock:
        running_counters.value -= 1
        if running_counters.value == 0:  # last one, put sentinel Nones
            for _ in range(num_consumers):
                queue.put(None)

Deadlocking can occur when one thread or process tries to acquire locks in the order A then B, and another thread or process tries to acquire locks in the order B then A.

The queue has an internal lock (used by queue.put and queue.get), and you have another explicit lock, but, as far as I can see, you're acquiring either lock A alone or both locks in the order B then A, which I believe is safe from deadlocking.
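
The ordering rule can be seen in a toy threading sketch: as long as every code path that takes both locks nests them in the same order, no cycle of waiters can form, so it cannot deadlock:

```python
import threading

lock_a = threading.Lock()   # stands in for the queue's internal lock
lock_b = threading.Lock()   # stands in for the explicit counter lock

counter = 0

def worker():
    global counter
    for _ in range(1000):
        # Every thread that takes both locks nests them in the same
        # order (B, then A), so no cycle of waiters can form.
        with lock_b:
            with lock_a:
                counter += 1

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A deadlock would need one thread holding A while waiting for B and another holding B while waiting for A; a single global order makes that impossible.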

You could do this instead to avoid nested locks:

    with lock:
        running_counters.value -= 1
        finished = running_counters.value == 0
        
    if finished: # last one, put sentinel Nones
        for _ in range(num_consumers):
            queue.put(None)

I suggest putting in some logging and then running it until it deadlocks. The log might make the problem clearer.
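
For the logging, one low-effort option (assuming the stock multiprocessing logger is enough) is mp.log_to_stderr() together with mp.get_logger(); the default format prefixes each record with the process name:

```python
import logging
import multiprocessing as mp

def worker(q):
    # Records logged via mp.get_logger() carry the process name.
    mp.get_logger().info("putting item")
    q.put(42)

def main():
    # Route multiprocessing's own logger to stderr; its default format
    # is "[%(levelname)s/%(processName)s] %(message)s".
    mp.log_to_stderr(logging.INFO)
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    value = q.get()
    p.join()
    return value
```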

@MRAB I have changed to your suggestion, but it still doesn't work. The problem is that the queue is still not empty after the last consumer exits (because it received None), and all the producers have exited the while loop. Another strange thing is that it works when the process count is 8/16/24, but hangs with 32 processes.

I added some prints to the producers (32) and the consumer (there is only one).

def producer(queue, lock, running_counters, num_consumers):
    process_name = mp.current_process().name

    while task_not_done:          # pseudocode: loop until this producer's tasks are done
        result = do_something()   # pseudocode: produce one result
        queue.put(result)

    with lock:
        running_counters.value -= 1
        finished = running_counters.value == 0

    if finished:
        print(f"{process_name} is the last one")
        for _ in range(num_consumers):
            queue.put(None)

    print(f"{process_name} is done")
    

def consumer(queue):
    process_name = mp.current_process().name

    while True:
        data = queue.get()
        if data is None:
            break
        do_something_with(data)   # pseudocode: consume one result

    print(f"{process_name} is done")

The log of one deadlock running is:

counter_process-13 is done
counter_process-21 is done
counter_process-17 is done
counter_process-14 is done
counter_process-32 is done
counter_process-19 is done
counter_process-11 is done
counter_process-2 is done
counter_process-15 is done
counter_process-1 is done
counter_process-16 is done
counter_process-22 is done
counter_process-28 is done
counter_process-27 is done
counter_process-29 is done
counter_process-24 is done
counter_process-25 is done
counter_process-5 is done
counter_process-26 is done
counter_process-31 is done
counter_process-30 is done
counter_process-20 is done
counter_process-3 is done
counter_process-7 is done
counter_process-4 is done
counter_process-9 is done
counter_process-18 is done
counter_process-12 is done
counter_process-23 is done
counter_process-6 is done
counter_process-8 is done
counter_process-10 is the last one
counter_process-10 is done
merge_process-1 is done
chunk_queue: 0, counter_queue: 72, merged_queue: 0
bug
process counter_process-30 put data after finish
join counter_process-1
join counter_process-2
join counter_process-3
join counter_process-4
join counter_process-5
join counter_process-6
join counter_process-7
join counter_process-8
join counter_process-9
join counter_process-10
join counter_process-11
join counter_process-12
join counter_process-13
join counter_process-14
join counter_process-15
^CTraceback (most recent call last):
  File "......py", line 19, in <module>
    main()
  File "......py", line 8, in main
    vocabulary, merges = bpe_trainer.train(data_path, vocab_size,
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".....", line 14, in train
    word_counts = self._pretokenize_and_count_mp(input_path, special_tokens)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".....", line 287, in _pretokenize_and_count_mp
    p.join()
  File "......local/share/uv/python/cpython-3.12.11-linux-x86_64-gnu/lib/python3.12/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "......local/share/uv/python/cpython-3.12.11-linux-x86_64-gnu/lib/python3.12/multiprocessing/popen_fork.py", line 43, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "......local/share/uv/python/cpython-3.12.11-linux-x86_64-gnu/lib/python3.12/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

From the log, I can see that all the producers are done. The consumer (merge_process-1) is also done. The last producer is counter_process-10.

We can see that counter_process-15 (a producer) has printed "counter_process-15 is done", but joining it hangs. I also print the queue sizes, and I see "process counter_process-30 put data after finish", which means counter_process-30's data showed up in the queue after counter_process-10 put the Nones.

Where is the code that prints:

chunk_queue: 0, counter_queue: 72, merged_queue: 0
bug
process counter_process-30 put data after finish

counter_process-30 said that it finished before counter_process-10 said that it finished. How does it know that counter_process-30 put data after finish?

A silly question, but can a producer even put None in the queue other than as the sentinel?

I changed my code to put the process name with the real data:

    debug_data = (process_name, real_data)
    counter_queue.put(debug_data)

real_data can't be None. Also, in the debug version, normal data is a tuple of process_name and real_data, so it can never equal the bare None sentinel.
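
For completeness, one pattern that sidesteps the sentinel race entirely, assuming each queue item needs exactly one task_done(): mp.JoinableQueue lets the parent wait until every item has actually been processed before any sentinel is sent. A sketch with made-up counts:

```python
import multiprocessing as mp

NUM_PRODUCERS = 4
NUM_CONSUMERS = 2
ITEMS_PER_PRODUCER = 100   # made-up workload size

def producer(queue):
    for i in range(ITEMS_PER_PRODUCER):
        queue.put(i)

def consumer(queue, consumed):
    while True:
        data = queue.get()
        if data is None:
            queue.task_done()
            break
        with consumed.get_lock():
            consumed.value += 1
        queue.task_done()   # one task_done() per item taken off the queue

def main():
    queue = mp.JoinableQueue()
    consumed = mp.Value('i', 0)
    producers = [mp.Process(target=producer, args=(queue,))
                 for _ in range(NUM_PRODUCERS)]
    consumers = [mp.Process(target=consumer, args=(queue, consumed))
                 for _ in range(NUM_CONSUMERS)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()
    queue.join()   # blocks until task_done() was called for every item
    for _ in range(NUM_CONSUMERS):   # only now send the sentinels
        queue.put(None)
    for c in consumers:
        c.join()
    return consumed.value
```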