Queue termination further design

Update on multiprocessing queue termination: @YvesDup and I have noticed that the current implementation makes it difficult to add shutdown cleanly without changing existing behaviour. I suggest two options:

  • Introduce a new multiprocessing.queues.TerminableQueue which has the same API as queue.Queue (perhaps without join() and task_done(), since a further subclass could add those by reusing the existing multiprocessing.JoinableQueue implementation)

  • Change how the current get() implementation works by injecting a select call before self._reader.recv_bytes (self._reader is a multiprocessing.connection.Connection object):

    def _reset(...):
        ...
        # Placeholder: in practice this would be the read end of a dedicated pipe.
        self._shutdown_conn = Connection()
    ...
    def get(...):
        ...
        (ready, _, _) = select.select(
            [self._reader, self._shutdown_conn], [], []
        )
        # Only the shutdown connection is ready, so no data is pending.
        if self._reader not in ready:
            raise ShutDown
        res = self._reader.recv_bytes()
        ...
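
To make the second option concrete, here is a minimal runnable sketch of the idea, not a patch against multiprocessing.Queue. It makes several assumptions that are not part of the proposal above: TerminableReader, put_bytes(), get_bytes() and shutdown() are hypothetical names, ShutDown is a local stand-in exception, and the shutdown signal is a byte written to a second pipe. It also swaps select.select for multiprocessing.connection.wait, because select on Windows only accepts sockets.

```python
import multiprocessing.connection as mpc


class ShutDown(Exception):
    """Hypothetical stand-in for the exception raised after shutdown."""


class TerminableReader:
    """Illustrative only: a data pipe paired with a shutdown pipe."""

    def __init__(self):
        # Data channel, playing the role of Queue._reader / Queue._writer.
        self._reader, self._writer = mpc.Pipe(duplex=False)
        # Dedicated channel used only to signal shutdown (assumption).
        self._shutdown_reader, self._shutdown_writer = mpc.Pipe(duplex=False)

    def put_bytes(self, data):
        self._writer.send_bytes(data)

    def shutdown(self):
        # Make the shutdown connection readable so blocked getters wake up.
        # The byte is never consumed, so every later get also sees it.
        self._shutdown_writer.send_bytes(b"x")

    def get_bytes(self, timeout=None):
        # Block until data arrives, shutdown is signalled, or timeout expires.
        ready = mpc.wait([self._reader, self._shutdown_reader], timeout)
        if self._reader in ready:
            # Pending data wins over shutdown, as in the sketch above.
            return self._reader.recv_bytes()
        if self._shutdown_reader in ready:
            raise ShutDown
        raise TimeoutError("no data and no shutdown signal")


if __name__ == "__main__":
    r = TerminableReader()
    r.put_bytes(b"item")
    r.shutdown()
    print(r.get_bytes(timeout=1))  # the item queued before shutdown
    try:
        r.get_bytes(timeout=1)
    except ShutDown:
        print("shut down")
```

As in the pseudocode, data that is already readable is still delivered after shutdown; whether shutdown should instead discard pending items is a separate design question that this sketch does not settle.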