Queue termination: further design

This is a follow-on to Queue termination, after implementation brought to light some aspects not considered during the design.

@guido brought up the following points regarding design decisions to be made:

  • What should full() and empty() return when the queue is shut down?
  • Do we need a new inquiry method (is_alive()?) to tell whether a queue has been shut down?
  • When the queue is shut down and empty, should get_nowait() raise QueueShutDown or QueueEmpty?
  • How do task_done() and join() interact with shutting down?

In addition, Guido asked about the necessity of immediate=True (an argument to queue shutdown); you can see my comments in the original Discourse topic, but I argue the benefit outweighs the implementation complexity.

Right now the PRs are in a good state with comprehensive testing (although I’m still not convinced my gist has a bug regarding multiprocessing queue immediate shutdown), so see them for the current design decisions. In brief (illustrated by the sketch after this list):

  • full() and empty() haven’t changed
  • There is no is_alive() (or is_shutdown(), or get_shutdown_state()) method
  • get() and get_nowait() raise when the queue is empty and has been shut down
  • task_done() and join() raise when the queue is shut down immediately, and don’t when it is shut down eventually

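To make these concrete, here’s a minimal usage sketch of the intended threading-queue behaviour, assuming the shutdown() method and ShutDown exception spellings used in the PRs (both still subject to change):

import queue

q = queue.Queue()
q.put("a")
q.put("b")

q.shutdown()  # non-immediate: items already queued stay retrievable

print(q.get())  # "a"
print(q.get())  # "b"

try:
    q.get_nowait()  # queue is now empty *and* shut down
except queue.ShutDown:
    print("empty and shut down")

try:
    q.put("c")  # puts always raise once the queue is shut down
except queue.ShutDown:
    print("cannot put after shutdown")
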
Relevant links:

Sorry for being slow. This is definitely going to target 3.13, so I am focusing on things that need my attention more immediately. But thanks for taking my questions seriously, and we will get through this!

An alternate implementation (for threading queues for now) where immediate=True simply consumes the queue: gh-96471: Add threading queue shutdown by EpicWink · Pull Request #104750 · python/cpython · GitHub

Tangentially related, but do we want to make queues (especially asyncio queues) iterable?

A queue being terminated and emptied maps nicely to the async for loop exiting. Feels like an easy win.

So what would Queue.__anext__ look like? Maybe this?

async def __anext__(self):
    try:
        return await self.get()
    except ShutDown:
        raise StopAsyncIteration

(__aiter__ would just return self)

Alternatively, __aiter__ could be an async generator:

async def __aiter__(self):
    while True:
        try:
            value = await self.get()
        except ShutDown:
            return
        try:
            yield value
        finally:
            self.task_done()

(I’m not sure where to put the task_done() call in the __anext__ version.)
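
For illustration, a consumer built on that generator-based __aiter__ could look like the sketch below. This is hypothetical: it assumes asyncio.Queue grows both the __aiter__ above and a shutdown() method.

import asyncio

async def worker(name, q):
    # Hypothetical: relies on the generator-based __aiter__ sketched above,
    # which calls task_done() for each item it yields.
    async for item in q:
        print(name, "processed", item)

async def main():
    q = asyncio.Queue()
    workers = [asyncio.create_task(worker(f"w{i}", q)) for i in range(2)]
    for item in ("a", "b", "c", "d"):
        await q.put(item)
    q.shutdown()  # once the queue drains, each worker's async-for loop exits
    await asyncio.gather(*workers)

asyncio.run(main())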

I like this. Maybe this suggests a different name for immediate? E.g. drain=True or flush=True or discard=True. The “immediate” term doesn’t really convey the meaning (perhaps), plus the English half of my brain wants to change it to immediately=True.

I think so. flush feels more idiomatic to me, but discard better informs the user of the data loss. I’ll rename the parameter in the consuming implementation later today. Any further arguments for other names will likely change my mind.

Yeah, the __aiter__ version looks good at first blush.

Processing items from a queue using this feels much better than the ol’

while True:
    item = await queue.get()
    if item is sentinel:
        break

(or I guess the modern except ShutDown: now)

… with the unfortunate side-effect of not being able to stick a timeout on the queue.get() operation. But I feel like this is a broader problem with async iterators, and ideally would be solved by an independent helper function which is out of scope here in any case, so never mind that now.

What do you think?

You could execute it inside async with asyncio.timeout(10) as to: and then keep calling to.reschedule(asyncio.get_running_loop().time() + 10). But you’re quickly entering the area where the cure is worse than the disease. And why not assume that either a new item will eventually appear in the queue, or the queue will be shut down?
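
As a rough sketch of that rescheduling idea (again assuming the hypothetical __aiter__ and a shutdown() method from above), the deadline is pushed back after every item, so the timeout effectively applies per get rather than to the whole loop:

import asyncio

async def consume(q):
    loop = asyncio.get_running_loop()
    # Give up if no item arrives within 10 seconds of the previous one.
    async with asyncio.timeout(10) as to:
        async for item in q:  # hypothetical __aiter__ from above
            to.reschedule(loop.time() + 10)  # reset the deadline
            print("got", item)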

You make great points, for some reason I didn’t think of the rescheduling approach :innocent:

@guido if we like the async iterator idea (and it sounds like we do), I’d be happy to put together a PR for it (including tests and docs). Just ping me when the termination stuff gets merged.

asyncio and threading queue shutdown will be in Python 3.13.


Multiprocessing queue shutdown is still in development, but I don’t think it’s as important, since it’s a lot easier to kill a process externally than a thread. It would still be nice to have, to keep the APIs consistent, and I’ll make an attempt with the time I have before the feature freeze.


@eric.snow has commented that sub-interpreter queue shutdown is possible, but I haven’t started work on it as I don’t have any familiarity with sub-interpreters, and the sub-interpreter standard-library module proposal PEP 734 isn’t yet accepted.

Update on multiprocessing queue termination: @YvesDup and I have noticed that the existing implementation makes it difficult to add shutdown cleanly without changing current behaviour. I suggest two options:

  • Introduce a new multiprocessing.queues.TerminableQueue which simply has the same API as queue.Queue (perhaps without join() and task_done(), as we could add another subclass which uses the existing multiprocessing.JoinableQueue’s implementation)

  • Change how the current get() implementation works by injecting a select call before self._reader.recv_bytes (self._reader is a multiprocessing.connection.Connection object):

    def _reset(...):
        ...
        # A Connection can't be constructed directly; use the read end of a
        # pipe that shutdown() writes to when the queue is shut down.
        self._shutdown_conn, self._shutdown_send = multiprocessing.Pipe(duplex=False)
    ...
    def get(...):
        ...
        # Block until either an item or the shutdown signal is readable
        # (multiprocessing.connection.wait() would be the portable equivalent).
        (ready, _, _) = select.select(
            [self._reader, self._shutdown_conn], [], []
        )
        if self._reader not in ready:
            raise ShutDown
        res = self._reader.recv_bytes()
        ...
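
For completeness, the shutdown() side of that sketch would presumably just record the shutdown and write to the other end of the pipe, waking any get() blocked in select.select(). Everything here is illustrative, and real code would need shared cross-process state for the shutdown flag:

def shutdown(self, immediate=False):
    # Illustrative counterpart to the get() sketch above.
    if immediate:
        ...  # drain/discard buffered items
    # Writing makes self._shutdown_conn readable, so blocked getters wake up
    # from select.select() and raise ShutDown.
    self._shutdown_send.send_bytes(b"x")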