asyncio.Queue.task_done() is not bound to a particular object or `Queue.get` call, so queue.join() can return with unprocessed objects still in the queue

I have a question about asyncio.Queue. The documentation has an example with multiple workers, which use the task_done() call to inform the queue that a task is finished.

I’ve played with this example a little and found the task_done() method ambiguous.
It is not tied to a particular object or Queue.get() call.
It can even be called without any queue.get() at all.
And, importantly, enough calls to it will release the queue.join() barrier even while the queue still has objects to process.
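
A minimal snippet illustrates this (the only safeguard I can see is that, at least on current CPython, task_done() raises ValueError once it is called more times than items were ever put):

import asyncio

async def demo():
    queue = asyncio.Queue()
    queue.put_nowait('item')

    # Accepted without any queue.get(): the unfinished-task counter
    # simply drops from 1 to 0.
    queue.task_done()

    # Only now does the queue complain, because the counter would go
    # negative.
    try:
        queue.task_done()
    except ValueError as exc:
        print(exc)  # task_done() called too many times

asyncio.run(demo())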

Here is my modified code:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue and process it.
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        # Notify the queue that the item has been processed.
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def bad_worker(name, queue: asyncio.Queue):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        # Decrement the unfinished-task counter without ever calling
        # queue.get().
        queue.task_done()
        print(f"{i}. Bad worker released task without even getting it. Tasks in queue: {queue.qsize()}.")

async def main():
    queue = asyncio.Queue()
    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    tasks.append(asyncio.create_task(bad_worker('bad_worker', queue)))

    started_at = time.monotonic()
    # join() returns once the unfinished-task counter hits zero,
    # regardless of how many items are actually left in the queue.
    await queue.join()
    print(f'AFTER queue.join() WE HAVE {queue.qsize()} TASKS IN QUEUE.')
    total_slept_for = time.monotonic() - started_at

    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

Here you can find the bad_worker function, which never takes a task from the queue but still decrements the
unfinished-task count with queue.task_done().
After 20 task_done() calls in total, from the good workers and the bad worker combined, we pass through queue.join() even though the queue is not empty.

Is it intended behavior?


There are two things here that I don’t like. Maybe I just don’t understand all the circumstances.

  1. Queue.task_done() is not tied to a particular object or Queue.get() call.
    So if somewhere in my code I need to notify the queue with .task_done(), I cannot be sure that I am reporting the object I got with Queue.get(). Some other coroutine may already have notified the queue while the current one was awaiting. (A disciplined pattern that keeps the two paired is sketched after this list.)

  2. Queue.join() does not care about the number of items in the queue. In the code above you can see that I call Queue.qsize() after await queue.join(), and it returns a non-zero value.
    I would expect a check of qsize() here.
    Why does the Queue class watch an internal counter of task_done() calls instead of the actual number of items in the queue?
    If task_done() is ever called by mistake somewhere in your code, you will still pass through queue.join() with unprocessed items left in the queue. I don’t like this behaviour.
    It would be nice to have at least some warning in such cases.
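
The discipline that avoids problem 1 is a fixed pairing of get() and task_done(). One common shape for it (a sketch only — process() is a hypothetical coroutine, nothing the Queue API enforces) is a try/finally:

async def disciplined_worker(name, queue):
    while True:
        item = await queue.get()
        try:
            await process(item)  # hypothetical processing coroutine
        finally:
            # Exactly one task_done() per get(), even if processing raises.
            queue.task_done()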

Yeah, that’s how it’s designed. You just need to have some discipline in your code. If that’s not what you want, you can always wrap the queue in a different mechanism that you design yourself.
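
For example, a wrapper could hand back a one-shot done() callback from get(), so a stray or duplicate completion fails loudly at the call site instead of silently corrupting the shared counter. Purely a sketch (CheckedQueue is an illustrative name, not an existing API):

import asyncio

class CheckedQueue:
    # Sketch: get() returns (item, done); done() may be called exactly once.
    def __init__(self):
        self._queue = asyncio.Queue()

    def put_nowait(self, item):
        self._queue.put_nowait(item)

    async def get(self):
        item = await self._queue.get()
        called = False

        def done():
            nonlocal called
            if called:
                raise RuntimeError('done() already called for this item')
            called = True
            self._queue.task_done()

        return item, done

    async def join(self):
        await self._queue.join()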

OK, I accept the need for discipline on the developer’s side.
But would it be so bad for queue.join() to warn about a non-zero number of items left in the queue?
It could help a lot in debugging.
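
Even from user code, such a warning can be bolted on. A minimal sketch (WarningQueue is my own name, not anything in the stdlib):

import asyncio
import warnings

class WarningQueue(asyncio.Queue):
    async def join(self):
        await super().join()
        # The unfinished-task counter reached zero, but items can remain
        # if task_done() was over-called somewhere.
        if self.qsize() > 0:
            warnings.warn(
                f'queue.join() returned with {self.qsize()} item(s) still in the queue'
            )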