Add task pipeline to asyncio with capped parallelism and lazy input reading

Motivation

I would like to propose adding a standard way to implement async data processing pipelines that are memory-friendly, enable input data stream being generated lazily or dynamically, and enable easy capping of parallelism.

I have noticed that some people struggle with implementing this in a way that does not create a task per input item at the start (and ultimately crash the interpreter for large batches) - and neither OpenAI o1 nor Github Copilot could generate an implementation I would consider good and robust either, unfortunatelly.

Using asyncio.as_completed or asyncio.gather requires creating all tasks beforehand, which prevents generating them dynamically on-the-fly as new information becomes available (like in the case of a web crawler), and is very memory-intensive or impractical if all the input data cannot be fit into the memory. The lack of proper support for backpressure in custom simple implementations also - in some cases - leads to gradually increasing the application’s memory footprint until the interpreter crashes.

Such utility would be useful for example for communicating with rate-limited remote servers when processing a huge or dynamically created stream of input data - e.g. web crawling, generating LLM completions, etc.

I have seen this attempted in many Jupyter notebooks evaluating downstream tasks on LLMs or LLMs themselves.

Proposed API

I would like to propose adding the following function to the asyncio module:

async def pipeline[T, R](
    iterable: AsyncIterable[T] | Iterable[T],
    func: Callable[[T], R | Awaitable[R]],
    *args, # passed to func
    parallelism: int = 4,
    **kwargs, # passed to func
) -> AsyncIterable[R]:
    ...

I have no strong feelings about the function’s name, maybe map_concurrent or map_parallel would be better.

To make the usage as convenient as possible, the implementation should also handle backpressure to adjust the speed of reading its input and avoid consuming too much memory using internal buffers. Additionally, the results should be re-ordered internally to match the input order, so that there is no need for item identifiers and re-ordering on the calling code’s part.

Usage demonstration

async def generate_completion(item: str) -> str | None:
    # Call some 3rd party API to generate output from item
    try:
        result = await some_api_client.do_stuff(item)
        return result
    except BaseException as e:
        logging.error(e)
        return None

async def main():
    async with aiofiles.open("input.txt", "r") as in_f, aiofiles.open("output.txt", "w") as out_f:
        async for result in asyncio.pipeline(in_f, generate_completion, parallelism=16):
            await out_f.write(f"{result}\n")

asyncio.run(main())

Design considerations

The proposed API could be at a later point extended by allowing custom callback / object passed to the parallelism parameter as a custom dynamic parallelism controller, but I did not want to overcomplicate things. Another open question is whether overriding the executor should be supported, per-pipeline or per-task.

I would also like to recognize that result reordering might not always be necessary, and as it can under some situations result in lower overall throughput, maybe it should be configurable - but I would strongly recommend to make it enabled by default.

I suggest that any errors raised from func would cancel the pipeline and be raised from the returned AsyncIterable.

An alternative approach to address this use case could be to allow passing AsynIterable[Awaitable[T]] to asyncio.gather and asyncio.as_completed, along with a parallelism cap.

Existing work

I am currently aware of the following libraries related to this:

  • aioitertools - does not support limiting task parallelism
  • aiostream - provides this functionality through aiostream.stream.map, however, I think this is worth considering for addition into the standard library, as I consider this to be a common enough pattern. Additionally, the library is not compatible with tqdm, so many Jupyter users (within my social bubble) do not want to use it, unfortunately.
  • AnyIO - does not appear to support this pattern (or I missed it).
  • Slurry + Trio - does not appear to support this pattern (or I missed it).

Example implementation

Here I attach an example implementation, licensed as Public Domain.

It does not handle errors, I would add that in a hypothetical PR should there be interest in this (adding an error | None item to the tuples in output_queue seems like the most straightforward way).

# SPDX-License-Identifier: CC-PDM-1.0

async def pipeline[T, R](
    iterable: AsyncIterable[T] | Iterable[T],
    func: Callable[[T], R | Awaitable[R]],
    *args,
    parallelism: int = 4,
    **kwargs,
) -> AsyncIterable[R]:
    """
    Asynchronously map a function over an (a)synchronous iterable with bounded
    parallelism.

    Allows concurrent processing of items from the given iterable, up to a
    specified parallelism level. Takes a function that can be either synchronous
    (returning R) or asynchronous (returning Awaitable[R]) and returns an async
    iterable producing results in input order as soon as they become available.
    Internal queues are bounded, preventing consumption of the entire iterable
    at once in memory.

    Args:
        iterable:
            The source of items to process. Can be a synchronous or asynchronous
            iterable.
        func:
            The mapping function to apply to each item. May be synchronous
            (returning R) or asynchronous (returning Awaitable[R]). All *args
            and **kwargs are forwarded to this function.
        parallelism (int):
            Maximum number of concurrent worker tasks. Defaults to 4.
        *args:
            Extra positional arguments passed on to `func`.
        **kwargs:
            Extra keyword arguments passed on to `func`.

    Yields:
        R: The result of applying `func` to each item, in the same order as
        their corresponding inputs.  

    Notes:
        - If the callback is synchronous, it will be invoked directly in the
          event loop coroutine, so consider wrapping it with asyncio.to_thread()
          if blocking is significant.
        - This implementation uses internal queues to avoid reading from
          `iterable` too far ahead, controlling memory usage.
        - Once an item finishes processing, its result is enqueued and will be
          yielded as soon as all previous results have also been yielded.
        - If the consumer of this async iterable stops consuming early, workers
          may block while attempting to enqueue subsequent results. It is
          recommended to cancel this coroutine in such case to clean up
          resources if it is no longer needed.
        - If the work for some items is very slow, intermediate results are
          accumulated in an internal buffer until those slow results become
          available, preventing out-of-order yielding.
    """

    input_terminator = cast(T, object())
    output_terminator = cast(R, object())
    input_queue = asyncio.Queue[tuple[int, T]](parallelism)
    output_queue = asyncio.Queue[tuple[int, R]](parallelism)
    feeding_stop = asyncio.Event()
    last_fed = -1
    next_to_yield = 0
    early_results: dict[int, R] = {}

    async def _worker() -> None:
        while True:
            index, item = await input_queue.get()
            if item is input_terminator:
                input_queue.task_done()
                break
            result = func(item, *args, **kwargs)
            if isinstance(result, Awaitable):
                result = cast(R, await result)
            await output_queue.put((index, result))
            input_queue.task_done()

        await output_queue.put((-1, output_terminator))

    def _as_async_iterable(
        iterable: AsyncIterable[T] | Iterable[T],
    ) -> AsyncIterable[T]:
        if isinstance(iterable, AsyncIterable):
            return iterable

        async def _sync_to_async_iterable() -> AsyncIterable[T]:
            for item in iterable:
                yield item

        return _sync_to_async_iterable()

    async def _feeder() -> None:
        nonlocal last_fed
        async for item in _as_async_iterable(iterable):
            if len(early_results) >= parallelism:
                # There is an item that is taking very long to process. We need
                # to wait for it to finish to avoid blowing up memory.
                await feeding_stop.wait()
                feeding_stop.clear()

            last_fed += 1
            await input_queue.put((last_fed, item))

        for _ in range(parallelism):
            await input_queue.put((-1, input_terminator))

    async def _consumer() -> AsyncIterable[R]:
        nonlocal next_to_yield
        remaining_workers = parallelism
        while remaining_workers:
            index, result = await output_queue.get()
            if result is output_terminator:
                remaining_workers -= 1
                output_queue.task_done()
                continue

            early_results[index] = result
            while next_to_yield in early_results:
                # The feeding lock is set only when the results can be yielded
                # to prevent the early results from growing too much.
                feeding_stop.set()

                yield early_results.pop(next_to_yield)
                next_to_yield += 1
            output_queue.task_done()

    tasks = [
        asyncio.create_task(_worker()) for _ in range(parallelism)
    ] + [asyncio.create_task(_feeder())]

    try:
        async for result in _consumer():
            yield result
    finally:
        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

Thanks for raising the topic! I agree, applying async function to every item of async iterator is quite common problem.

Error handling is a strong challenge for the proposed design.
The if error is not None... check for each iterator element is tiresome; Python usually raises exceptions instead of returning them.
Yes, I know about return_exceptions argument for gather() but I still think that it is a little unnatural.
In itertools, if a predicate fails the whole function raises an exception. I personally prefer this approach, a user could use try/except in a callable on its own if needed.

Speaking of await gather(tasks, return_exceptions=True) I would say that in my real code I always need to know what task was failed even for meaningful logging; more complicated error handling requires more context than just exception value.

There are other things to consider: should the api work with sync callables and iterators, should it keep the order or support reordering for faster execution, some implementation notes; but all these questions could be discussed after making decision about error handling.

As an alternative, I could propose something like concurrent.futures.Executor but with async API.

class AsyncExecutor:
    def __init__(self, max_workers: int) -> None: ...
    async def submit(self, fn, /, *args, **kwargs) -> R: ... 
    async def map(self, fn, /, *iterables, timeout: float|None=None) -> AsyncIterator[R]: ...
    async def shutdown(self) -> None: ...

Like in concurrent.futures, if a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.

Thank you for such a quick reply!

Yes, I think raising the exception would be better than returning them. If the calling code requires the exceptions to be returned by the AsyncIterable returned from pipeline instead, it’s easy to implement at the caller’s code by wrapping func.

Here is an updated implementation proposal that handles exceptions in this way:

# SPDX-License-Identifier: CC-PDM-1.0

async def pipeline[T, R](
    iterable: AsyncIterable[T] | Iterable[T],
    func: Callable[[T], R | Awaitable[R]],
    *args,
    parallelism: int = 4,
    **kwargs,
) -> AsyncIterable[R]:
    """
    Asynchronously map a function over an (a)synchronous iterable with bounded
    parallelism.

    Allows concurrent processing of items from the given iterable, up to a
    specified parallelism level. Takes a function that can be either synchronous
    (returning R) or asynchronous (returning Awaitable[R]) and returns an async
    iterable producing results in input order as soon as they become available.
    Internal queues are bounded, preventing consumption of the entire iterable
    at once in memory.

    Args:
        iterable:
            The source of items to process. Can be a synchronous or asynchronous
            iterable.
        func:
            The mapping function to apply to each item. May be synchronous
            (returning R) or asynchronous (returning Awaitable[R]). All *args
            and **kwargs are forwarded to this function.
        parallelism (int):
            Maximum number of concurrent worker tasks. Defaults to 4.
        *args:
            Extra positional arguments passed on to `func`.
        **kwargs:
            Extra keyword arguments passed on to `func`.

    Yields:
        R: The result of applying `func` to each item, in the same order as
        their corresponding inputs.  

    Notes:
        - If the callback is synchronous, it will be invoked directly in the
          event loop coroutine, so consider wrapping it with asyncio.to_thread()
          if blocking is significant.
        - This implementation uses internal queues to avoid reading from
          `iterable` too far ahead, controlling memory usage.
        - Once an item finishes processing, its result is enqueued and will be
          yielded as soon as all previous results have also been yielded.
        - If the consumer of this async iterable stops consuming early, workers
          may block while attempting to enqueue subsequent results. It is
          recommended to cancel this coroutine in such case to clean up
          resources if it is no longer needed.
        - If the work for some items is very slow, intermediate results are
          accumulated in an internal buffer until those slow results become
          available, preventing out-of-order yielding.
    """

    input_terminator = cast(T, object())
    output_terminator = cast(R, object())
    input_queue = asyncio.Queue[tuple[int, T]](parallelism)
    output_queue = asyncio.Queue[tuple[int, R, Optional[BaseException]]](
        parallelism,
    )
    feeding_stop = asyncio.Event()
    last_fed = -1
    next_to_yield = 0
    early_results: dict[int, R] = {}

    async def _worker() -> None:
        while True:
            index, item = await input_queue.get()
            if item is input_terminator:
                input_queue.task_done()
                break
            try:
                result = func(item, *args, **kwargs)
                if isinstance(result, Awaitable):
                    result = cast(R, await result)
            except BaseException as exception:
                await output_queue.put((index, output_terminator, exception))
                return
            await output_queue.put((index, result, None))
            input_queue.task_done()

        await output_queue.put((-1, output_terminator, None))

    def _as_async_iterable(
        iterable: AsyncIterable[T] | Iterable[T],
    ) -> AsyncIterable[T]:
        if isinstance(iterable, AsyncIterable):
            return iterable

        async def _sync_to_async_iterable() -> AsyncIterable[T]:
            for item in iterable:
                yield item

        return _sync_to_async_iterable()

    async def _feeder() -> None:
        nonlocal last_fed
        async for item in _as_async_iterable(iterable):
            if len(early_results) >= parallelism:
                # There is an item that is taking very long to process. We need
                # to wait for it to finish to avoid blowing up memory.
                await feeding_stop.wait()
                feeding_stop.clear()

            last_fed += 1
            await input_queue.put((last_fed, item))

        for _ in range(parallelism):
            await input_queue.put((-1, input_terminator))

    async def _consumer():
        nonlocal next_to_yield
        remaining_workers = parallelism
        while remaining_workers:
            index, result, exception = await output_queue.get()
            if result is output_terminator:
                remaining_workers -= 1
                output_queue.task_done()
                if exception:
                    raise exception
                continue

            early_results[index] = result
            while next_to_yield in early_results:
                # The feeding lock is set only when the results can be yielded
                # to prevent the early results from growing too much.
                feeding_stop.set()

                yield early_results.pop(next_to_yield)
                next_to_yield += 1
            output_queue.task_done()

    consumer_coro = _consumer()
    tasks = [
        asyncio.create_task(_worker()) for _ in range(parallelism)
    ] + [asyncio.create_task(_feeder())]

    try:
        async for result in consumer_coro:
            yield result
    finally:
        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

I have also considered the following alternatives when implementing exception handling:

  • A local variable storing the first encountered exception - I rejected the idea as that essentially adds another communication channel and state to synchronize.
  • Calling athrow on the iterator created by calling _consumer() - This either resulted in RuntimeError: athrow(): asynchronous generator is already running or overly complicated code.

I like your idea of implementing this as asyncio.AsyncExecutor to match the concurrent.futures.Executor API, it would be a more flexible tool, and the consistency would be probably welcome by consumers of the API.

1 Like

I have a feeling that AsyncExecutor is the way to go; a functional version could be built on top of it if needed.

@missing-stardust would you work on pull request? I’m happy to review it, mentor if needed, and merge the PR finally.

Thank you for your support and this opportunity. I would be honored to work on this pull request.

I would also be grateful for your mentorship in this endeavor. This is my first contribution to Python (I have contributed to other open source projects previously).

Regarding the implementation, I have a couple of questions to ensure we’re on the same page:

  • Should asyncio.AsyncExecutor (or just asyncio.Executor) be an abstract class? This would enable easier addition of implementations with different parallelism-handling strategies.
  • If AsyncExecutor were to be an abstract class, should the concrete class be named FixedWorkerPoolExecutor to better describe its behavior and create room for alternatives with dynamic scaling of parallelism?

Please let me know if you have any preferences on these. I am also open to any other suggestions before I start working on this pull request.

asyncio.Executor is just fine, putting it into .Lib/asyncio/executor.py could be a good place to start.

Regarding abstract class or not – I don’t have a strong preference. What strategies are you talking about? Could you enumerate a few?

I general, I suggest starting with the single class. Later, during the review, we can change it. Other asyncio contributors will take a part in the review process, sure. Let’s see how things go; existing PR’s code is a crucial point for the future discussion.
Tests are required early, docs could be added later.

Thank you for the pointers. I’ll get to work and submit a PR once I have something worth showing, then I’ll link it here.

Other strategies I have considered:

  • Adaptive scaling between min and max parallelism - the executor would gradually increase parallelism until reaching max or func raises a TooManyOperationsException. The executor would decrease parallelism whenever a TooManyOperationsException is raised and automatically retry the failed operation after a configured delay. After a different configured delay, the executor could try increasing parallelism again. This could be useful when communicating with servers that apply rate limits on IP addresses as well.
  • Letting the calling code provide the logic for adjusting parallelism level on the fly. The strategy above could be implemented as a specialization of this one.

The currently proposed implementation would then be a specialization of the first one - with min=max.

Granted, these are more advanced and rarely used. It would probably be safer to start with a single implementation and then see what needs actually arise.

1 Like

aiometer is what I’d use

1 Like

Looks good, thanks for the link.

I’ve just submitted the PR: Add asyncio.Executor matching concurrent.futures.Executor by jurca · Pull Request #129769 · python/cpython · GitHub

Thank your in advance for reviewing it. I’ll be happy to add the documentation once we agree on the API.

I apologize for taking so long, getting the cancellation work reliably in every scenario took some effort.

1 Like

How about packaging it as a third party library like aioexecutor or something?

I don’t think this needs to be in the standard library, it will cause significant maintenance burden for the already few maintainers.

@kumaraditya303 Thank you for sharing your concerns about the potential maintenance burden. I understand it’s important to keep the standard library maintainable for the core developers.

However, I believe that adding this functionality to the standard library would bring benefits to the Python community.

Asynchronous data processing pipelines with controlled parallelism are a common pattern in modern Python applications, especially with the rise of async/await, as acknowledged by @asvetlov.

Furthermore, including this in the standard library would provide a consistent and reliable solution that can be widely adopted, reducing the fragmentation caused by multiple third-party libraries, and improve the quality of many codebases that currently implement their own solutions. Since creating a robust and reliable implementation is nontrivial, providing a standard solution would be highly beneficial.

Finally, the proposed asyncio.Executor aligns with existing concepts in concurrent.futures and asyncio, providing a familiar API for users, making it easy to adapt.

Regarding the maintenance burden, I believe the current implementation is concise (226 lines, including imports and blank lines) and designed to be straightforward to maintain. By going with a single class with a well-defined API, we can limit complexity and avoid unnecessary expansions unless there’s a clear need in the future.

I am committed to helping maintain this addition and addressing any issues that arise. With support from core developers like @asvetlov, who has offered to review and mentor the PR, I hope we can ensure this feature is sustainably maintained. Of course, I will respect the maintainers’ consensus on this matter.

I considered using existing third-party libraries, as listed in my original post, but they often lack the required features or compatibility with other tools (such as tqdm in Jupyter notebooks). Including this functionality in the standard library would provide a solid foundation for the ecosystem to build upon.

I’m eager to collaborate further and would appreciate any suggestions on how we can adjust the proposal to address your concerns. Please let me know if there are specific aspects of the implementation you’d like to discuss or modify.

I don’t see how this is an improvement on aiometer or why it should be included in the stdlib. This can be maintained more easily as a package on PyPI, maybe if it becomes very popular it will become obvious that it should be incorporated but until then it shouldn’t be added.

1 Like

I would like to point out that aioitertools does implement this sort of pipeline. aioitertools.asyncio.gather includes an option to limit the number of concurrent tasks, and specifically does not create more than that number of tasks up front.

https://aioitertools.omnilib.dev/en/stable/api.html#aioitertools.asyncio.gather

This is one of those things I end up creating again occasionally, but usually slightly different each time. Same building blocks, but different combinations.

I think it lives outside the standard library just fine, aiometer was pointed to earlier, and it’s about as close to comfortable as I could be with a general purpose solution. You can also do what was described in the first post pretty trivially with tasks and a semaphore if it’s just for capped parallelism while retaining laziness.

I would say that the proposal adds a concurrency primitive that has an interface similar to concurrent.futures.Executor.

The asyncio.locks module has locking primitives similar to threading ones.
I would say that I could implement my own locks quickly on top of asyncio futures, and locks should not be a part of asyncio.
In fact, locks.py is essentially a wrapper around futures, nothing more.
However, the module provides handy building blocks that asyncio users could share. Implementation is relatively simple but still it requires a deep understanding of how futures work; very many users don’t need to know about low-level asyncio API but want to use handy tools.

Executors live in stdlib for about 15 years; people are familiar with them. The proposal adds the same interface to asyncio world, replacing sync calls with async ones. No one mentioned library provides executor-like API. That’s why I think that async executors have a right to live, and that’s why they could be put into asyncio.

1 Like