This is a follow-on of Queue termination after implementation brought to light some aspects not considered during the design.
@guido brought up the following points regarding design decisions to be made:
What should full() and empty() return when the queue is shut down?
Do we need a new inquiry method (is_alive()?) to tell whether a queue has been shut down?
When the queue is shut down and empty, should get_nowait() raise QueueShutDown or QueueEmpty?
How do task_done() and join() interact with shutting down?
In addition, Guido asked about the necessity of immediate=True (argument to queue shutdown), and you see my comments in the original Discourse topic, but I argue the benefit outweighs the implementation compexity.
Right now the PRs are in a good state with comprehensive testing (although I’m still not convinced my gist has a bug regarding multiprocessing queue immediate shutdown), so see them for the current decisions made for the design, but in brief:
full and empty haven’t had any changes
There is no is_alive (or is_shutdown, or get_shutdown_state) method
get and get_nowait raise when queue is empty and has been shutdown
task_done and join raise when the queue is shutdown immediately, and don’t when shutdown eventually
Sorry for being slow. This is definitely going to target 3.13, so I am focusing on things that need my attention more immediately. But thanks for taking my questions seriously, and we will get through this!
I like this. Maybe this suggests a different name for immediate? E.g. drain=True or flush=True or discard=True. The “immediate” term doesn’t really convey the meaning (perhaps), plus the English half of my brain wants to change it to immediately=True.
I think so. flush feels more idiomatic to me, but discard better informs the user of the data loss. I’ll rename the parameter in the consuming implementation later today. Any further arguments for other names will likely change my mind.
Yeah, the __aiter__ version looks good at first blush.
Processing items from a queue using this feels much better than the ol’
while True:
item = await queue.get()
if item is sentinel:
break
(or I guess the modern except ShutDown: now)
… with the unfortunate side-effect of not being able to stick a timeout on the queue.get() operation. But I feel like this is a broader problem with async iterators, and ideally would be solved by an independent helper function which is out of scope here in any case, so never mind that now.
You could execute it inside async with asyncio.timeout(10) as to: and then keep calling to.reschedule(asyncio.get_running_loop().time() + 10). But you’re quickly entering the area where the cure is worse than the disease. And why not assume that either a new item will eventually appear in the queue, or the queue will be shut down?
@guido if we like the async iterator idea (and it sounds like we do), I’d be happy to put together a PR for it (including tests and docs). Just ping me when the termination stuff gets merged.