Concurrent asynchronous iteration

I want to iterate over an async iterable such that execution of the producer overlaps with execution of the consumer, instead of the two merely alternating. Here is how I do it:

from asyncio import Task, create_task
from collections.abc import AsyncIterable


async def iter_fast[T](iterable: AsyncIterable[T]):
    iterator = aiter(iterable)
    pending: Task[T] | None = None
    while True:
        try:
            if pending is None:
                item = await anext(iterator)
            else:
                item = await pending
        except StopAsyncIteration:
            break
        else:
            # Eagerly start fetching the next item, so the producer runs
            # while the consumer processes the current one.
            pending = create_task(anext(iterator))
            yield item

My concern with this implementation is that there’s no way to dispose of the pending task in case there’s an exception on the consumer side:

from asyncio import run, sleep


async def get_data(n: int):
    for i in range(n):
        await sleep(1)
        yield i


async def main():
    async for item in iter_fast(get_data(3)):
        raise Exception('test')

run(main())

Is there a solution to that?
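
The closest I have come to a fix is wrapping the loop in try/finally, so that whenever the generator is finalized (via aclose(), or by the event loop closing still-running async generators at shutdown) the speculative task is cancelled - a minimal sketch:

from asyncio import Task, create_task
from collections.abc import AsyncIterable


async def iter_fast[T](iterable: AsyncIterable[T]):
    iterator = aiter(iterable)
    pending: Task[T] | None = None
    try:
        while True:
            try:
                if pending is None:
                    item = await anext(iterator)
                else:
                    item = await pending
            except StopAsyncIteration:
                break
            else:
                pending = create_task(anext(iterator))
                yield item
    finally:
        # Runs when the generator is closed or garbage-collected;
        # dispose of the speculative anext() task.
        if pending is not None:
            pending.cancel()

But that only helps if the generator is actually finalized in good time, which is exactly the part I don’t know how to guarantee.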

Sorry if this comes across as a stupid question, but why do you need iter_fast in the first place? I’m not sure why you wouldn’t just use async for item in get_data(3):.

I want execution of the body of the async for (which makes another async call in my case) to overlap with execution of get_data to exploit more concurrency.

So - an async generator being abandoned before it is exhausted is a known problem, and not an easy one to solve. Over the years, the event loop has been updated so that it closes any still-running async generators when it shuts down, but that won’t fix things for you here.

What is possible in this case - and probably the approach I’d take if I had a similar pattern - is a “house-keeping” task: I’d register related producers and consumers somewhere this task could see them (likely a queue, but it could be a dictionary), and the house-keeping task would periodically check the health of each consumer/producer pair and cancel the producer if its consumer is done.
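
A minimal sketch of the idea - the registry and the names here are illustrative, not part of any existing API:

from asyncio import Task, sleep
from collections.abc import AsyncGenerator

# Illustrative registry of consumer/producer pairs; a queue or a
# dictionary would work just as well.
pairs: list[tuple[Task, AsyncGenerator]] = []


async def housekeeper(interval: float = 1.0) -> None:
    # Periodically finalize producers whose consumers are done.
    while True:
        await sleep(interval)
        for consumer, producer in pairs[:]:
            if consumer.done():
                await producer.aclose()  # runs the generator's cleanup
                pairs.remove((consumer, producer))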

Actually, if the consumer will be aborted, it is possible to cancel the producer from a done callback on the consumer task, with no need for a separate house-keeping task.

In your example, something along these lines:


import asyncio


async def main():
    producer = iter_fast(get_data(3))
    # When main() finishes (even by raising), schedule aclose() on the
    # producer so the async generator is finalized deterministically.
    asyncio.current_task().add_done_callback(
        lambda _: asyncio.create_task(producer.aclose())
    )
    async for item in producer:
        raise Exception('test')

asyncio.run(main())

(However, the “aclose” task itself would have to be awaited. And for that, the done callback would have to put that call into a data structure that a collector task would await - so I guess one can’t escape the “house-keeping” task anyway. But I believe that is the pattern to be explored.)
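
To make that concrete - a sketch with illustrative names, leaving out shutdown of the collector itself. Note that with a short-lived asyncio.run(main()) the collector may be cancelled before it drains the queue; in a long-running application it would get its chance:

from asyncio import Queue, create_task, current_task
from collections.abc import Coroutine

cleanup: Queue[Coroutine] = Queue()  # aclose() coroutines to be awaited


async def collector() -> None:
    # The inescapable "house-keeping" part: actually await each aclose().
    while True:
        coro = await cleanup.get()
        await coro
        cleanup.task_done()


async def main():
    create_task(collector())
    producer = iter_fast(get_data(3))
    # Hand aclose() to the collector instead of spawning an orphan task.
    current_task().add_done_callback(
        lambda _: cleanup.put_nowait(producer.aclose())
    )
    async for item in producer:
        raise Exception('test')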
