multiprocessing Queue only gets some of the items, not all

I’m using multiprocessing.Process to build a master and worker system.
The master coordinates the workers: the workers send information to the master through master_event_queue, and the master sends information to each worker through that worker's worker_event_queue.

The code looks like this:

class Master(Process):
    """Coordinator process that polls a task queue and an event queue.

    ``run`` keeps polling ``master_task_queue`` and ``master_event_queue``
    until ``main_worker_finished`` becomes true, then calls ``clear`` and
    closes ``child_conn``.

    NOTE(review): ``run`` uses ``self.logger``, ``self.model_status``,
    ``self.deal_with_task``, ``self.deal_with_event`` and ``self.clear``,
    none of which are defined in this excerpt -- presumably they are set up
    elsewhere in the class; confirm.
    """

    def __init__(self,
                 args,
                 nth_physical: int,
                 init_task_list: List[Task],
                 child_conn,
                 log_level: str = "info",
                 logdir=None,
                 log_to_console=True
                 ):
        """Set up bookkeeping state and the manager-backed queues.

        Args:
            args: Options object; ``root_thread_factor`` is read here, and
                ``relgap`` and ``focus`` are read in ``run``.
            nth_physical: Number of physical threads.
            init_task_list: Initial tasks; stored as-is.
            child_conn: Connection object that ``run`` closes when it ends
                (presumably one end of a pipe to the parent -- confirm).
            log_level: Not used in the visible code.
            logdir: Not used in the visible code.
            log_to_console: Not used in the visible code.
        """
        super().__init__()
        self.nth_physical = nth_physical  # number of physical threads
        self.thread_factor = args.root_thread_factor
        self.nth_logical = nth_physical * self.thread_factor + 1  # total number of logical threads, +1 for MON_CPUMEM task
        self.nth_idle = self.nth_logical

        self.init_task_list = init_task_list
        self.task_list = []

        self.workers_dict = {}  # worker_id: worker
        self.idle_workers = []
        self.n_workers = 0
        self.main_worker_id = None
        self.main_worker = None  # worker with the original MIP
        self.main_worker_finished = False

        # monitor related
        self.hcpu_on = False
        self.oom_on = False

        # Both queues are proxies served by the manager created here.
        self.manager = Manager()
        self.master_task_queue = self.manager.Queue()
        self.master_event_queue = self.manager.Queue()
        self.sid = 0

        self.args = args
        self.child_conn = child_conn

    def run(self):
        """Poll both queues until the main worker is finished, then clean up.

        Each outer iteration handles at most one task, then drains the event
        queue completely, then sleeps 10 ms. After every event the relative
        gap is recomputed from ``self.model_status``; when ``args.focus`` is
        ``"time"`` and the gap is below ``args.relgap`` the loop is flagged to
        stop.

        Any ``Exception`` raised inside the loop is logged at CRITICAL level
        together with its traceback and ends the loop. ``child_conn`` is
        closed even if ``clear`` raises; that exception still propagates.
        """
        try:
            self.logger.info(f"Master: Started with pid {os.getpid()}.")
            event_queue = self.master_event_queue
            task_queue = self.master_task_queue
            # Start monitoring
            self.logger.info("Master: Start monitoring the queues.")

            # The flags only throttle logging: the "queue is empty" messages
            # are emitted once per empty streak instead of on every poll.
            has_task_flag = True
            has_event_flag = True
            while not self.main_worker_finished:
                # main job: handle at most one task per outer iteration
                try:
                    if has_task_flag:
                        self.logger.debug(f"Master: Try to get task from task queue. The length is {task_queue.qsize()}.")
                    task = task_queue.get_nowait()
                    has_task_flag = True
                    self.logger.info(f"Master: Got task {task}.")
                    self.deal_with_task(task)
                    self.logger.info(f"Master: Task {task} is dealt with.")
                except Empty:
                    if has_task_flag:
                        self.logger.info("Master: Task queue is empty. Break the task loop.")
                    has_task_flag = False

                # communication: drain every event that is currently queued
                while True:
                    try:
                        if has_event_flag:
                            self.logger.debug("Master: Try to get event from event queue.")
                        event = event_queue.get_nowait()
                        has_event_flag = True
                        self.logger.info(f"Master: Received event {event}.")
                        self.deal_with_event(event)
                        self.logger.info(f"Master: Event {event} is dealt with.")
                        # NOTE(review): raises ZeroDivisionError when "bstobj"
                        # is 0, which ends the master loop via the handler
                        # below -- confirm that value cannot occur.
                        rel_gap = 1.0 - self.model_status["bstbnd"] / self.model_status["bstobj"]
                        target_rel_gap = self.args.relgap
                        self.logger.info(f"Master: The relative gap is {rel_gap:.6e}.")
                        if self.args.focus == "time" and rel_gap < target_rel_gap:
                            # Only sets the flag; the remaining queued events
                            # are still drained before the outer loop exits.
                            self.main_worker_finished = True
                            self.logger.info(f"Master: The relative gap {rel_gap:.6e} is less than {target_rel_gap}. Stop the main worker.")
                    except Empty:
                        if has_event_flag:
                            self.logger.info("Master: Event queue is empty. Break the event loop.")
                        has_event_flag = False
                        break
                time.sleep(0.01)
            self.logger.info("Master: The main worker finished. Send the result and Kill all workers.")
        except Exception as e:
            # exc_info=True keeps the traceback; the message alone does not
            # show which statement failed.
            self.logger.critical(f"Master: Exception {e} occurred.", exc_info=True)

        try:
            # clear the queues and kill all workers
            self.clear()
            self.logger.info("Master: Sent the result to the parent process.")
        finally:
            # close the connection even when clear() fails
            self.child_conn.close()
            self.logger.info("Master: close the pipe connection to the parent process.")

And the worker

class Worker(Process):
    def __init__(self, args, worker_id, master_event_queue, master_task_queue, manager, log_level: str = "info", logdir=None, log_to_console=True):
        super().__init__()
        self.worker_id = worker_id
        self.master_event_queue = master_event_queue
        self.master_task_queue = master_task_queue

        self.manager = manager
        self.worker_event_queue = manager.Queue()
        ......

    def run(self):
        try:
            # self.logger.setLevel(logging.DEBUG)
            self.logger.info(f"Worker {self.worker_id}: Started with pid {os.getpid()}")

            while True:
                self.logger.debug(f"Worker {self.worker_id}: Blocking wait for events from event queue.")
                event = self.worker_event_queue.get(True)  # If the queue is empty, get blocks until an item is available
                if event.type == EventType.ASSIGN_TASK:
                    self.task = event.data["task"]
                    self.logger.info(f"Worker {self.worker_id}: Received task {self.task}.")
                    self.process_task()
                    self.logger.debug(f"Worker {self.worker_id}: Task done.")
                elif event.type == EventType.KILL_SOLVER:
                    self.logger.info(f"Worker {self.worker_id}: Received kill solver event.")
                    break
                else:
                    self.logger.error(f"Worker {self.worker_id}: Received unknown event type: {event.type}")
                time.sleep(0.01)
            self.logger.info(f"Worker {self.worker_id}: Terminated.")
        except Exception as e:
            self.logger.critical(f"Worker {self.worker_id}: Exception {e} occurred.")

        # clear the worker event queue
        self.clear()

    def process_task(self):
        ..... # send information by self.master_event_queue.put(Event(....))

I added logging and found that, from some point on, the master no longer receives the events that one particular worker puts into master_event_queue, even though it still receives events from the other workers through the same queue.

If it works with multiple children, but not one, something weird is probably wrong.

I find adding more logging and exception catching in the run() helpful in this situation.

If you just want to run tasks on processes then consider using a ProcessPoolExecutor instead of rolling your own implementation: concurrent.futures — Launching parallel tasks — Python 3.12.2 documentation