Revisiting PersistentTaskGroup with Kotlin's SupervisorScope

About as_completed_safe()

I mentioned the idea of adding a safer version of asyncio.as_completed(), based on PersistentTaskGroup or Supervisor, during a short in-person discussion with @yselivanov and @guido at PyCon last week. After some testing and experimentation, I’m now inclined to drop this idea, even as a third-party library function. The reasons are:

  • From the async generator’s point of view, it is impossible to distinguish cancellation from other exceptions.
  • The caller needs complicated boilerplate to handle both exceptions raised inside the block body and aclose(). This complexity comes from the use of async generators, not from the purpose of the API.

That said, after more experimentation I did figure out how to properly handle both timeouts and exceptions raised from the block body, as follows. This removes the need to change the timeout handling to recognize BaseExceptionGroup, because a TimeoutError is mutually exclusive with the other exceptions here.

import asyncio

from aiotools import Supervisor  # assumed: the Supervisor class from aiotools


async def as_completed_safe(coros):
    q = asyncio.Queue()
    tasks = set()

    def result_callback(t: asyncio.Task) -> None:
        # Move each finished task from the pending set into the result queue.
        tasks.discard(t)
        q.put_nowait(t)

    async with Supervisor() as supervisor:
        for coro in coros:
            t = supervisor.create_task(coro)
            t.add_done_callback(result_callback)
            tasks.add(t)
        await asyncio.sleep(0)
        # Keep going while tasks are still running OR finished tasks are still queued.
        # Checking only `tasks` would drop results when several tasks finish at once.
        while tasks or not q.empty():
            try:
                yield await q.get()
            except (GeneratorExit, asyncio.CancelledError):
                # GeneratorExit: injected when aclose() is called
                # CancelledError: injected when a timeout occurs (i.e., the outer scope cancels the inner)
                await supervisor.shutdown()
                raise

Usage example:

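import asyncio
from contextlib import aclosing  # Python 3.10+

import pytest  # requires the pytest-asyncio plugin
from aiotools import VirtualClock  # assumed: the VirtualClock helper from aiotools


@pytest.mark.asyncio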
async def test_as_completed_safe_error_from_body():
    executed = 0
    cancelled = 0
    loop_count = 0

    async def do_job(delay, idx):
        nonlocal cancelled, executed
        try:
            await asyncio.sleep(delay)
            executed += 1
            return idx
        except asyncio.CancelledError:
            cancelled += 1
            raise

    with VirtualClock().patch_loop():
        results = []
        with pytest.raises(ZeroDivisionError):
            async with aclosing(as_completed_safe([
                do_job(0.1, 1),
                do_job(0.2, 2),
                # cancellation occurs here
                do_job(0.3, 3),
            ])) as ag:
                async for result in ag:
                    results.append(await result)
                    loop_count += 1
                    if loop_count == 2:
                        raise ZeroDivisionError()

    assert loop_count == 2
    assert executed == 2
    assert cancelled == 1
    assert results == [1, 2]

This is why I’m now considering gather_safe(), which simply returns the results as a group instead of yielding them from an async generator.
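To make the contrast concrete, here is a minimal sketch of what a gather_safe() could look like. Everything in it is an assumption for illustration: the GroupResult container, the signature, and the supervisor-like semantics (a failing child does not cancel its siblings; cancelling the caller cancels and awaits all children) are hypothetical, not a settled API.

```python
import asyncio
from collections.abc import Coroutine, Iterable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class GroupResult:
    """Hypothetical container keeping successful results and exceptions apart."""
    results: list[Any] = field(default_factory=list)
    errors: list[BaseException] = field(default_factory=list)


async def gather_safe(coros: Iterable[Coroutine[Any, Any, Any]]) -> GroupResult:
    """Hypothetical sketch: run all coroutines to completion without sibling cancellation."""
    tasks = [asyncio.create_task(c) for c in coros]
    try:
        if tasks:
            await asyncio.wait(tasks)
    except asyncio.CancelledError:
        # The caller (or an enclosing timeout) cancelled us: cancel every child
        # and wait for them, so that no task outlives this call.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    group = GroupResult()
    for t in tasks:  # collected in submission order
        if t.cancelled():
            group.errors.append(asyncio.CancelledError())
            continue
        exc = t.exception()
        if exc is None:
            group.results.append(t.result())
        else:
            group.errors.append(exc)
    return group


async def ok(x: int) -> int:
    await asyncio.sleep(0.01)
    return x


async def fail() -> int:
    await asyncio.sleep(0.01)
    raise ValueError("boom")


group = asyncio.run(gather_safe([ok(1), fail(), ok(2)]))
print(group.results, [type(e).__name__ for e in group.errors])  # [1, 2] ['ValueError']
```

Since the caller awaits a single call, cancellation and timeouts need no special handling on its side: there is no generator to close, and errors from the children come back as ordinary values.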