Add task pipeline to asyncio with capped parallelism and lazy input reading

On a side note, maybe building an API for this on top of TaskGroup, instead of a callable, could lead to a more readable pattern - and it would be easy to do by subclassing asyncio.TaskGroup.

What do you think?

Thanks for your suggestion! I think a TaskGroup could be used to manage the workers and feeders.

I’m not sure whether subclassing or composition is the right way to go, but subclassing would add a little more flexibility, so it’s worth consideration.


asyncio design discourages inheritance except for implementing asyncio protocols, which are basically pure interfaces.
Thus, inheriting from TaskGroup is not a good idea.

Also,

I have a small “extraasync” package for asynchronous utilities here -

and I decided to add a “max_concurrency” parameter to my ExtraTaskGroup class there, based on this idea.

If you feel like contributing your pipeline call, which could be a nice addition, please file an issue there with details, and some code if you have it. (I intend to release the “max_concurrency” feature soon.)

(dunno - I made the Semaphore-embedded task group, and it doesn’t look that useful now that it is done. Maybe a nice pipeline builder, allowing one to state per-stage parallelism if needed, could be more interesting) -

Your example would read like this, using the TG approach:

# Original example by Martin Jurča on Python-ideas discourse

import asyncio
from extraasync import ExtraTaskGroup

import aiofiles


async def generate_completion(item: str) -> str | None:
    ...
    return await some_api_client.do_stuff(item)

async def main():
    async with (
        aiofiles.open("input.txt", "r") as in_f,
        aiofiles.open("output.txt", "w") as out_f,
        ExtraTaskGroup(max_concurrency=16) as tg,
    ):
        async for line in in_f:
            await out_f.write(f"{await tg.create_task(generate_completion(line))}\n")


asyncio.run(main())

The main “issue” is that there is no visible gain over using an explicit
Semaphore context block inside the for loop:

async def main():
    semaphore = asyncio.Semaphore(16)
    async with (
        aiofiles.open("input.txt", "r") as in_f,
        aiofiles.open("output.txt", "w") as out_f,
    ):
        async for line in in_f:
            async with semaphore:
                result = await generate_completion(line)
            await out_f.write(f"{result}\n")

@jsbueno You are right that your current design for ExtraTaskGroup does not seem to provide any benefit over the semaphore-based one - because it does not actually achieve any parallelism: the code awaits each item right after submitting it, thus processing the items in sequence.

Here is a snippet demonstrating it:

import asyncio
import random


async def async_range(end):
    for i in range(end):
        yield i


active_tasks = 0


async def foo():
    global active_tasks
    active_tasks += 1
    print(f"Active tasks: {active_tasks}")
    await asyncio.sleep(random.random())
    active_tasks -= 1


async def main():
    semaphore = asyncio.Semaphore(16)
    async for _ in async_range(10):
        async with semaphore:
            await foo()

asyncio.run(main())

The output will contain “Active tasks: 1” on every line.

In order to achieve parallel execution of tasks, you will need to handle task orchestration within the ExtraTaskGroup, accepting items and a mapping callback, like pipeline or Executor does - otherwise you’ll need to keep track of running tasks and current parallelism, and collect results, yourself.