I have a question about asyncio.Queue. In the documentation there is an example with multiple workers, which use the task_done() call to inform the queue about finished tasks. I've played with this example a little and found the task_done() function ambiguous: it is not tied to a particular object or Queue.get() call, and it can even be called without any queue.get(). More importantly, extra calls to it can break the queue.join() barrier even while the queue still has items to process.
Here is my modified code:
import asyncio
import random
import time


async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def bad_worker(name, queue: asyncio.Queue):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        queue.task_done()
        print(f"{i}. Bad worker released task without even getting it. "
              f"Tasks in queue: {queue.qsize()}.")


async def main():
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    tasks.append(asyncio.create_task(bad_worker('bad_worker', queue)))

    started_at = time.monotonic()
    await queue.join()
    print(f'AFTER queue.join() WE HAVE {queue.qsize()} TASKS IN QUEUE.')
    total_slept_for = time.monotonic() - started_at

    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
Here you can see the function bad_worker, which never takes a task from the queue, yet still decrements the unfinished-task count via queue.task_done(). After 20 task_done() calls in total from the good workers and the bad worker combined, we pass through queue.join().
Is this intended behavior?
There are two things I don't like here; maybe I just don't understand all the circumstances.
- Queue.task_done() is not tied to a particular object or Queue.get() call. So if somewhere in my code I need to notify the queue with .task_done(), I cannot be sure I am notifying it about the object I got with Queue.get(); some other function may already have notified the queue while the current function was awaiting.
- Queue.join() does not care about the number of items in the queue. In the code above you can see that I call Queue.qsize() after await queue.join(), and it returns a non-zero value. I would expect a check of qsize here.
Why does the Queue class watch an internal counter of task_done() calls instead of the actual number of items in the queue? If task_done() is called by mistake somewhere in your code, you will still pass through queue.join() with unprocessed items left in the queue. I don't like this behaviour. It would be nice to have at least a warning in such cases.