Queue termination

Consider a typical worker that consumes items from an asyncio.Queue. It might have a loop like this:

    while True:
        item = await queue.get()
        <process item>
        queue.task_done()

How would the producer indicate to the worker that there are no work items left? I’ve often wondered what the best way to do this.

For example, we could put a None into the queue and add if item is None: break to the loop. This requires us to know that regular work items are never None though. If we don’t know that, we might use a sentinel object.

Next, there might be multiple workers – would we have to append multiple sentinels to the queue, one per worker? That’s kind of ugly. Instead, the producer might call queue.join() to ensure that all workers are idle, and then cancel all the worker tasks – but this requires us to keep track of the worker tasks. Maybe that’s the responsibility of a worker pool?

What kind of mechanisms are in use out there? I don’t find any clues on how to handle this problem in the examples in the docs. Perhaps I am only imagining that this is a problem, and in practice this doesn’t bother anyone else?

Or maybe there’s a real need. In that case, would it make sense to add a new API to asyncio.Queue to make this simpler? E.g. the producer might “close” the queue and calling a closed queue’s get() method might raise an exception that all workers can catch.

(The same concern applies for the threaded queue module, perhaps more so. And why do I have a feeling of deja vu?)

3 Likes

This can get quite complex depending on your goals. My goals are to have a queue which can be cancelled at any time, which will finish on both sides when cancelled, can be consumed or produced by multiple threads/processes, and won’t cause memory leaks.

This is an implementation that achieves these goals:

import queue

class _Sentinel:   # support cross-process
    def __eq__(self, other):
        return self.__class__ == other.__class__

class MultiThreadedQueue:
    sentinel = _Sentinel()
    timeout = 0.2

    def __init__(self, maxsize=1):
        self.queue = queue.Queue(maxsize=maxsize)
        self.should_stop = False

    def get(self):
        while True:
            if self.should_stop:
                return self.sentinel
            try:
                return self.queue.get(timeout=self.timeout)
            except queue.Empty:
                pass

    def put(self, item):
        while True:
            if self.should_stop:
                raise RuntimeError("Queue has stopped")
            try:
                return self.queue.put(item, timeout=self.timeout)
            except queue.Full:
                pass

    def finish(self, n_consumers: int):
        [self.put(self.sentinel) for _ in range(n_consumers)]

Care needs to be taken with multi-processing threads: correct instantiation and forking of multiprocessing.Queue, and cancelling of the queue feed thread on unhandled exception.

Consumers would be expected to call q.finish(n) or set q.should_stop (likely in a try-finally, in my case in the __exit__ of a context-manager which manages the thread pool).

For instantaneous interruption, I’m sure there’s something I could do with Condition.


The above implementation is a small part of a large multi-inheritence pipelining library I have, which is relatively specialised (pipeline stages are homogeneous, CPU resources are at a premium) and not designed for message-passing.

I would be surprised if there aren’t other libraries out there more generic and user friendly which specifically address this problem better.

I’m using the following pattern for type-safety:

import asyncio
import enum

class Item: ...

class Sentinel(enum.Enum):
    TOKEN = 0  # value does not matter

async def process(q: asyncio.Queue[Item | Sentinel]) -> None:
    while True:
        item = await q.get()
        if item is Sentinel.TOKEN:
            break
        await work(item)
        q.task_done()

In this case, yes, we need to put sentinel objects multiple times. TaskGroup would make the logic you’ve described a little bit simplified but I think it’s not still ideal because threading does not have the equivalent and queue users have to keep references to all threads/tasks. :thinking:

In my cases, I’ve created multiple queues and attached only one consumer to each queue and let other mechanisms “load-balance” the incoming workloads (e.g., Redis streams).

Probably we could add something like Queue.close_and_wait() which makes all waiters on the queue to get queue.Empty exception when all items are dequeued while blocks the producer until then.
With this approach, I’m a little concerned about potential side effects from the races of consumer terminations and the end of the producer blocking. So I’d suggest the API users to wrap the entire logic with TaskGroup in asyncio for complete safety, but not sure about what to do in threads other than calling .join() of all consumer threads.

By Guido van Rossum via Discussions on Python.org at 22Aug2022 04:21:

Consider a typical worker that consumes items from an asyncio.Queue.
It might have a loop like this:

   while True:
       item = await queue.get()
       <process item>
       queue.task_done()

How would the producer indicate to the worker that there are no work items left? I’ve often wondered what the best way to do this.

For example, we could put a None into the queue and add if item is None: break to the loop. This requires us to know that regular work items are never None though. If we don’t know that, we might use a sentinel object.

Next, there might be multiple workers – would we have to append multiple sentinels to the queue, one per worker? That’s kind of ugly. Instead, the producer might call queue.join() to ensure that all workers are idle, and then cancel all the worker tasks – but this requires us to keep track of the worker tasks. Maybe that’s the responsibility of a worker pool?

What kind of mechanisms are in use out there? I don’t find any clues on how to handle this problem in the examples in the docs. Perhaps I am only imagining that this is a problem, and in practice this doesn’t bother anyone else?

I’ve got an IterableQUeue class I use this way. It uses a sentinel
object, and when the sentinel is obtained by get() the sentinel is put
back on the queue for the next getter. That’s synchronous (or threaded)
though.

For me, iteration hides this detail from the consumer - the “next”
operation does the get() and conceals the sentinel.

Is there any notion of treating await like an iterator’s “next” is a
pleasing way?

Cheers,
Cameron Simpson cs@cskk.id.au

I think the core issue is that the queue itself does not have any reference to its consumer (worker) threads or tasks, and thus it is impossible to guarantee synchronized termination of them but only can deliver the termination signal as sentinels or exceptions.

So the solution would be writing a wrapper abstraction to combine a queue and multiple consumers. Internally it may use TaskGroup for asyncio and Thread.join() for threading.

In Trio, we split our Queue equivalent into two separate endpoint objects, mostly so that we could express “sender closed its end” and “receiver closed its end” as two cleanly different concepts.

(Sender closed its end → receiver gets a special end-of-stream exception next time they read; receiver closed its end → sender gets a BrokenResourceError next time it tries to send, same as if you try to send on a socket that the other side has closed.)

2 Likes

I usually use None as the sentinel, and if there might be multiple workers, I have each worker put it back into the queue for the next worker to find. If None were ever a valid value, I’d make a special singleton for the sentinel.

I’ve never found a good way to do this. I typically either use None as a sentinel (and have to take care when I’m dealing with multiple workers, or cases where None might be a legitimate queue item) or I redesign my code to use a worker pool (and hence make it somebody else’s problem, but I’m not sure that always works as well as I hope).

I would love a Queue.shutdown() method that made this type of pattern more robust.

  1. queue.get() could raise some kind of “no more items” exception.
  2. Instead of supplying a fixed sentinel, get() could return a user-supplied sentinel value, in the same way that dict.get(key, value) does. So next_value = queue.get(my_sentinel). Since my_sentinel can be some unique object chosen by the caller, it can be something other than None, if None would be a legitimate queue entry.

The problem is that there being nothing to get doesn’t work. That’s normal and get just waits. But in conjunction with some sort of shutdown method that might work.

Based on the feedback it looks like there is an appetite for a better API around this. Let’s try to design one that works both for asyncio.Queue (coroutine-based) and for queue.Queue (thread-based).

For the reader end, it seems that a decent API would be to have a shutdown() method that sets a flag that causes get() to raise instead of block if there are no items left. The shutdown() itself would return instantaneous, if you want to wait until all queue items have actually been processed, you should use asyncio.Queue.join() or queue.Queue.join() (those require that the workers call q.task_done()).

This mechanism would make it a cinch to implement an iterator that iterates over the queue elements (even supporting multiple readers). To answer Cameron’s question, in the asyncio case this would have to be an async iterator, i.e. implement __aiter__() and __anext__(), which can be iterated over using async for item in q: ....

I have a hunch that it’s less common to want to tell the writer to stop putting things into a queue (certainly there’s no corresponding straightforward iterator pattern here, and there’s no task_done() / join() API) but for symmetry we should probably make it so that after shutdown() is called any put() calls will also raise an exception.

Is anyone interested in taking this on as a project?

I think my preference would be for the method to be called close() rather than shutdown().

That’s a good thing to bikeshed on. :slight_smile:

I think I slightly prefer shutdown() because close() sounds final – most objects (e.g. files) immediately stop being functional once close() returns, whereas here we definitely want the readers to continue getting items from the queue after it is shut down – get() will only return an exception once the queue is actually empty.

It would be different if we had a separate “reader” and “writer” end – then closing the writer would allow the reader to drain remaining data from the queue. (Like the Trio design Nathaniel alluded to.) But we don’t – we just have a single Queue object that has all methods for both sides.

2 Likes

By Guido van Rossum via Discussions on Python.org at 24Aug2022 21:08:

This mechanism would make it a cinch to implement an iterator that
iterates over the queue elements (even supporting multiple readers). To
answer Cameron’s question, in the asyncio case this would have to be
an async iterator, i.e. implement __aiter__() and __anext__(),
which can be iterated over using async for item in q: ....

Thank you. I have yet to get to griops with the async stuff.

I have a hunch that it’s less common to want to tell the writer to stop putting things into a queue (certainly there’s no corresponding straightforward iterator pattern here, and there’s no task_done() / join() API) but for symmetry we should probably make it so that after shutdown() is called any put() calls will also raise an exception.

I actually do that, with a close() method on my iterable queues; I
want to ensure that my control logic is correct by preventing further
put() calls. Not so much “informing” the writer as catching when the
writer is mistaken about being able to queue more data by raising a
ClosedError. So I’ve very +1 for .shutdown() (however spelled)
preventing further put().

Cheers,
Cameron Simpson cs@cskk.id.au

It’s true what you say about close(), but I dislike shutdown() for a similar reason. Windows PCs are ‘shutdown’, for example.

How about finish()?

Hm. shutdown() is a somewhat common function name for this kind of operation – it’s used for example by concurrent.futures.Executor, and there’s also a shutdown() socket method which is used to shut down one or both ends of a connection – named after the syscall of the same name.

I guess it’s unfortunate that in other context “shutdown” refers to the orderly shutting down of a computer.

3 Likes

Please add an optional boolean parameter ‘now’ (default: False) and purge the queue when it is set to True.

I have a hunch that it’s less common to want to tell the writer to stop putting things into a queue (certainly there’s no corresponding straightforward iterator pattern here, and there’s no task_done() / join() API) but for symmetry we should probably make it so that after shutdown() is called any put() calls will also raise an exception.

Yes, please. I have a queue with multiple writers where one of the readers must signal the end of the day has arrived.

Just yesterday I made use of AnyIO’s memory object streams for this purpose. They mirror Trio’s Channel design. Works splendidly for my use cases:

import asyncio

from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from anyio import create_memory_object_stream, create_task_group


async def consumer(receive: MemoryObjectReceiveStream[str]) -> None:
    async with receive:
        async for item in receive:
            print("Consumed", item)


async def producer(send: MemoryObjectSendStream[str]) -> None:
    async with send:
        for i in range(100):
            await send.send(f"Item {i}")


async def main() -> None:
    async with create_task_group() as tg:
        # Create a stream with a buffer of 20 items, and indicate "str" as type for
        # static type checkers
        send, receive = create_memory_object_stream(20, str)
        tg.start_soon(consumer, receive)
        tg.start_soon(producer, send)

asyncio.run(main())

I don’t have multiple writers in my use cases. If I did, closing would happen only when all of the writers were closed (streams can be cloned).