Adding Queue.as_list()

I’d like to propose a very simple helper method asyncio.Queue.as_list() returning a shallow copy of the queue contents. (I’m working with asyncio, but this idea may concern other queue types as well.)

A typical use-case is a FIFO of jobs processed by worker(s). When such application shuts down and doesn’t have the time to drain the queue by processing all jobs and doesn’t want to lose them, it can save the queue contents to a disk. Next time it starts, it will pre-fill the new queue with saved data and can continue the work where it stopped.

A related idea is checkpointing, i.e. to save the contents from time to time, not only at exit. Getting the contents without modifying the queue and without touching the internals, just with gets and puts is awkward.

Would the queue have to be locked during the as_list operation?

In asyncio is no locking required. In threading it is required.

In asyncio the whole function body would be just return list(self._queue). But without accessing private data structures, a copy requires a full “rotation” of the queue and adjustment of the unfinished tasks count. It is ugly (I think) and inefficient (I’m sure it is).

qcopy = []
for _ in range(len(q)):
    q.put_nowait(item := q.get_nowait())
    q.task_done()
    qcopy.append(item)

This is typically a place where setting a maxsize on the queue and propagating backpressure should be thought of first.

fill the queue less eagerly (and/or propagate backpressure if the queue is being filled from another process) and you shouldn’t ever have a significant amount of outstanding work in queue to be written elsewhere rather than just finished as part of a shutdown stage.

2 Likes

Even when following the best practices, I find the probability of having queued jobs that can’t be processed quickly quite high. A transient failure of some external system is all that is needed.