Expose unfinished tasks and shutdown status in asyncio.Queue

When working with queue = asyncio.Queue(), queue.qsize() returns the number of items still waiting in the queue. However, there is no public way to find out how many items have been taken off the queue but are still being processed. The queue tracks unfinished items internally in queue._unfinished_tasks, but that attribute is not intended for consumers, since it has a leading underscore. The same applies to queue._is_shutdown.

It would be nice to expose these in the public-facing API.


What is the use-case for exposing those as public attributes?

Consider a case where a task, report(), needs to run for as long as items from the queue are still being processed.

import random
import asyncio

q = asyncio.Queue()
stats = {"remaining": 0, "processing": []}


async def report():
    # ideally run until all items
    # have finished processing
    # q._unfinished_tasks > 0
    while q.qsize() > 0:
        stats["remaining"] = q.qsize()
        print(stats)
        await asyncio.sleep(0.5)


async def task():
    while q.qsize() > 0:
        x = await q.get()
        stats["processing"].append(x)
        await asyncio.sleep(random.random())
        stats["processing"].remove(x)
        q.task_done()


async def main():
    for x in range(10):
        q.put_nowait(x)

    await asyncio.gather(report(), task(), task())


if __name__ == "__main__":
    asyncio.run(main())

The script never prints remaining: 0, because report() exits as soon as q.qsize() == 0, even though items are still being processed at that point. There is no easy way to know when all items have finished processing.

{'remaining': 10, 'processing': []}
{'remaining': 8, 'processing': [0, 1]}
{'remaining': 5, 'processing': [2, 4]}
{'remaining': 5, 'processing': [2, 4]}
{'remaining': 2, 'processing': [5, 7]}

In the main coroutine, once gather returns, both q.qsize() and q._unfinished_tasks are 0.

async def main():
    for x in range(10):
        q.put_nowait(x)

    await asyncio.gather(report(), task(), task())
    print(f'{q.qsize() = } / {q._unfinished_tasks = }')
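
For comparison, here is a rough sketch that relies only on the public API. It assumes it is acceptable to cancel the reporter and the workers once the work is done: q.join() returns only after task_done() has been called for every item that was put. The names worker, snapshots and processing are made up for this example, and the sleep times are shortened so it finishes quickly.

```python
import asyncio
import random


async def worker(q, processing):
    # Runs until cancelled; q.get() simply waits while the queue is empty.
    while True:
        x = await q.get()
        processing.append(x)
        await asyncio.sleep(random.random() / 10)
        processing.remove(x)
        q.task_done()


async def report(q, processing, snapshots):
    # Runs until cancelled by main() below.
    while True:
        snapshots.append({"remaining": q.qsize(), "processing": list(processing)})
        await asyncio.sleep(0.05)


async def main():
    q = asyncio.Queue()
    processing, snapshots = [], []
    for x in range(10):
        q.put_nowait(x)

    tasks = [
        asyncio.create_task(report(q, processing, snapshots)),
        asyncio.create_task(worker(q, processing)),
        asyncio.create_task(worker(q, processing)),
    ]
    # Returns once task_done() has been called for every item.
    await q.join()
    for t in tasks:
        t.cancel()
    # Collect the cancellations so nothing is left pending.
    await asyncio.gather(*tasks, return_exceptions=True)

    final = {"remaining": q.qsize(), "processing": list(processing)}
    return snapshots, final


if __name__ == "__main__":
    snapshots, final = asyncio.run(main())
    print(final)
```

This does not expose the unfinished count itself; it only signals the moment when that count reaches zero.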

If you need to modify certain behaviors or add new attributes, perhaps you can create your own queue by inheriting from asyncio.Queue.
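
A minimal sketch of what such a subclass might look like. ReportingQueue, unfinished_tasks and in_progress are hypothetical names, not part of asyncio, and the properties still read the private _unfinished_tasks attribute, so this only confines the dependency on internals to one place.

```python
import asyncio


class ReportingQueue(asyncio.Queue):
    """Hypothetical subclass; names are illustrative, not part of asyncio."""

    @property
    def unfinished_tasks(self):
        # Assumption: relies on the private attribute _unfinished_tasks,
        # which counts items that were put but not yet marked task_done().
        return self._unfinished_tasks

    @property
    def in_progress(self):
        # Items already taken with get() but not yet marked done.
        return self._unfinished_tasks - self.qsize()


async def demo():
    q = ReportingQueue()
    for x in range(3):
        q.put_nowait(x)

    await q.get()  # one item is now "being processed"
    before = (q.qsize(), q.unfinished_tasks, q.in_progress)

    q.task_done()  # mark that item as finished
    after = (q.qsize(), q.unfinished_tasks, q.in_progress)
    return before, after


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With something like this, report() could loop on q.unfinished_tasks > 0 instead of q.qsize() > 0.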

Thank you for your suggestion, but I think the crux of this post is to not use attributes intended for internal use. If I were to use q._unfinished_tasks I could just replace the task() function.

async def task():
    while q._unfinished_tasks > 0:
        ...

If there are no plans to expose the internal attributes publicly, inheriting from asyncio.Queue would be an alternative, as you suggested.

My reply about inheriting concerned just this part