Queue termination

Yeah, implementing this pattern on top of a Queue is what occasionally makes me think of this problem. I think the API I proposed (shutdown() making put() fail immediately and get() raise once the queue is drained) would allow implementing this easily.

See also Add a cancel method to asyncio Queues · Issue #81515 · python/cpython · GitHub

Is anyone here actually interested in submitting a PR? What would it take to get a PR from e.g. @EpicWink or @achimnol ?

Let me understand the desired API by fully specifying the proposal. We add a new method to queue.Queue, multiprocessing.Queue and asyncio.Queue, with signature:

def shutdown(self, immediate: bool = False) -> None

where:

  • not immediate: future put calls raise queue.QueueShutdown (subclass of Exception), unblocking any blocked callers of put. Once the queue is empty, this exception is also raised on get (also unblocking)
  • immediate: the same, except no waiting for the queue to empty
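To make those semantics concrete, here is a rough single-file emulation of the spec on top of today’s queue.Queue. The QueueShutdown name comes from the proposal above; the implementation leans on Queue internals (mutex, not_empty, not_full, _get, _put) and skips maxsize handling, so it is a sketch of the behavior, not the eventual PR:

```python
import queue


class QueueShutdown(Exception):
    """Exception name taken from the proposal above."""


class ShutdownQueue(queue.Queue):
    """Rough emulation of the proposed shutdown() semantics.

    Relies on queue.Queue internals, so this is a sketch only.
    """

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self._is_shutdown = False

    def shutdown(self, immediate: bool = False) -> None:
        with self.mutex:
            self._is_shutdown = True
            if immediate:
                self.queue.clear()
            # Wake every blocked producer and consumer so they can
            # re-check the shutdown flag and raise.
            self.not_empty.notify_all()
            self.not_full.notify_all()

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self._is_shutdown:
                raise QueueShutdown
            # Simplified: ignores maxsize blocking for brevity.
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        with self.not_empty:
            while not self._qsize():
                if self._is_shutdown:
                    # Queue drained and shut down: unblock the consumer.
                    raise QueueShutdown
                if not block:
                    raise queue.Empty
                self.not_empty.wait(timeout)
            item = self._get()
            self.not_full.notify()
            return item
```

With the default immediate=False, pending items can still be drained with get(), and only then does get() start raising; with immediate=True the buffered items are discarded as well.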

What’s the use case for immediate=True? My thought had been to just support the not immediate version.

An exception (especially a cancellation or keyboard interrupt) occurs and the programmer wants to stop all execution (both producers and consumers), especially when using threads.

Ah, that makes sense for the (threaded) queue.Queue class. Perhaps less so for asyncio.Queue where, at least if you’re using task groups (I know, 3.11), cancelling the parent task will cancel the child tasks. For threads there’s no such mechanism. (Maybe there should be, but that’s a much bigger design problem – I don’t think we currently can throw exceptions into threads from the outside.)

UPDATE: So yes, go ahead and implement that API for all variations.

I’ll make a PR over the next couple of days

I’ve run into this as well, looking forward to this landing!

While we’re still brainstorming finalizer names, I think something like complete() would be more meaningful.

So personally, what I’ve been using of late is the dispatch pattern (I have no idea if that’s the official name; I think it’s called pub/sub in some cases). Basically, we have some sort of PubSub queue that inherits from the Queue class and lets each worker register a done_callback. When the queue has no more items and the producer calls the queue’s shutdown() method, the queue invokes the done_callbacks to close the workers. For example:

from collections.abc import Callable
from asyncio import Queue

class PubSubQueue(Queue):
    # this code has not been tested, so I'm not sure if it works
    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self.done_callbacks: list[Callable[[], None]] = []

    def add_done_callback(self, callback: Callable[[], None]) -> None:
        self.done_callbacks.append(callback)
        # consider callbacks also being coroutines

    def shutdown(self) -> None:
        for callback in self.done_callbacks:
            callback()
        # consider also adding queue finalizer functionality here
That code does not work as-is, I’m sure, but it represents my workaround for the issue you raised. In this case I assume that workers are instances of some Worker class that implements a method to terminate the worker, not just a plain function. If it’s a coroutine, perhaps you could call its close() method.
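For illustration, here is a runnable usage sketch of that callback idea (the class is restated so the example is self-contained, and the worker names are made up): each worker registers its own cancellation as the done callback, so shutting down the queue cancels every consumer bound to it.

```python
import asyncio
from asyncio import Queue
from collections.abc import Callable


class PubSubQueue(Queue):
    """Self-contained restatement of the sketch above."""

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self.done_callbacks: list[Callable[[], None]] = []

    def add_done_callback(self, callback: Callable[[], None]) -> None:
        self.done_callbacks.append(callback)

    def shutdown(self) -> None:
        for callback in self.done_callbacks:
            callback()


async def worker(name: str, q: PubSubQueue, results: list) -> None:
    # Register a callback that cancels this worker when the queue shuts down.
    q.add_done_callback(asyncio.current_task().cancel)
    try:
        while True:
            results.append((name, await q.get()))
    except asyncio.CancelledError:
        pass  # queue shut down; exit cleanly


async def main() -> list:
    results: list = []
    q = PubSubQueue()
    tasks = [asyncio.create_task(worker(f"w{i}", q, results)) for i in range(2)]
    for i in range(4):
        await q.put(i)
    await asyncio.sleep(0)  # give the workers a chance to drain the queue
    q.shutdown()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


print(asyncio.run(main()))
```

Note the sleep(0) hand-off: without some scheduling point, shutdown() could cancel the workers before they ever run, which is exactly the kind of edge this pattern has to worry about.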

Interesting, I would have thought that a callback to indicate to the clients that the queue is shutting down would be fairly inconvenient. E.g. what to do if a client is already blocked in a get() on a queue that is shutting down? It sounds like you would still have to put a None into the queue for each client.

I haven’t encountered that yet, but the callback that the client provides is actually a function that shuts down the client itself, not one that notifies the client that the queue is closing. So the queue makes sure that all the clients bound to it are closed before it closes itself. If a client were waiting on a get(), that call would be aborted and the client closed. That’s how I’d expect it to work.

Yeah, if you just terminate all the clients you don’t have to worry about this. I guess the existing API encourages this way of thinking about it: each client sits in a loop calling q.get() and q.task_done(), and when the producer is done it calls q.join() to wait for all the work to be completed, and at that point it can kill the clients.
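That loop-plus-join() pattern, sketched as a minimal threaded example (not taken from the thread):

```python
import queue
import threading

q: queue.Queue = queue.Queue()
results: list[int] = []


def worker() -> None:
    while True:
        item = q.get()
        results.append(item * 2)  # "process" the item
        q.task_done()             # mark processing complete


threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()  # blocks until every put item has been task_done()'d
# At this point it's safe to "kill" the client; as a daemon thread
# it simply dies when the process exits.
print(sorted(results))  # [0, 2, 4, 6, 8]
```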

This doesn’t work so great if you’re trying to write the client using for x in <some_wrapper(q)> though.

I’m wondering if work on this API has begun.

It has, I’ve just got bogged down with work and life unexpectedly over the last couple of weeks. There’s a GitHub issue and an out-of-date draft PR.

Right now, I’m working on two problems: how to have failing tests not affect future tests, and coming up with all the possible scenarios and edge cases to test on.

When using a queue to buffer and pass values between async tasks, it’s IMHO not nice having to call queue.task_done() to indicate that the value has been consumed (which it has, since it was returned by the queue.get() call). The consumers run until the app shuts down, at which point they are cancelled through task.cancel(). So the task_done() calls really make no sense.

It would be really nice if there would be a SimpleQueue and then a TaskQueue which implements the additional tracking.

Calling task_done() is entirely optional. You just can’t use join(). I don’t think we’re going to add a second mechanism to track tasks, but we are planning a way to shut down a Queue so that all clients calling get() will receive an exception.

The reasoning is that you call queue.get() to get the item, then process it, then call queue.task_done() to signal that you’ve finished processing it, so queue.get() doesn’t mean that the item has been consumed, but that the item is being consumed.
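To make that distinction concrete, here’s a tiny demonstration. Note that unfinished_tasks is an implementation attribute of queue.Queue, not documented API; it’s used here only to show what join() would see:

```python
import queue

q = queue.Queue()
q.put("job")

item = q.get()
# The item has been *handed out*, but processing isn't finished yet:
# join() would still block at this point.
assert q.unfinished_tasks == 1

q.task_done()  # now the item counts as fully consumed
assert q.unfinished_tasks == 0

q.join()  # returns immediately: all work accounted for
```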

That’s not what I was suggesting and I agree with you that it’s not necessary.

My proposal is that there be an async Queue implementation without tracking, and an implementation with tracking that inherits from the first. That way there would be little additional code and maintenance, and it would align with the queue module, which implements both SimpleQueue and Queue.
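A rough sketch of that split, with hypothetical names and built on asyncio primitives rather than the stdlib classes, just to show that tracking layers cleanly on top of an untracked base:

```python
import asyncio
import collections


class SimpleAsyncQueue:
    """Hypothetical untracked FIFO (no task_done()/join())."""

    def __init__(self) -> None:
        self._items = collections.deque()
        self._not_empty = asyncio.Condition()

    async def put(self, item) -> None:
        async with self._not_empty:
            self._items.append(item)
            self._not_empty.notify()

    async def get(self):
        async with self._not_empty:
            while not self._items:
                await self._not_empty.wait()
            return self._items.popleft()


class TaskTrackingQueue(SimpleAsyncQueue):
    """Layers task_done()/join() on top of the untracked base."""

    def __init__(self) -> None:
        super().__init__()
        self._unfinished = 0
        self._all_done = asyncio.Event()
        self._all_done.set()  # an empty queue has no outstanding work

    async def put(self, item) -> None:
        self._unfinished += 1
        self._all_done.clear()
        await super().put(item)

    def task_done(self) -> None:
        self._unfinished -= 1
        if self._unfinished == 0:
            self._all_done.set()

    async def join(self) -> None:
        await self._all_done.wait()


async def demo() -> int:
    q = TaskTrackingQueue()
    await q.put(41)
    value = await q.get()
    q.task_done()
    await q.join()  # returns immediately: all work accounted for
    return value + 1


print(asyncio.run(demo()))  # 42
```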

I’m aware that calling task_done() is optional, but sometimes users get to see a traceback, and it’s very confusing if there is a Queue with 254456364 pending tasks when in reality it’s empty (the task count comes from the __repr__).
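That repr confusion is easy to reproduce: drain an asyncio.Queue with get() without ever calling task_done(), and the repr keeps reporting pending tasks even though the queue is empty:

```python
import asyncio


async def main() -> str:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
        q.get_nowait()  # item consumed, but task_done() never called
    assert q.empty()
    return repr(q)


print(asyncio.run(main()))  # e.g. <Queue at 0x... maxsize=0 tasks=3>
```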