Multiprocessing: Difficulty in Terminating Child Processes

I’m having trouble understanding why my process gets stuck at process.join(), even though all the workers have already finished.

In my code, I expected every worker to exit cleanly once it reached the if number is None: check. However, when I look up the process PIDs in my OS panel, they are still alive.

What I’m trying to do:

  • Start Workers using the multiprocessing module.

  • Put a set of data in the task_queue to be processed by the workers; I expect the processing to be CPU intensive.

  • Workers consume the task_queue and put the calculated results in the result_queue.

  • I terminate the workers by putting None in the task_queue, once per worker.

  • I collect all the results from the result queue.

import os
import multiprocessing
from typing import List


class Test:
    def __init__(self, num_worker: int = multiprocessing.cpu_count()) -> None:
        self.num_workers: int = num_worker
        self.task_queue: multiprocessing.Queue = multiprocessing.Queue()
        self.result_queue: multiprocessing.Queue = multiprocessing.Queue()
        self.workers: List[multiprocessing.Process] = []
        self.collect_list: List = []

    def worker(
        self, task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue
    ) -> None:

        try:
            while True:
                number: int = task_queue.get()
                if number is None:
                    break
                result_queue.put(number * number)
        except KeyboardInterrupt:
            pass

        print(f"Worker PID {os.getpid()} Finishing")

    def start_workers(self) -> None:
        for _ in range(self.num_workers):
            process = multiprocessing.Process(
                target=self.worker, args=(self.task_queue, self.result_queue)
            )

            self.workers.append(process)
            process.start()

    def stop_workers(self) -> None:

        for _ in self.workers:
            self.task_queue.put(None)

        for process in self.workers:
            print("Starting Join")
            process.join()

    def process(self, num: int):
        self.task_queue.put(num)

    def collect(self) -> None:
        while True:
            item = self.result_queue.get()
            self.collect_list.append(item)

            if self.result_queue.qsize() == 0:
                break


if __name__ == "__main__":
    test = Test()
    test.start_workers()
    for i in range(10000):
        test.process(i)
    test.stop_workers()
    test.collect()
    print(len(test.collect_list))

Hi,

I ran your script. If I comment out this line:

self.workers.append(process)

in the start_workers() method, the script appears to run. Otherwise, it fails with an EOFError: Ran out of input traceback. I would start there.

Thanks for the response, Paul. I made some changes to my code. I removed self.workers.append(process) and added a while loop to wait for the task_queue to empty. Only after that do I send the None values to the queue.

I tested with 10K and 100K entries and after a while I can collect all the results.

Can you tell me why storing the processes in self.workers and calling join() on them caused this behavior? And what would be the correct way to handle the Process life cycle?

Code updated.

import os
import multiprocessing
from typing import List


class Test:
    def __init__(self, num_worker: int = multiprocessing.cpu_count()) -> None:
        self.num_workers: int = num_worker
        self.task_queue: multiprocessing.Queue = multiprocessing.Queue()
        self.result_queue: multiprocessing.Queue = multiprocessing.Queue()
        self.workers: List[multiprocessing.Process] = []
        self.collect_list: List = []

    def worker(
        self, task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue
    ) -> None:

        try:
            while True:
                number: int = task_queue.get()
                if number is None:
                    break
                result_queue.put(number * number)
        except KeyboardInterrupt:
            pass

        print(f"Worker PID {os.getpid()} Finishing")

    def start_workers(self) -> None:
        for _ in range(self.num_workers):
            process = multiprocessing.Process(
                target=self.worker, args=(self.task_queue, self.result_queue,)
            )

            # self.workers.append(process)
            process.start()

    def stop_workers(self) -> None:

        while True:
            if self.task_queue.qsize() == 0:
                print(f"Task queue empty")
                break

        for _ in range(self.num_workers):
            self.task_queue.put(None)

    def process(self, num: int):
        self.task_queue.put(num)

    def collect(self) -> None:
        while True:
            item = self.result_queue.get()
            self.collect_list.append(item)

            if self.result_queue.qsize() == 0:
                break


if __name__ == "__main__":
    test = Test()
    test.start_workers()
    for i in range(10000):
        test.process(i)
    test.stop_workers()
    test.collect()

    print(f"Collect list")
    print(len(test.collect_list))
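
On the life-cycle question above, one common alternative (not what the script above does) is to let the standard library start, feed, and join the workers. The following is only a minimal sketch using concurrent.futures.ProcessPoolExecutor; the names square and square_all are hypothetical, and the chunksize value is an arbitrary assumption.

```python
from concurrent.futures import ProcessPoolExecutor


def square(number: int) -> int:
    """CPU-bound stand-in for the real work."""
    return number * number


def square_all(numbers, max_workers=None) -> list:
    """Hypothetical helper: the executor owns the worker life cycle."""
    # Leaving the with-block waits for the workers and shuts them down.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order; chunksize batches the
        # inter-process traffic (256 is an arbitrary choice).
        return list(executor.map(square, numbers, chunksize=256))


if __name__ == "__main__":
    print(len(square_all(range(10000))))
```

With this approach there are no sentinels, no explicit join() calls, and no queues to drain by hand.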

Apparently, you need a handle for each running process so that you can reference it later. You also need to start each process before appending it to the list. Order is important.

Do it like this:

   process_run = process.start()
   self.workers.append(process_run)

You don’t need the following import, so delete it from your script:

# from typing import List

There is a built-in list that you can use in its place.

import os
import multiprocessing

class Test:

    def __init__(self, num_worker: int = multiprocessing.cpu_count()):

        self.num_workers: int = num_worker
        self.task_queue: multiprocessing.Queue = multiprocessing.Queue()
        self.result_queue: multiprocessing.Queue = multiprocessing.Queue()
        self.workers: list[multiprocessing] = []
        self.collect_list: list = []

    # @staticmethod
    def worker(self, task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue ) -> None:

        try:
            while True:
                number: int = task_queue.get()
                if number is None:
                    break
                result_queue.put(number * number)
        except KeyboardInterrupt:
            pass

        print(f"Worker PID {os.getpid()} Finishing")

    def start_workers(self) -> None:

        for _ in range(self.num_workers):

            process = multiprocessing.Process(
                target=self.worker, args=(self.task_queue, self.result_queue))

            process_run = process.start()
            self.workers.append(process_run)


    def stop_workers(self) -> None:

        while True:

            if self.task_queue.qsize() == 0:
                print(f"Task queue empty")
                break

        for _ in range(self.num_workers):
            self.task_queue.put(None)

    def process(self, num: int):
        self.task_queue.put(num)

    def collect(self) -> None:

        while True:
            item = self.result_queue.get()
            self.collect_list.append(item)

            if self.result_queue.qsize() == 0:
                break

if __name__ == "__main__":

    try:

        test = Test()
        test.start_workers()
        for i in range(4):
            test.process(i)
        test.stop_workers()
        test.collect()

        print(f"Collect list")
        print(len(test.collect_list))

    except KeyboardInterrupt:
        pass

fyi …
When I ran a Google search to familiarize myself with the List class, the AI-generated response was:

The statement from typing import List in Python is used to import the List type hint from the typing module. Type hints were introduced in Python 3.5 to allow specifying the expected type of variables, function arguments, and return values. This enhances code readability and helps catch type-related errors during development, especially when used with static analysis tools like MyPy.

Prior to Python 3.9, List (with a capital 'L') had to be imported from the typing module when annotating lists with type hints. Since Python 3.9, the built-in list type can be used directly for type hinting, making the import statement unnecessary in many cases. However, using from typing import List is still valid and may be preferred for compatibility with older Python versions or for clarity.

As you can see, from Python 3.9 onward the built-in list can be used directly in type hints, with no need to import List from the typing module.
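
To make the difference concrete, here is a small sketch of the same annotation written both ways. The function names are hypothetical, and the second form assumes Python 3.9 or later.

```python
from typing import List


def squares_typing(values: List[int]) -> List[int]:
    """Annotated with typing.List, the form needed before Python 3.9."""
    return [v * v for v in values]


def squares_builtin(values: list[int]) -> list[int]:
    """Annotated with the built-in list, which requires Python 3.9+."""
    return [v * v for v in values]


if __name__ == "__main__":
    print(squares_typing([1, 2, 3]))
    print(squares_builtin([1, 2, 3]))
```

Both functions behave identically at runtime; only the spelling of the annotation differs.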

Hello again Paul. Just a few points. Regarding List, I know that I can now use the built-in list type, but I find List from the typing module more readable.

Regarding your suggestion, I did some tests and start() does not return any value.
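
A quick way to check this is to capture the return value directly. This is just a sketch; noop and start_and_inspect are hypothetical names.

```python
import multiprocessing


def noop() -> None:
    """Trivial target so the child exits immediately."""


def start_and_inspect() -> tuple:
    """Start one process and report what start() handed back."""
    process = multiprocessing.Process(target=noop)
    returned = process.start()  # launches the child; the call itself returns None
    process.join()              # safe here: the child never touches a queue
    return returned, process.exitcode


if __name__ == "__main__":
    returned, exitcode = start_and_inspect()
    print(returned)  # this is what would end up in self.workers
    print(exitcode)
```

So appending the result of start() stores None in the list rather than a Process handle, which means there is nothing left to call join() on.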

I still have doubts about why my child processes had not terminated even after leaving the while loop, which caused .join() to hang.
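
One plausible explanation comes from the "Joining processes that use queues" note in the multiprocessing programming guidelines: a child that has put items on a multiprocessing.Queue does not terminate until its buffered items have been flushed to the underlying pipe. In the original script, stop_workers() calls join() before collect() has read anything from result_queue, so a worker can leave its loop and still never exit. Below is a minimal sketch of the reordered flow, assuming each task produces exactly one result; square_worker and run_pipeline are hypothetical names.

```python
import multiprocessing


def square_worker(task_queue, result_queue) -> None:
    """Square numbers from task_queue until a None sentinel arrives."""
    while True:
        number = task_queue.get()
        if number is None:
            break
        result_queue.put(number * number)


def run_pipeline(numbers: list, num_workers: int = 2) -> list:
    """Hypothetical helper: feed tasks, drain results, and only then join."""
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(
            target=square_worker, args=(task_queue, result_queue)
        )
        for _ in range(num_workers)
    ]
    for process in workers:
        process.start()

    for number in numbers:
        task_queue.put(number)
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker

    # Drain first. Each task yields exactly one result, so count them
    # instead of polling qsize().
    results = [result_queue.get() for _ in range(len(numbers))]

    # Join last: by now the children have nothing left to flush.
    for process in workers:
        process.join()
    return results


if __name__ == "__main__":
    print(len(run_pipeline(list(range(10000)))))
```

Counting results also sidesteps qsize(), which the documentation describes as approximate and which may raise NotImplementedError on macOS.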

Please copy the last script that I posted (it is an edited version of the one you provided earlier, by the way) and run it on your system. There is no .join() anywhere in that script, so you may be testing an earlier version that you still have on your system.

By the way, the following method can be made static, since its body never uses self or any instance attribute:

    @staticmethod
    def worker(task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue ) -> None:

        try:
            while True:
                number: int = task_queue.get()
                if number is None:
                    break
                result_queue.put(number * number)
        except KeyboardInterrupt:
            pass

        print(f"Worker PID {os.getpid()} Finishing")

When I ran the script, it did not get stuck. This is the output that I observed:

Task queue empty
Worker PID 14600 Finishing
Worker PID 9724 Finishing
Worker PID 9524 Finishing
Worker PID 8068 Finishing
Worker PID 4140 FinishingWorker PID 3328 Finishing

Worker PID 14648 Finishing
Worker PID 17364 Finishing
Collect list
4
Worker PID 13716 Finishing
Worker PID 10752 Finishing
Worker PID 4968 Finishing
Worker PID 13432 Finishing

Process finished with exit code 0

Note that the traceback errors produced by your original script no longer appear.