Revisiting PersistentTaskGroup with Kotlin's SupervisorScope

I don’t want to end up with multiple solutions.

Simplicity is best, but in our case the problem is that the long-lived use case needs to prevent memory leaks, since exceptions are “heavy” objects.

Also, I think we should have a more generalized (less specific, relaxed) task scope which handles nested cancellation correctly but does not enforce specific semantics on child task completion the way TaskGroup does. As I’ve demonstrated, TaskGroup can be implemented as a subclass of TaskScope, and TaskScope allows writing various task-handling patterns as in aiotools, which are not feasible with TaskGroup because it enforces grouped cancellation.

This thread seems to focus on changing TaskGroups to handle child cancellation differently, but lately I’ve been having a different itch: I would like my TaskGroup to cancel and clean up its children in __aexit__ (the default behavior is to wait for them).

My use case is simple: an open websocket connection spawns helper tasks (to subscribe to event buses, to deal with keepalives, etc.). When the connection is closed I need to manually cancel all these helpers. I could hack around it by raising an exception and then catching it outside of the TaskGroup, but ideally we could opt into the TaskGroup doing this by itself?

So we might want to consider an approach where multiple facets of TaskGroups can be configured.

I worry that we’d end up with a complicated interface. Your solution (raise an exception and catch it) seems totally fine. If this becomes a common use case, I’d rather add a shutdown() method than a new keyword argument.


I think this is a fine refactoring. I will have to study your GitHub issues more.

I’ve thought about it and I dislike the shutdown() interface since I feel like I might forget to call it.

I think I might stick a TaskGroup subclass in quattro and consider it good enough for now.

In TaskGroup, we don’t need shutdown().

In the TaskScope I’m proposing, shutdown() is an auxiliary, optional method to allow triggering shutdown from another context, such as signal handlers.
To implement fine-grained control of taskscope termination in its subclasses, such as when catching GeneratorExit in aiotools.as_completed_safe(), we should keep shutdown() as a public API or at least a “protected” one.

async with is an anchor indicating who “owns” the child tasks (i.e., who is the parent), while shutdown() provides a way to actively terminate the task scope in other contexts. Even without calling shutdown(), the task scope’s termination is guaranteed by the async with statements.
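As a toy illustration of this anchoring (this is not the proposed TaskScope API, just a minimal stand-in), a scope with an externally callable shutdown() might look like:

```python
import asyncio

class MiniScope:
    """Toy stand-in: the async with block owns the children, while
    shutdown() lets another context (e.g. a signal handler) request
    termination. Purely illustrative."""

    def __init__(self):
        self._tasks = set()
        self._shutdown = asyncio.Event()

    def create_task(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    def shutdown(self):
        # Could be hooked up via loop.add_signal_handler(...).
        self._shutdown.set()

    async def wait_shutdown(self):
        await self._shutdown.wait()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Termination is guaranteed here even if shutdown() is never called.
        for task in set(self._tasks):
            task.cancel()
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        return False

async def main():
    async with MiniScope() as scope:
        scope.create_task(asyncio.sleep(3600))
        # Simulate a signal handler firing shortly after startup:
        asyncio.get_running_loop().call_soon(scope.shutdown)
        await scope.wait_shutdown()
    return "terminated"

print(asyncio.run(main()))  # → terminated
```

The async with block remains the single owner; shutdown() only signals, it never tears anything down itself.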

There is another API design issue, though.

TaskScope and TaskGroup both terminate (i.e., continue to the next statements after the async with block) when all children exit.

For long-lived server use cases, we would just keep track of child tasks, which are spawned sporadically, and keep the task scope “open” until we have an explicit termination signal (e.g., cancellation from the inner block, cancellation from another task or callback).

I think this use case may be covered with a simple weakref.WeakSet based wrapper, but still it would be nicer to have a “structured” cancellation scope, as shown in the following table’s SupervisorScope.

|  | structured cancellation (`async with`) | + auto-exit or explicit-shutdown | + cancel-together |
|---|---|---|---|
| auto-exit | CancellationScope | TaskScope | TaskGroup |
| explicit-shutdown | CancellationScope | SupervisorScope |  |
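The weakref.WeakSet-based wrapper mentioned above could be sketched like this. Names are made up; note the documented asyncio caveat that the loop itself only keeps weak references to tasks, so a weakly tracked task with no other references can disappear mid-execution, which is part of why a structured scope is nicer:

```python
import asyncio
import weakref

class TaskTracker:
    """Unstructured alternative: a WeakSet of spawned tasks. Nothing keeps
    the 'scope' open or closes it for you; that is exactly what a
    structured SupervisorScope would add. Illustrative only."""

    def __init__(self):
        # Caveat: asyncio itself only holds weak references to tasks, so
        # a task tracked *only* here may be garbage-collected mid-flight.
        self._tasks = weakref.WeakSet()

    def create_task(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        return task

    async def cancel_all(self):
        # Snapshot into a strong set before cancelling and awaiting.
        tasks = set(self._tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    tracker = TaskTracker()
    tracker.create_task(asyncio.sleep(3600))
    tracker.create_task(asyncio.sleep(3600))
    await tracker.cancel_all()  # the explicit termination signal
    return len([t for t in tracker._tasks if not t.done()])

print(asyncio.run(main()))  # → 0
```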

It sounds to me as if that use case requires a variant that is not a cancellation scope but just a manager of tasks that logs errors when they fail. Almost like the default asyncio.create_task(), except that that doesn’t log an error when a task fails. (Maybe it should? But that would probably be very noisy for some existing apps.)

For long-lived server use cases, we would just keep track of child tasks, which are spawned sporadically, and keep the task scope “open” until we have an explicit termination signal (e.g., cancellation from the inner block, cancellation from another task or callback).

The API I would expect is that exiting the context would cancel the tasks in this case. So, using an aiohttp cleanup_ctx, I’d be doing:

async def supervisor(app):
    async with Supervisor() as s:
        app["supervisor"] = s
        yield

In a framework without cleanup_ctx, __aenter__()/__aexit__() can be used with startup/shutdown hooks.

It would also be nice if there were an optional timeout that could be waited on before cancelling any remaining tasks. Though this could maybe be added manually after the yield with something like wait_for(gather(*s.tasks), timeout=10).

Also, not sure if you saw my message about logging exceptions, or if there’s been some other discussion on it since: create_task() recommendation seems like bad practice · Issue #104091 · python/cpython · GitHub


Yeah, I think the async-context-manager style API is a perfect fit with aiohttp’s cleanup_ctx API, where the supervisor context is “anchored”.

I have also thought about introducing a timeout for graceful cancellation followed by a forced cancellation, but I’ve excluded this pattern from my experimentation for now. Intrinsically, coroutines should be “cooperative”, so it is hard to force-terminate a coroutine in general…

Intrinsically coroutines should be “cooperative”, so it is hard to force-terminate a coroutine in general…

Yes, the specific case I’m thinking about is when you shut down a web app, but the app often has background tasks for DB writes etc. that only last a couple of seconds. So, a graceful shutdown would allow these tasks to run before cancellation. Shutdown would look like:

# asyncio.wait() leaves the tasks running on timeout, so they really do
# get a grace period (wait_for() around gather() would cancel them, since
# cancelling the gather future cancels its children).
done, pending = await asyncio.wait(tasks, timeout=5)
for t in pending:
    t.cancel()
if pending:
    _, pending = await asyncio.wait(pending, timeout=30)
if pending:
    logging.warning("Some tasks didn't cancel in time.")

Or similar. So there is a brief moment to let tasks complete naturally, and then cancel and await.

Again, this behaviour could be done manually as in my previous comment (if we don’t want the complexity added to Supervisor), as long as the task set is available as an attribute.

Not sure what the current state is on this, but I thought of another thing; it could be a really good idea to add a .shield() method as an alternative to asyncio.shield() which would track the inner task within the supervisor. This idea comes from trying to make the aiohttp shutdown process really robust.

In aiohttp, handlers might be cancelled if the connection is dropped, so a user might utilise asyncio.shield() to protect a DB write or similar. The problem occurs when the server is trying to shut down: it will wait on handler tasks to complete before continuing to shut down, and then it will cancel all tasks that are still running.

The problem is that there is no way to track the inner task of asyncio.shield(), so it could get cancelled during shutdown, which rather defeats the purpose of shielding it. By introducing a method which shields while tracking the task in the supervisor, a user can use this safely, with the cleanup_ctx on shutdown allowing the shielded task to be waited on before aiohttp cancels everything.
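A sketch of such a tracked shield follows. The Supervisor class and its shield()/wait() methods are hypothetical names for illustration, not aiohttp's or aiotools' actual API:

```python
import asyncio

class Supervisor:
    """Illustrative supervisor whose shield() keeps a reference to the
    inner task, so shutdown code can await it instead of blindly
    cancelling it along with everything else."""

    def __init__(self):
        self.tasks = set()

    async def shield(self, coro):
        inner = asyncio.ensure_future(coro)
        self.tasks.add(inner)  # tracked: shutdown can find and await this
        inner.add_done_callback(self.tasks.discard)
        return await asyncio.shield(inner)

    async def wait(self, timeout=None):
        # Graceful shutdown: wait for tracked (shielded) tasks to finish.
        if self.tasks:
            await asyncio.wait(set(self.tasks), timeout=timeout)

async def main():
    sup = Supervisor()
    done = []

    async def db_write():
        await asyncio.sleep(0.01)
        done.append("committed")

    async def handler():
        # Even if this handler task is cancelled, the inner write keeps
        # running and stays visible to the supervisor.
        await sup.shield(db_write())

    h = asyncio.ensure_future(handler())
    await asyncio.sleep(0)     # let the handler start and create the inner task
    h.cancel()                 # simulate a dropped connection
    await sup.wait(timeout=1)  # shutdown awaits the shielded write
    return done

print(asyncio.run(main()))  # → ['committed']
```

With this, a cleanup_ctx-style shutdown hook can call wait() first, so shielded work completes before the framework cancels everything still running.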