Update on multiprocessing queue termination: @YvesDup and I have notices that the implementation makes it difficult to cleanly implement shutdown without changing the behaviour of the existing implementation. I suggest two options:
-
Introduce a new
multiprocessing.queues.TerminableQueue
which simply has the same API asqueue.Queue
(perhaps withoutjoin()
andtask_done()
, as we could add another subclass which uses the existingmultiprocessing.JoinableQueue
’s implementation) -
Change how the current
get()
implementation works by injecting a select call beforeself._reader.recv_bytes
(self._reader
is amultiprocessing.connection.Connection
object):def _reset(...): ... self._shutdown_conn = Connection() ... def get(...): ... (ready, _, _) = select.select( [self._reader, self._shutdown_conn], [], [] ) if self._reader not in ready: raise ShutDown res = self._reader.recv_bytes() ...