Revisiting PersistentTaskGroup with Kotlin's SupervisorScope

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/supervisor-scope.html
I found that Kotlin’s supervisorScope does the exact same thing that I have been going to achieve with aiotools.PersistentTaskGroup while Kotlin’s coroutineScope corresponds to asyncio.TaskGroup.

Unlike coroutineScope, a failure of a child does not cause this scope to fail and does not affect its other children, so a custom policy for handling failures of its children can be implemented. See SupervisorJob for additional details. A failure of the scope itself (exception thrown in the block or external cancellation) fails the scope with all its children, but does not cancel parent job.

There was a concern about the naming “persistent” in gh-90999, and now I’m inclined to use “SupervisorScope”. As you may already notice, it also resembles with supervisord.

How about your thoughts? Would the Kotlin’s analogy make better sense to add this API to asyncio?

4 Likes

Looking at the docs for aiotools.PersistentTaskGroup I finally understand the key feature that you want: when a child task crashes with an unexpected exception you don’t want its siblings to be cancelled.

I wonder if this can’t be done with a very simple wrapper for asyncio.TaskGroup that simple wraps each task (coroutine) in a try/except BaseException block that ignores all exceptions? (Maybe with the exception of KeyboardInterrupt and SystemExit, like TaskGroup and the rest of asyncio do.)

So we could do something like this (untested, and simplified):

class AltTaskGroup(asyncio.TaskGroup):
    def create_task(self, coro):
        async def wrapper():
            try:
                await coro()
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException:
                pass
        return super().create_Task(wrapper())

Your draft is equivalent to wrap the child tasks with asyncio.shield().
The point is that the child tasks still must be cancellable when the parent task group is explicitly cancelled (i.e., shutdown).
So I had to change the internal logic of TaskGroup to write PersistentTaskGroup.

  • When the persistent task group is cancelled or explicitly shutdown: all child tasks should be cancelled and awaited. → This makes it different from simply shielding child tasks. Propagation of cancellation should be controlled in the task group instead of individual tasks.
  • When a child task is cancelled: all other child tasks should remain intact.
  • When a child task raised unhandled exception: an exception handler configured in the persistent task group is invoked (the default fallback is loop.call_exception_handler()). All other child tasks should remain intact.
  • If all child tasks have finished, the persistent task group should exit as well. (same to the original task group)
  • It should guarantee termination of all child tasks if the control flow has exited from the persistent task group. (same to the original task group)

One of the reason to rewrite the code is that I need to call the shutdown process other than __aexit__() handler, such as shutdown(), when the persistent task group is used as long-lived object instead of a context manager. The current TaskGroup has all its shutdown routines inside __aexit__() and it’s not reusable from other methods. It’s not “refactored” or “designed” to be subclassed.

I wonder if maybe your proposed functionality could be implemented as a flag passed to the TaskGroup class. The main effect of the flag should be that if one task crashes this shouldn’t cause all other tasks to be cancelled, instead the exception should be logged (either by loop.call_exception_handler() or in some other way). Also you want a shutdown() method.

Would that work? If I am right then the main bikeshedding might have to be about the logging API to be used. I prefer there to be no coupling between asyncio and logging, so hopefully just calling loop.call_exception_handler() works for you?

2 Likes

Yeah, that would be nice!
I’d like to be able to customize the error handler with the default fallback to loop.call_exception_handler(), and agree with you in that it’s better to remove coupling between asyncio and logging.

I’ll try to implement a modified version of TaskGroup with the optional flag.
If done, shall I make a pull request to the CPython repository?
Or would you prefer to see it in a separate repo (e.g., aiotools)?

I recommend that you work on a PR, so I can review it, and eventually merge it so it will become available in 3.12. I’m not going to be able to review code that goes into other repos.

2 Likes

Hello, new contributor here, I would like to help with that.
I was looking at the TaskGroup source code and it seems that the only thing that should change (as has already been stated) is the behaviour of TaskGroup._on_task_done when a task raises (if the appropriate flag has been set).

Do we really need to await on each task after calling .cancel()?

SystemExit and KeyboardInterrupt are still considered special, I guess ?

1 Like

Thanks for the interest! I have been too busy on my business affairs (traveling through the globe…) so could not fire up the development yet.

I recently updated my fork of CPython and created a branch to add this.

My plan is as follows:

  • First, create a separate asyncio.Supervisor class which has the same coding convention of asyncio.TaskGroup but takes the implementation from aiotools.PersistentTaskGroup.
  • Import & improve the test cases from aiotools and write new test cases.
  • Explore possibility to merge asyncio.Supervisor as a part of asyncio.TaskGroup, or search for a better naming. This may require some refactoring of the existing asyncio.TaskGroup codes.
  • Tighten up as the pull request and ask reviews from the core developers including Guido.

If you are interested, I can invite you as the collaborator in my CPython fork.
I’ll make it sure that you are also included as the PR author and co-contributor, of course.

1 Like

Do we really need to await on each task after calling .cancel()?

Yes, because after catching asyncio.CancelledError, the task may have further await statements for clean up, such as await something.aclose(), __aexit__() of async with blocks, and etc.

SystemExit and KeyboardInterrupt are still considered special, I guess ?

Yes, along with asyncio.CancelledError, they are BaseException but not Exception, to allow them to be used as a control-flow signal rather than representation of failures.
Note that asyncio.CancelledError has been changed to BaseException since Python 3.8 to avoid silent swallowing of cancellation in 3rd-party library codes.

Thanks for getting back. I’ve cloned your fork and I’m exeprimenting with it, if you want to invite me as a collaborator my Github username is the same as here (DontPanicO).

1 Like

Exception handling in Supervisor

@DontPanicO I think we need to address the following design changes from your initial implementation (see also: Asyncio, tasks, and exception handling - recommended idioms? - #11 by achimnol):

  • Supervisor should not raise the exception group by itself nor remember all exceptions. Since a supervisor is meant to run for an indefinite length of time, it should never remember individual task’s results or the task instances for its whole lifespan to avoid memory leak.
    • For the above reason, we need to have an explicit exception handler interface with the default fallback to loop.call_exception_handler().

@guido For this reason, I’d like to also suggest making Supervisor as a separate API to TaskGroup, instead of adding a mode of operation to TaskGroup, due to this significant difference of exception raising behavior.

  • TaskGroup: all subtasks share the same “destiny”
    • All tasks are cancelled at once upon both external cancellation or internal unhandled exception, otherwise they run to successful completion.
    • Unhandled exceptions are collected as an exception group.
  • Supervisor: each subtask has its own “destiny”
    • Only the cancellation from outside (e.g., shutdown()) cancels them all at once.
    • Unhandled exceptions are processed by an explicit exception handler.

gather_safe() experimentation

To embrace result collection patterns, I’d like to add a wrapper around Supervisor as follows:

@dataclass
class GroupResult:
    results: list[Any] = field(default_factory=list)
    cancelled: int = 0


async def gather_safe(coros, group_result: GroupResult) -> GroupResult:
    errors = []
    ongoing_cancelled_count = 0

    def result_callback(t: asyncio.Task) -> None:
        nonlocal errors, ongoing_cancelled_count
        try:
            group_result.results.append(t.result())
        except asyncio.CancelledError:
            ongoing_cancelled_count += 1
        except Exception as e:
            errors.append(e)

    try:
        async with Supervisor() as supervisor:
            for coro in coros:
                t = supervisor.create_task(coro)
                t.add_done_callback(result_callback)
        return group_result
    except asyncio.CancelledError as e:
        errors.append(e)  # cannot reraise directly here because the finally block will override it.
    finally:
        group_result.cancelled = ongoing_cancelled_count
        if errors:
            raise BaseExceptionGroup("unhandled exceptions in gather_safe()", errors)

Sample test code:

@pytest.mark.asyncio
async def test_gather_safe_timeout():
    detected_exc_groups = set()
    group_result = GroupResult()
    try:
        async with asyncio.timeout(0.35):
            await gather_safe([
                do_job(0.1, 1),  # sleeps 0.1s and returns 1
                fail_job(0.2),   # raises ZeroDivisionError after 0.2s
                fail_job(0.25),  # raises ZeroDivisionError after 0.25s
                do_job(0.3, 3),  # sleeps 0.3s and returns 3
                # timeout occurs here
                do_job(0.4, 4),  # cancelled
                fail_job(0.5),   # cancelled
                fail_job(0.6),   # cancelled
            ], group_result)
    except* asyncio.TimeoutError as e:
        detected_exc_groups.add("timeout")
        # we should be able to access the partial results
        assert group_result.results == [1, 3]
        assert group_result.cancelled == 3
    except* ZeroDivisionError as e:
        detected_exc_groups.add("zerodiv")
        # we should be able to access the partial results
        assert group_result.results == [1, 3]
        assert group_result.cancelled == 3
    else:
        pytest.fail("inner exception was not re-raised")
    assert detected_exc_groups == {"timeout", "zerodiv"}
    assert group_result.results == [1, 3]
    assert group_result.cancelled == 3

Note that the above code only works with my suggested patch to asyncio.timeouts.Timeout.__aexit__() as in this another thread, because the current asyncio.timeout context manager does not detect CancelledError contained in BaseExceptionGroup. I’d like to hear @yselivanov’s opinion about this point.

The above experimentation code is available at aiotools’ GitHub repo.

A New Naming Idea

  • “PersistentTaskGroup” is a bit too long. We already have one candidate: “Supervisor” following Kotlin’s SupervisorScope.
  • How about “TaskScope”, which aligns better with “TaskGroup” but emphasizes the difference between “Group” (sharing the destiny) and “Scope” (individuality)?

About as_completed_safe()

I have mentioned an idea about addition of a safer version of asyncio.as_completed() based on PersistentTaskGroup or Supervisor at a short in-person discussion with @yselivanov and @guido in PyCon last week. After some testing and experimentation, I’m now inclinded to deprecate this idea, even as a 3rd party library function. The reasons are:

After more experimentation, I could figure out how to properly handle both timeouts and exceptions from the block body. This eliminates the necessity of making timeout changes for recognizing BaseExceptionGroup because TimeoutError is mutually exclusive to other exceptions happening.

  • It is impossible to distinguish the cancellation and other exceptions from the view of the async generator.
  • The caller must have a complicated boilerplate to handle both potential exceptions inside the block and aclose(). This complication comes from the use of async generators, not from the API’s purpose.
async def as_completed_safe(coros):
    q = asyncio.Queue()
    tasks = set()

    def result_callback(t: asyncio.Task) -> None:
        tasks.discard(t)
        q.put_nowait(t)

    async with Supervisor() as supervisor:
        for coro in coros:
            t = supervisor.create_task(coro) : Task[Unknown]
            t.add_done_callback(result_callback)
            tasks.add(t)
        await asyncio.sleep(0)
        while True:
            if not tasks:
                return
            try:
                yield await q.get()
            except (GeneratorExit, asyncio.CancelledError):
                # GeneratorExit: injected when aclose() is called
                # CancelledError: injected when a timeout occurs (i.e., the outer scope cancels the inner)
                await supervisor.shutdown()
                raise

Usage example:

@pytest.mark.asyncio
async def test_as_completed_safe_error_from_body():
    executed = 0
    cancelled = 0
    loop_count = 0

    async def do_job(delay, idx):
        nonlocal cancelled, executed
        try:
            await asyncio.sleep(delay)
            executed += 1
            return idx
        except asyncio.CancelledError:
            cancelled += 1
            raise

    with VirtualClock().patch_loop():
        results = []
        with pytest.raises(ZeroDivisionError):
            async with aclosing(as_completed_safe([
                do_job(0.1, 1),
                do_job(0.2, 2),
                # cancellation occurs here
                do_job(0.3, 3),
            ])) as ag:
                async for result in ag:
                    results.append(await result)
                    loop_count += 1
                    if loop_count == 2:
                        raise ZeroDivisionError()

    assert loop_count == 2
    assert executed == 2
    assert cancelled == 1
    assert results == [1, 2]

This is why I’m now considering gather_safe() which just returns a group result instead of an async generator.

aiotools.gather_safe() vs. aiotools.as_completed_safe()

These “demo” APIs share the same goal: collect the results and exceptions from the child tasks using Supervisor, with safety guarantee to terminate all tasks upon exit while keeping sibling tasks running upon unhandled exceptions from specific child tasks.

ref) Experiment with DontPanicO's Supervisor impl. by achimnol · Pull Request #53 · achimnol/aiotools · GitHub

However, I can now clearly state the difference between them:

  • as_completed_safe() “flattens” the unhandled exceptions from child tasks using a generator, so that they can be handled one by one.
    • It works with the vanilla asyncio.timeout() well, because exceptions are raised and handled one by one.
  • gather_safe() “groups” all unhandled exceptions from child tasks using an exception group.
    • It requires modification of asyncio.timeout() to check if BaseExceptionGroup contains CancelledError or not.

As such, I think it would be nice to have both style of APIs for the same purpose, so that users could choose one. Though, I’m not sure whether the same argument would apply in the stdlib design. Of course, this is just an experimentation and to gather your thoughts about the API design, possibility on the stdlib inclusion, and potential improvements/changes required for doing so.

Don’t know if there’s much difference from aiotools.PersistentTaskGroup, but you may also want to look at aiojobs as another reference implementation. I think they are achieving the same thing. The library was originally written by @asvetlov, so should be good quality and written with a solid understanding of asyncio.

Yes, I also have experiences of using aiojobs.
It also provides graceful shutdown of a set of tasks, but it is more specialized to limit the maximum concurrency of the jobs and emphasizes integration with aiohttp’s request handlers by shielding them and making them to run to completion regardless of request cancellation or interrupts. However, such specialization may not be always wanted (e.g., websockets). aiojobs also does not provide context manager interfaces.

PersistentTaskGroup and Supervisor are more like a primitive to construct such higher-level coroutine managers. aiojobs tries to achieve too many things in a specific way (of course it is well written with good understanding), but I think we need a reusable construct to write such things.

Yep, I was mainly thinking of the Scheduler, which does the same sort of thing (I think we can probably replace aiojobs.aiohttp.atomic with something that uses TaskGroup anyway, and then maybe deprecate the library if it doesn’t have much else to offer, or refactor it to some simple TaskGroup abstractions).

One use case I’ve just been reminded of, and I’m not sure if you already have a plan for, is the ability to wait and cancel when shutting down the application. e.g. My application may spawn a task that never ends, so when I am shutting down the application I may want to wait on running tasks for a given timeout and then have all remaining tasks cancelled and awaited on.

In aiohttp, we recently introduced some wait and cancel logic directly, so we don’t really need this in TaskGroup, but it could be useful in other applications.

How much overlap is there between this thread and several others I’ve seen recently, @achimnol ? I think there’s another thread that talks about Supervisor vs. PersistentTaskGroup already. Personally, I’m not excited about revisiting as_completed() – I’d rather just deprecate it without a similarly-named or -functioning replacement.

@achimnol When you get back to this, could you also review and compare this to gh-101581 Optionally prevent child tasks from being cancelled in asyncio taskgroups by DontPanicO · Pull Request #101648 · python/cpython · GitHub, which seems to go a different route (just add defer_errors=True flag to the TaskGroup constructor)? I don’t want to end up with multiple solutions.

1 Like

Oh great!

BTW, I’m concerned about the “long-running” usecase where unhandled exceptions should not be stored in the taskgroup object (self._errors) for an indefinite length of time, to prevent memory leaks with continuously spawned & destroyed tasks. This is why I have deliberately removed the feature of raising up an exception group in aiotools.Supervisor which is a porting of @DontPanicO’s PR.

As long as this concern is taken in account, I think it’s fine to have an option like defer_errors=True, but in that case the name would require changes, ignore_errors=True or delegate_errors=handler_func.