Whenever using asyncio.gather or a TaskGroup I find myself wrapping everything within a Semaphore. I’d rather just pass in my desired level of parallelism to the TaskGroup, or use a variant that wraps tasks on my behalf.
If people think this would make sense as a patch on the TaskGroup, or another subclass included in the stdlib I’ll have a go at making a PR. Here’s a simple example of what I mean:
import asyncio
async def test(i):
print(f'running {i}')
await asyncio.sleep(1)
class BoundedTaskGroup(asyncio.TaskGroup):
def __init__(self, *args, max_parallelism = 0, **kwargs):
super().__init__(*args)
if max_parallelism:
self._sem = asyncio.Semaphore(max_parallelism)
else:
self._sem = None
def create_task(self, coro, *args, **kwargs):
if self._sem:
async def _wrapped_coro(sem, coro):
async with sem:
return await coro
coro = _wrapped_coro(self._sem, coro)
return super().create_task(coro, *args, **kwargs)
async def main():
async with BoundedTaskGroup(max_parallelism=2) as g:
for i in range(10):
g.create_task(test(i))
asyncio.run(main())