BoundedTaskGroup to control parallelism

Whenever using asyncio.gather or a TaskGroup I find myself wrapping everything within a Semaphore. I’d rather just pass in my desired level of parallelism to the TaskGroup, or use a variant that wraps tasks on my behalf.

If people think this would make sense as a patch on the TaskGroup, or another subclass included in the stdlib I’ll have a go at making a PR. Here’s a simple example of what I mean:

import asyncio

async def test(i):
    print(f'running {i}')
    await asyncio.sleep(1)

class BoundedTaskGroup(asyncio.TaskGroup):
    def __init__(self, *args, max_parallelism = 0, **kwargs):
        super().__init__(*args)
        if max_parallelism:
            self._sem = asyncio.Semaphore(max_parallelism)
        else:
            self._sem = None
    
    def create_task(self, coro, *args, **kwargs):
        if self._sem:
            async def _wrapped_coro(sem, coro):
                async with sem:
                    return await coro
            coro = _wrapped_coro(self._sem, coro)

        return super().create_task(coro, *args, **kwargs)

async def main():
    async with BoundedTaskGroup(max_parallelism=2) as g:
        for i in range(10):
            g.create_task(test(i))

asyncio.run(main())
2 Likes

Thank you for sharing! Your code saved my day :smiley:

1 Like