Adding a Limit Parameter to TaskGroup for Managing Concurrent Coroutines

The TaskGroup in asyncio is a fantastic abstraction that simplifies the spawning and management of multiple tasks. However, one limitation that has become apparent is the inability to limit the number of concurrently running tasks within a TaskGroup. In scenarios where resource constraints are a concern or when dealing with rate-limited APIs, this capability becomes crucial.

Proposal

I propose the introduction of a limit parameter to the TaskGroup class. This parameter would allow developers to specify the maximum number of tasks that can be concurrently executed within the group. The behaviour would align with how semaphore limits work, but would be integrated directly into the task management flow of TaskGroup.

A similar feature has been discussed previously, but has since been forgotten about.

We have created a simple implementation of this API. The main difference is that we want to be able to share the bound between different groups, since we want to cap total fan-out, not just fan-out per task.
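
As a rough sketch of that shared-bound idea (the class name and structure below are illustrative, not our actual implementation), several groups can be constructed with the same asyncio.Semaphore so that one bound caps the total number of running tasks across all of them:

import asyncio

class SharedLimitTaskGroup(asyncio.TaskGroup):
    # Hypothetical subclass: every task acquires a shared semaphore before
    # running, so groups built from the same semaphore share one overall bound.
    def __init__(self, limiter: asyncio.Semaphore):
        super().__init__()
        self._limiter = limiter

    def create_task(self, coro, **kwargs):
        async def bounded():
            async with self._limiter:
                return await coro
        return super().create_task(bounded(), **kwargs)

Two SharedLimitTaskGroup instances built from the same semaphore then share one overall cap, even though each manages its own tasks.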

If a task spawned by the TaskGroup spins up its own TaskGroup and starts spawning sub-tasks, should those tasks be subject to the same limit? I think it kind of depends on the specific reason you’re imposing a task limit.

Take the API rate limiting use case, for example. If those sub-tasks are all making API calls against the same rate-limited endpoint, I guess you want them included in the overall task limit. But if they’re doing something else asynchronously, like writing the results to a local file, or pushing them to a database, or talking to another API with its own separate rate limit, then it doesn’t make sense to apply the same task limit to those tasks anymore.

Basically, an API like this requires you to know the implementation details of the whole call stack below you in order to set a useful task limit. Whether it limits the whole tree of tasks or just the immediate sub-tasks of the caller, as soon as a callee introduces or removes a level of sub-tasks, your limiter behavior changes.

For a practical solution already present in the standard library, you can use asyncio.Semaphore for task limiting.[1]

import asyncio


async def top_level_task():
    limiter = asyncio.Semaphore(4)  # Only 4 tasks may hold the limiter at once.

    async with asyncio.TaskGroup() as tg:
        for i in range(10):
            tg.create_task(intermediate_task(i, limiter))


async def intermediate_task(n, limiter):
    # This function is subject to the task limit, and creates sub-tasks of its own.
    # The task limit is applied to some of those sub-tasks, but not others.
    async with limiter:
        ...  # Do the rate-limited part of the work here.

    # Spawn the sub-tasks only after releasing the limiter. Holding it while
    # awaiting limited_leaf_task would deadlock once every permit was held by
    # an intermediate task waiting on its own leaf.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(limited_leaf_task(limiter))
        tg.create_task(unlimited_leaf_task())


async def limited_leaf_task(limiter):
    # This function is subject to the task limit.
    async with limiter:
        ...


async def unlimited_leaf_task():
    # This function is not subject to the task limit.
    ...

Note that with this approach you can create as many separate limiters in as many scopes as you want.
Sure, it’s a little more verbose than asyncio.TaskGroup(limit=10), but it’s a whole lot more flexible.
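
For instance, here is a sketch of the "one limiter per resource" pattern (fetch_from_api and write_to_db are hypothetical placeholders, not real helpers):

import asyncio

async def pipeline(item, api_limiter, db_limiter):
    # Each resource gets its own limiter, held only around the calls
    # that actually contend for that resource.
    async with api_limiter:
        data = await fetch_from_api(item)  # hypothetical helper
    async with db_limiter:
        await write_to_db(data)  # hypothetical helper

async def main(items):
    api_limiter = asyncio.Semaphore(4)   # the rate-limited API
    db_limiter = asyncio.Semaphore(16)   # e.g. a database connection pool
    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(pipeline(item, api_limiter, db_limiter))

Because each limiter is scoped to exactly the calls that contend for its resource, the API bound and the database bound never interfere with each other.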


  1. Edit: I realized after posting that you mentioned Semaphore in your initial post, so I guess you're already aware of it and of how to apply it to this problem. Still, the example may be helpful for other readers. ↩︎


Also related: BoundedTaskGroup to control parallelism

I note also that it seems inefficient to spin up 1,000 tasks simultaneously when you know you'll only execute 10 at once. We made our implementation block task creation on the semaphore, not just task execution.

Unfortunately, that means create_task needs to become async, which means this cannot just be a parameter on the existing TaskGroup class.
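
Here is a minimal sketch of that design, assuming a wrapper around an existing TaskGroup (the class and names are illustrative, not our actual implementation):

import asyncio

class CreationBoundedGroup:
    # Acquire the semaphore *before* creating the task, so at most `limit`
    # tasks exist at any moment, rather than merely limiting how many of an
    # unbounded number of already-created tasks may run.
    def __init__(self, tg: asyncio.TaskGroup, limit: int):
        self._tg = tg
        self._limiter = asyncio.Semaphore(limit)

    async def create_task(self, coro):
        # Blocks the caller until a slot frees up -- this await is
        # exactly why create_task has to become async.
        await self._limiter.acquire()

        async def run():
            try:
                return await coro
            finally:
                self._limiter.release()

        return self._tg.create_task(run())

The caller awaits group.create_task(...) inside its spawning loop, so a loop over 1,000 coroutines never holds more than limit live tasks at once.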
