When working with queue = asyncio.Queue(), queue.qsize() returns the number of items currently waiting to be processed. However, there is no public way to get the number of items currently being processed. The queue internally counts every item that has been put but not yet marked with task_done() (waiting plus in-flight) in queue._unfinished_tasks, but that attribute is not intended to be used by a consumer, since it starts with a leading underscore. The same applies to queue._is_shutdown.
It would be nice to expose these in the public-facing API.
Consider a case where there is a task, report(), which needs to run for the lifetime of the queue.
import asyncio
import random

q = asyncio.Queue()
stats = {"remaining": 0, "processing": []}


async def report():
    # ideally run until all items
    # have finished processing
    # q._unfinished_tasks > 0
    while q.qsize() > 0:
        stats["remaining"] = q.qsize()
        print(stats)
        await asyncio.sleep(0.5)


async def task():
    while q.qsize() > 0:
        x = await q.get()
        stats["processing"].append(x)
        await asyncio.sleep(random.random())
        stats["processing"].remove(x)
        q.task_done()


async def main():
    for x in range(10):
        q.put_nowait(x)
    await asyncio.gather(report(), task(), task())


if __name__ == "__main__":
    asyncio.run(main())
The script never prints remaining: 0 because report() stops as soon as q.qsize() reaches 0, even though items are still being processed at that point. There is no easy way to check, from the public API, whether all items have finished processing.
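For comparison, here is a rough sketch of a workaround that stays within the public API: wait on q.join() and signal the reporter through an asyncio.Event. The names worker, done and interval are my own for illustration, and the sleep times are shortened. It does make report() run until everything is finished, but it still cannot tell how many items are in flight without the separate stats bookkeeping.

```python
import asyncio
import random


async def worker(q, stats):
    # Runs until cancelled; q.get() simply waits while the queue is empty.
    while True:
        x = await q.get()
        stats["processing"].append(x)
        await asyncio.sleep(random.random() * 0.05)
        stats["processing"].remove(x)
        q.task_done()


async def report(q, stats, done, interval=0.01):
    # Keep reporting until main() signals that every item is done.
    while not done.is_set():
        stats["remaining"] = q.qsize()
        print(stats)
        await asyncio.sleep(interval)
    # One final snapshot after all items have been processed.
    stats["remaining"] = q.qsize()
    print(stats)


async def main():
    q = asyncio.Queue()
    stats = {"remaining": 0, "processing": []}
    done = asyncio.Event()
    for x in range(10):
        q.put_nowait(x)

    workers = [asyncio.create_task(worker(q, stats)) for _ in range(2)]
    reporter = asyncio.create_task(report(q, stats, done))

    await q.join()  # returns once task_done() was called for every item
    done.set()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    await reporter
    return stats


if __name__ == "__main__":
    asyncio.run(main())
```

The workers are cancelled after q.join() returns because they would otherwise block forever on q.get().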
Thank you for your suggestion, but I think the crux of this post is to not use attributes intended for internal use. If I were to use q._unfinished_tasks I could just replace the task() function.
async def task():
    while q._unfinished_tasks > 0:
        ...
If there are no plans to expose these internal attributes publicly, inheriting from asyncio.Queue would be another alternative, as you suggested.
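A minimal sketch of what such a subclass could look like, keeping its own counter instead of reading _unfinished_tasks. The class name CountingQueue and the properties unfinished and in_progress are hypothetical, not part of asyncio. It also assumes that put() delegates to put_nowait(), which is how CPython implements it today but is itself an implementation detail, so treat this as an illustration rather than a robust solution.

```python
import asyncio


class CountingQueue(asyncio.Queue):
    """Hypothetical subclass exposing unfinished/in-progress counts."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._pending = 0  # items put but not yet marked with task_done()

    def put_nowait(self, item):
        # Assumption: put() ends up calling put_nowait() (true in CPython).
        super().put_nowait(item)
        self._pending += 1

    def task_done(self):
        # super().task_done() raises ValueError if called too many times,
        # so the counter is only decremented for valid calls.
        super().task_done()
        self._pending -= 1

    @property
    def unfinished(self):
        """Items waiting in the queue plus items being processed."""
        return self._pending

    @property
    def in_progress(self):
        """Items that were fetched with get() but are not done yet."""
        return self._pending - self.qsize()
```

With this, report() could loop on `while q.unfinished > 0:` without touching any underscore-prefixed attribute of asyncio.Queue.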