I’m using multiprocessing.Process to build a master and worker system.
The master is coordinating between the worker and the workers send information to the master by the master_event_queue, and the master send information to workers by worker_event queue.
The code looks like this:
class Master(Process):
def __init__(self,
args,
nth_physical: int,
init_task_list: List[Task],
child_conn,
log_level: str = "info",
logdir=None,
log_to_console=True
):
super().__init__()
self.nth_physical = nth_physical # number of physical threads
self.thread_factor = args.root_thread_factor
self.nth_logical = nth_physical * self.thread_factor + 1 # total number of logical threads, +1 for MON_CPUMEM task
self.nth_idle = self.nth_logical
self.init_task_list = init_task_list
self.task_list = []
self.workers_dict = {} # worker_id: worker
self.idle_workers = []
self.n_workers = 0
self.main_worker_id = None
self.main_worker = None # worker with the original MIP
self.main_worker_finished = False
# monitor related
self.hcpu_on = False
self.oom_on = False
self.manager = Manager()
self.master_task_queue = self.manager.Queue()
self.master_event_queue = self.manager.Queue()
self.sid = 0
self.args = args
self.child_conn = child_conn
def run(self):
try:
self.logger.info(f"Master: Started with pid {os.getpid()}.")
event_queue = self.master_event_queue
task_queue = self.master_task_queue
# Start monitoring
self.logger.info(f"Master: Start monitoring the queues.")
has_task_flag = True
has_event_flag = True
while not self.main_worker_finished:
# self.logger.debug(list(map(lambda p: p.pid, list_children())))
# main job: deal with all the tasks
try:
if has_task_flag:
self.logger.debug(f"Master: Try to get task from task queue. The length is {task_queue.qsize()}.")
task = task_queue.get_nowait()
has_task_flag = True
self.logger.info(f"Master: Got task {task}.")
self.deal_with_task(task)
self.logger.info(f"Master: Task {task} is dealt with.")
except Empty:
if has_task_flag:
self.logger.info(f"Master: Task queue is empty. Break the task loop.")
has_task_flag = False
# communication: deal with all the events
while True:
try:
if has_event_flag:
self.logger.debug(f"Master: Try to get event from event queue.")
event = event_queue.get_nowait()
has_event_flag = True
self.logger.info(f"Master: Received event {event}.")
self.deal_with_event(event)
self.logger.info(f"Master: Event {event} is dealt with.")
rel_gap = 1.0 - self.model_status["bstbnd"] / self.model_status["bstobj"]
target_rel_gap = self.args.relgap
self.logger.info(f"Master: The relative gap is {rel_gap:.6e}.")
if self.args.focus == "time" and rel_gap < target_rel_gap:
self.main_worker_finished = True
self.logger.info(f"Master: The relative gap {rel_gap:.6e} is less than {target_rel_gap}. Stop the main worker.")
except Empty:
if has_event_flag:
self.logger.info(f"Master: Event queue is empty. Break the event loop.")
has_event_flag = False
break
time.sleep(0.01)
self.logger.info(f"Master: The main worker finished. Send the result and Kill all workers.")
except Exception as e:
self.logger.critical(f"Master: Exception {e} occurred.")
# clear the queues and kill all workers
self.clear()
self.logger.info(f"Master: Sent the result to the parent process.")
# close the connection
self.child_conn.close()
self.logger.info(f"Master: close the pipe connection to the parent process.")
And the worker
class Worker(Process):
def __init__(self, args, worker_id, master_event_queue, master_task_queue, manager, log_level: str = "info", logdir=None, log_to_console=True):
super().__init__()
self.worker_id = worker_id
self.master_event_queue = master_event_queue
self.master_task_queue = master_task_queue
self.manager = manager
self.worker_event_queue = manager.Queue()
......
def run(self):
try:
# self.logger.setLevel(logging.DEBUG)
self.logger.info(f"Worker {self.worker_id}: Started with pid {os.getpid()}")
while True:
self.logger.debug(f"Worker {self.worker_id}: Blocking wait for events from event queue.")
event = self.worker_event_queue.get(True) # If the queue is empty, get blocks until an item is available
if event.type == EventType.ASSIGN_TASK:
self.task = event.data["task"]
self.logger.info(f"Worker {self.worker_id}: Received task {self.task}.")
self.process_task()
self.logger.debug(f"Worker {self.worker_id}: Task done.")
elif event.type == EventType.KILL_SOLVER:
self.logger.info(f"Worker {self.worker_id}: Received kill solver event.")
break
else:
self.logger.error(f"Worker {self.worker_id}: Received unknown event type: {event.type}")
time.sleep(0.01)
self.logger.info(f"Worker {self.worker_id}: Terminated.")
except Exception as e:
self.logger.critical(f"Worker {self.worker_id}: Exception {e} occurred.")
# clear the worker event queue
self.clear()
def process_task(self):
..... # send information by self.master_event_queue.put(Event(....))
I have loggings and find that after Worker put into master_event_queue, the master didn’t get any event since sometime from this single worker. But master can receive event in master_event_queue from other workers.