Yeah, implementing this pattern on top of a Queue is what occasionally makes me think of this problem. I think the API I proposed (shutdown() making put() fail immediately and get() raise once the queue is drained) would allow implementing this easily.
Is anyone here actually interested in submitting a PR? What would it take to get a PR from e.g. @EpicWink or @achimnol ?
Let me understand the desired API by fully specifying the proposal. We add a new method to queue.Queue, multiprocessing.Queue and asyncio.Queue, with signature:
def shutdown(self, immediate: bool = False) -> None
where:
- not immediate: future put() calls raise queue.QueueShutdown (a subclass of Exception), unblocking any blocked callers of put(). Once the queue is empty, this exception is also raised on get() (also unblocking)
- immediate: the same, except no waiting for the queue to empty (a rough sketch of both behaviours follows below)
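To make those semantics concrete, here is a minimal sketch on top of queue.Queue. The QueueShutdown name comes from the proposal above; the ShutdownQueue class, its reliance on queue.Queue internals, and its simplified timeout handling are only for illustration, not a real implementation:

```python
import queue


class QueueShutdown(Exception):
    """Raised by put() right away, and by get() once the queue is drained."""


class ShutdownQueue(queue.Queue):
    """Illustrative only: leans on queue.Queue internals (mutex, not_empty,
    not_full) and skips the timeout bookkeeping the real class does."""

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self._is_shutdown = False

    def shutdown(self, immediate: bool = False) -> None:
        with self.mutex:
            self._is_shutdown = True
            if immediate:
                self.queue.clear()           # drop pending items
            self.not_empty.notify_all()      # wake blocked get() callers
            self.not_full.notify_all()       # wake blocked put() callers

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self._is_shutdown:
                raise QueueShutdown
            while 0 < self.maxsize <= self._qsize():
                if not block:
                    raise queue.Full
                self.not_full.wait(timeout)
                if self._is_shutdown:
                    raise QueueShutdown
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        with self.not_empty:
            while not self._qsize():
                if self._is_shutdown:        # only raises once the queue is empty
                    raise QueueShutdown
                if not block:
                    raise queue.Empty
                self.not_empty.wait(timeout)
            item = self._get()
            self.not_full.notify()
            return item
```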
What's the use case for immediate=True? My thought had been to just support the not immediate version.
An exception (especially cancellation / keyboard interrupt) occurs and the programmer wants to stop all execution (both producers and consumers), especially when using threads
Ah, that makes sense for the (threaded) queue.Queue class. Perhaps less for asyncio.Queue, where, at least if you're using task groups (I know, 3.11), cancelling the parent task will cancel child tasks. For threads there's no such mechanism. (Maybe there should be, but that's a much bigger design problem: I don't think we currently can throw exceptions into threads from the outside.)
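For illustration, here is a small, self-contained example of that task-group cancellation behaviour (Python 3.11+; the function and task names are made up):

```python
import asyncio


async def child(name: str) -> None:
    try:
        await asyncio.sleep(3600)             # stand-in for real work
    except asyncio.CancelledError:
        print(f"{name} cancelled")
        raise                                 # always re-raise CancelledError


async def parent() -> None:
    async with asyncio.TaskGroup() as tg:     # Python 3.11+
        tg.create_task(child("child-1"))
        tg.create_task(child("child-2"))


async def main() -> None:
    task = asyncio.create_task(parent())
    await asyncio.sleep(0.1)                  # let the children start
    task.cancel()                             # cancelling the parent ...
    try:
        await task
    except asyncio.CancelledError:
        print("parent cancelled")             # ... cancels both children too


asyncio.run(main())
```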
UPDATE: So yes, go ahead and implement that API for all variations.
I'll make a PR over the next couple of days
I've run into this as well, looking forward to this landing!
While we're still brainstorming finalizer names, I think using something like complete() would be more meaningful.
So personally what I've been using of late is the dispatch pattern (I have no idea if that's the official name; I think it's called pubsub in some cases). Basically we have some sort of PubSub queue that inherits from the Queue class and lets each worker register a done_callback. When the queue has no more items and the producer calls the queue's shutdown method, the queue calls the done_callbacks to close the workers. For example:
```python
from collections.abc import Callable
from asyncio import Queue


class PubSubQueue(Queue):
    # this code has not been tested
    # so I'm not sure if it works
    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self.done_callbacks: list[Callable[[], None]] = []

    def add_done_callback(self, callback: Callable[[], None]) -> None:
        self.done_callbacks.append(callback)

    # consider callbacks also being coroutines
    def shutdown(self) -> None:
        for callback in self.done_callbacks:
            callback()
        # consider also adding queue finalizer functionality here
```
That code does not work I'm sure, but it represents my work-arounds for the issue you raised. In this case I assume that workers are instances of some Worker class that implements a method to terminate the worker, not just a plain function. If it's a coroutine, perhaps you could call its close method.
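A rough usage sketch of that idea, using the PubSubQueue class above (the Worker class and its close() method are hypothetical, and the callbacks here are plain functions rather than coroutines):

```python
import asyncio


class Worker:
    """Hypothetical worker that consumes items until it is closed."""

    def __init__(self, name: str, queue: PubSubQueue) -> None:
        self.name = name
        self.queue = queue
        self.task = asyncio.create_task(self.run())
        queue.add_done_callback(self.close)    # queue closes us on shutdown

    async def run(self) -> None:
        while True:
            item = await self.queue.get()
            print(self.name, "got", item)

    def close(self) -> None:
        self.task.cancel()                     # aborts any pending get()


async def main() -> None:
    q = PubSubQueue()
    # keep references so the worker tasks aren't garbage collected
    workers = [Worker(f"worker-{i}", q) for i in range(2)]
    for item in range(5):
        await q.put(item)
    await asyncio.sleep(0.1)   # let the workers drain the queue
    q.shutdown()               # runs every done_callback, cancelling the workers


asyncio.run(main())
```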
Interesting, I would have thought that a callback to indicate to the clients that the queue is shutting down would be fairly inconvenient. E.g. what to do if a client is already blocked in a get() on a queue that is shutting down? It sounds like you would still have to put a None into the queue for each client.
I haven't encountered that yet, but the callback that the client provides is actually a function that shuts down the client itself; it doesn't notify the client that the queue is closing. So the queue makes sure that all the clients bound to it are first closed before it closes itself. If the client were waiting on a queue get, that action would be aborted and the client closed. That's what I'd think.
Yeah, if you just terminate all the clients you don't have to worry about this. I guess the existing API encourages this way of thinking about it: each client sits in a loop calling q.get() and q.task_done(), and when the producer is done it calls q.join() to wait for all the work to be completed, and at that point it can kill the clients.
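For reference, that loop is essentially the standard queue.Queue worker recipe (the worker body and item values here are just placeholders):

```python
import queue
import threading

q: queue.Queue = queue.Queue()


def worker() -> None:
    while True:
        item = q.get()
        try:
            print("processing", item)   # stand-in for real work
        finally:
            q.task_done()


# daemon threads, so they are killed when the main thread exits
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for item in range(10):
    q.put(item)

q.join()    # blocks until task_done() has been called once per item
```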
This doesn't work so great if you're trying to write the client using for x in <some_wrapper(q)> though.
I'm wondering if work on this API has begun.
It has; I've just got bogged down with work and life unexpectedly over the last couple of weeks. There's a GitHub issue and an out-of-date draft PR.
Right now, I'm working on two problems: how to keep failing tests from affecting future tests, and how to come up with all the possible scenarios and edge cases to test.
When using a queue to buffer and pass values between async tasks, it's imho not nice having to call queue.task_done() to indicate that the value has been consumed (which it has, since it is returned by the queue.get() call). The consumers run until the app shuts down, and then the consumers are canceled through a task.cancel(). So the task_done() calls really make no sense.
It would be really nice if there were a SimpleQueue and then a TaskQueue which implements the additional tracking.
Calling task_done() is entirely optional. You just can't use join(). I don't think we're going to add a second mechanism to track tasks, but we are planning a way to shut down a Queue so that all clients calling get() will receive an exception.
The reasoning is that you call queue.get() to get the item, then process it, then call queue.task_done() to signal that you've finished processing it, so queue.get() doesn't mean that the item has been consumed, but that the item is being consumed.
That's not what I was suggesting, and I agree with you that it's not necessary.
My proposal is that there be an async Queue implementation without tracking, and an implementation with tracking that inherits from the first. That way it would mean no additional code and maintenance, and it would align with the queue module, which implements SimpleQueue and Queue.
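A rough sketch of what that split might look like for asyncio. The PlainQueue and TrackedQueue names are made up, the untracked class just wraps asyncio.Queue for brevity, and the subclass only mirrors the kind of bookkeeping asyncio.Queue already does; it is not a drop-in implementation:

```python
import asyncio
from typing import Any


class PlainQueue:
    """Hypothetical untracked queue: just put/get, no task_done()/join()."""

    def __init__(self) -> None:
        self._items: asyncio.Queue = asyncio.Queue()   # reuse for brevity

    async def put(self, item: Any) -> None:
        await self._items.put(item)

    async def get(self) -> Any:
        return await self._items.get()


class TrackedQueue(PlainQueue):
    """Adds the task_done()/join() bookkeeping on top of the plain queue."""

    def __init__(self) -> None:
        super().__init__()
        self._unfinished = 0
        self._all_done = asyncio.Event()
        self._all_done.set()                  # nothing outstanding yet

    async def put(self, item: Any) -> None:
        self._unfinished += 1
        self._all_done.clear()
        await super().put(item)

    def task_done(self) -> None:
        if self._unfinished <= 0:
            raise ValueError("task_done() called too many times")
        self._unfinished -= 1
        if self._unfinished == 0:
            self._all_done.set()

    async def join(self) -> None:
        await self._all_done.wait()
```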
I'm aware that calling task_done() is optional, but sometimes the users get to see a traceback and it's very confusing if there is a Queue with 254456364 pending tasks when in reality it's empty (the task count comes from the __repr__).