I have a question about asyncio.Queue. The documentation has an example with multiple workers, which call task_done() to inform the queue that a task is finished.
I’ve played around with this example and found the task_done() function ambiguous.
It is not tied to a particular object or Queue.get() call.
It can even be called without a preceding queue.get().
And, importantly, multiple calls to it can break through the queue.join() barrier even while the queue still has objects to process.
Here is my modified code:
import asyncio
import random
import time


async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def bad_worker(name, queue: asyncio.Queue):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        queue.task_done()
        print(f"{i}. Bad worker released task without even getting it. Tasks in queue: {queue.qsize()}.")


async def main():
    queue = asyncio.Queue()

    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    tasks.append(asyncio.create_task(bad_worker('bad_worker', queue)))

    started_at = time.monotonic()
    await queue.join()
    print(f'AFTER queue.join() WE HAVE {queue.qsize()} TASKS IN QUEUE.')
    total_slept_for = time.monotonic() - started_at

    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
Here you can see the function bad_worker, which never takes a task from the queue, but still decrements the task count via queue.task_done().
After 20 task_done() calls in total, from the good workers and the bad worker combined, we pass through queue.join().
Is it intended behavior?
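The counter-based behaviour can be reproduced in a few lines. This minimal sketch (plain stdlib asyncio, nothing else assumed) shows join() being released by bare task_done() calls with items still queued, and that the only guard is a ValueError once task_done() has been called more times than there were puts:

```python
import asyncio


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait("a")
    queue.put_nowait("b")

    # Decrement the unfinished-task counter twice without ever
    # calling queue.get(): join() is released immediately.
    queue.task_done()
    queue.task_done()
    await queue.join()
    print(queue.qsize())  # -> 2: both items are still in the queue

    # A third call exceeds the number of items put and raises.
    try:
        queue.task_done()
    except ValueError as e:
        print(e)  # task_done() called too many times


asyncio.run(demo())
```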
There are two things I don’t like here. Maybe I just don’t understand all the circumstances.

- Queue.task_done() is not related to a particular object or Queue.get() call. So if somewhere in the code I need to notify the queue with .task_done(), I cannot be sure I am notifying it about the object I got with Queue.get(). Some other function may have already notified the queue while the current function was awaiting.
- Queue.join() does not care about the number of items in the queue. In the code you can see that I call Queue.qsize() after await queue.join(), and it returns a non-zero value. I think there should be a check of qsize here.
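One workaround I can imagine for the first point is to bind the completion call to the item returned by get(), so it cannot be issued for an item you never received. This is a hypothetical TicketQueue sketch, not anything from the standard library:

```python
import asyncio


class TicketQueue(asyncio.Queue):
    """Hypothetical sketch: get() returns (item, done), and done()
    may be called at most once per retrieved item."""

    async def get(self):
        item = await super().get()
        called = False

        def done():
            nonlocal called
            if called:
                raise RuntimeError("done() already called for this item")
            called = True
            # Forward to the real counter decrement.
            super(TicketQueue, self).task_done()

        return item, done

    def task_done(self):
        # Forbid anonymous notifications entirely.
        raise RuntimeError("use the done() callback returned by get()")


async def demo():
    queue = TicketQueue()
    queue.put_nowait("job")
    item, done = await queue.get()
    done()              # tied to this particular get()
    await queue.join()  # completes normally


asyncio.run(demo())
```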
Why does the Queue class watch an internal counter of task_done() calls instead of the actual number of items in the queue?
If task_done() is called by mistake somewhere in your code, you will still pass through queue.join() with unprocessed items left in the queue. I don’t like this behaviour.
It would be nice to have at least some warning in such cases.
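For now, something like that warning can be bolted on from user code. This is only a sketch of a hypothetical CheckedQueue subclass I made up, not an existing API:

```python
import asyncio
import warnings


class CheckedQueue(asyncio.Queue):
    """Hypothetical sketch: warn if join() completes while items remain,
    which indicates task_done() was called too often."""

    async def join(self):
        await super().join()
        if self.qsize() > 0:
            warnings.warn(
                f"queue.join() finished but {self.qsize()} item(s) are "
                "still in the queue; task_done() was probably called "
                "too many times",
                RuntimeWarning,
            )


async def demo():
    queue = CheckedQueue()
    queue.put_nowait(1)
    queue.task_done()   # "bad worker": no matching get()
    await queue.join()  # emits a RuntimeWarning, item still queued


asyncio.run(demo())
```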