On a short note, maybe building an API for this on top of TaskGroup, instead of a callable could lead to a more readable pattern - and it would be easy to do by subclassing asyncio.TaskGroup.
What do you think?
Thanks for your suggestion! I think a TaskGroup could be used to manage the workers and feeders.
I'm not sure whether subclassing or composition is the right way to go, but subclassing would add a little more flexibility, so it's worth consideration.
The asyncio design discourages inheritance except for implementing asyncio protocols, which are basically pure interfaces.
Thus, inheriting from TaskGroup is not a good idea.
Also,
I have a small "extraasync" package for asynchronous utils here -
and I decided to add a "max_concurrency" parameter to my ExtraTaskGroup
class there, based on this idea.
If you feel like contributing your pipeline call, which could be altogether nice,
please file an issue there with details, and some code if you have it. (I intend to
release the "max_concurrency" feature soon.)
(dunno - I made the Semaphore-embedded task group, and it doesn't look that useful now that it is done. Maybe a nice pipeline builder, allowing one to state per-stage parallelism if needed, could be more interesting) -
Your example would read like this, using the TG approach:
# Original example by Martin Jurča on Python-ideas discourse
import asyncio
from extraasync import ExtraTaskGroup
import aiofiles

async def generate_completion(item: str) -> str | None:
    ...
    return await some_api_client.do_stuff(item)

async def main():
    async with (
        aiofiles.open("input.txt", "r") as in_f,
        aiofiles.open("output.txt", "w") as out_f,
        ExtraTaskGroup(max_concurrency=16) as tg,
    ):
        async for line in in_f:
            await out_f.write(f"{await tg.create_task(generate_completion(line))}\n")

asyncio.run(main())
The main "issue" is that there is no visible gain over using an explicit
Semaphore context block inside the for loop:
async def main():
    semaphore = asyncio.Semaphore(16)
    async with (
        aiofiles.open("input.txt", "r") as in_f,
        aiofiles.open("output.txt", "w") as out_f,
    ):
        async for line in in_f:
            async with semaphore:
                result = await generate_completion(line)
                await out_f.write(f"{result}\n")
@jsbueno You are right that your current design for ExtraTaskGroup does not seem to provide any benefit over the semaphore-based one - but that is because it does not actually achieve any parallelism: the code awaits each item right after submitting it, so the items are processed in sequence.
Here is a snippet demonstrating it:
import asyncio
import random

async def async_range(end):
    for i in range(end):
        yield i

active_tasks = 0

async def foo():
    global active_tasks
    active_tasks += 1
    print(f"Active tasks: {active_tasks}")
    await asyncio.sleep(random.random())
    active_tasks -= 1

async def main():
    semaphore = asyncio.Semaphore(16)
    async for _ in async_range(10):
        async with semaphore:
            await foo()

asyncio.run(main())
The output will contain "Active tasks: 1" on every line.
In order to achieve parallel execution of tasks, you will need to handle task orchestration within the ExtraTaskGroup, accepting items and a mapping callback like a pipeline or an Executor does - otherwise the caller has to keep track of running tasks, the current parallelism, and the collection of results themselves.