About as_completed_safe()
I mentioned the idea of adding a safer version of asyncio.as_completed(), based on PersistentTaskGroup or Supervisor, in a short in-person discussion with @yselivanov and @guido at PyCon last week. After some testing and experimentation, I’m now inclined to drop this idea, even as a 3rd-party library function. The reasons are:
- After more experimentation, I figured out how to properly handle both timeouts and exceptions from the block body. This eliminates the need to change timeout so that it recognizes BaseExceptionGroup, because TimeoutError is mutually exclusive with any other exception that could happen.
- It is impossible to distinguish cancellation from other exceptions from the viewpoint of the async generator.
  - The caller needs complicated boilerplate to handle both potential exceptions inside the block and aclose(). This complication comes from the use of async generators, not from the API’s purpose.
  - This difficulty is also described in “Using ExceptionGroup at Anthropic (experience report)”.
  - We also need to carefully handle GeneratorExit and CancelledError, as written below.
import asyncio

# Supervisor is the experimental task scope mentioned above; it is not part of asyncio.


async def as_completed_safe(coros):
    q = asyncio.Queue()
    tasks = set()

    def result_callback(t: asyncio.Task) -> None:
        tasks.discard(t)
        q.put_nowait(t)

    async with Supervisor() as supervisor:
        for coro in coros:
            t = supervisor.create_task(coro)
            t.add_done_callback(result_callback)
            tasks.add(t)
        await asyncio.sleep(0)
        while True:
            # Also check the queue: several tasks may finish before the
            # consumer asks for the next item, leaving results still queued.
            if not tasks and q.empty():
                return
            try:
                yield await q.get()
            except (GeneratorExit, asyncio.CancelledError):
                # GeneratorExit: injected when aclose() is called
                # CancelledError: injected when a timeout occurs (i.e., the outer scope cancels the inner)
                await supervisor.shutdown()
                raise
Usage example:
import asyncio
from contextlib import aclosing

import pytest

# VirtualClock and as_completed_safe are assumed to be importable from the surrounding project.


@pytest.mark.asyncio
async def test_as_completed_safe_error_from_body():
    executed = 0
    cancelled = 0
    loop_count = 0

    async def do_job(delay, idx):
        nonlocal cancelled, executed
        try:
            await asyncio.sleep(delay)
            executed += 1
            return idx
        except asyncio.CancelledError:
            cancelled += 1
            raise

    with VirtualClock().patch_loop():
        results = []
        with pytest.raises(ZeroDivisionError):
            async with aclosing(as_completed_safe([
                do_job(0.1, 1),
                do_job(0.2, 2),
                # cancellation occurs here
                do_job(0.3, 3),
            ])) as ag:
                async for result in ag:
                    results.append(await result)
                    loop_count += 1
                    if loop_count == 2:
                        raise ZeroDivisionError()

    assert loop_count == 2
    assert executed == 2
    assert cancelled == 1
    assert results == [1, 2]
This is why I’m now considering gather_safe(), which just returns a group result instead of an async generator.
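For readers wondering what "returns a group result" could look like, here is a rough sketch under my own assumptions. It is not the actual gather_safe(): the name gather_safe_sketch() is hypothetical, and plain asyncio tasks stand in for the Supervisor-based scope. Each slot of the returned list holds either the result or the exception of the corresponding coroutine, so the caller needs no generator cleanup logic.

```python
import asyncio


async def gather_safe_sketch(coros):
    # Hypothetical helper, NOT the author's gather_safe().
    tasks = [asyncio.ensure_future(c) for c in coros]
    if not tasks:
        return []
    try:
        await asyncio.wait(tasks)  # returns once every task is done
    except asyncio.CancelledError:
        # Outer cancellation (e.g. a timeout): cancel the children,
        # wait until they have finished, then propagate the cancellation.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    results = []
    for t in tasks:
        if t.cancelled():
            results.append(asyncio.CancelledError())
        elif t.exception() is not None:
            results.append(t.exception())
        else:
            results.append(t.result())
    return results


async def demo():
    async def job(delay, value):
        await asyncio.sleep(delay)
        if value == 2:
            raise ValueError("job 2 failed")
        return value

    # Results come back in input order, not completion order.
    return await gather_safe_sketch([job(0.03, 1), job(0.01, 2), job(0.02, 3)])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch, partial results are lost if the outer scope cancels the call; a coroutine that needs to keep them would have to store them somewhere itself.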