Exception handling in Supervisor
@DontPanicO I think we need to address the following design changes from your initial implementation (see also: Asyncio, tasks, and exception handling - recommended idioms? - #11 by achimnol):
- Supervisor should not raise the exception group by itself nor remember all exceptions. Since a supervisor is meant to run for an indefinite length of time, it should never retain individual tasks’ results or the task instances for its whole lifespan, to avoid memory leaks.
- For the above reason, we need an explicit exception handler interface with a default fallback to `loop.call_exception_handler()` (a rough sketch follows below).
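To make the handler-interface idea concrete, here is a minimal sketch of what such a Supervisor could look like. The `exception_handler` constructor argument and the `_on_task_done()` hook are hypothetical names of mine, and the async-context-manager/`shutdown()` machinery is omitted; this is purely for illustration, not a settled API:

```python
import asyncio
from typing import Any, Callable, Coroutine, Optional


class Supervisor:
    """Hypothetical sketch: routes unhandled task exceptions to a
    per-supervisor handler instead of accumulating them for re-raise."""

    def __init__(
        self,
        exception_handler: Optional[Callable[[asyncio.Task], None]] = None,
    ) -> None:
        self._exception_handler = exception_handler
        self._tasks: set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        t = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(t)
        t.add_done_callback(self._on_task_done)
        return t

    def _on_task_done(self, t: asyncio.Task) -> None:
        # Forget the task right away so neither results nor exceptions
        # are retained for the supervisor's whole lifespan.
        self._tasks.discard(t)
        if t.cancelled():
            return
        exc = t.exception()
        if exc is None:
            return
        if self._exception_handler is not None:
            self._exception_handler(t)
        else:
            # Default fallback: delegate to the event loop's handler.
            asyncio.get_running_loop().call_exception_handler({
                "message": "unhandled exception in supervised task",
                "exception": exc,
                "task": t,
            })
```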
@guido For this reason, I’d also like to suggest making Supervisor a separate API from TaskGroup, instead of adding a mode of operation to TaskGroup, given this significant difference in exception-raising behavior (a small comparison sketch follows the list below):
- TaskGroup: all subtasks share the same “destiny”.
  - All tasks are cancelled at once upon either external cancellation or an internal unhandled exception; otherwise they all run to successful completion.
  - Unhandled exceptions are collected into an exception group.
- Supervisor: each subtask has its own “destiny”.
  - Only cancellation from the outside (e.g., `shutdown()`) cancels them all at once.
  - Unhandled exceptions are processed by an explicit exception handler.
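As a concrete illustration of the first half of this contrast, the existing `asyncio.TaskGroup` (Python 3.11+) cancels the surviving sibling as soon as one task fails; a Supervisor, under the semantics proposed above, would instead let the sibling finish and hand the failure to its exception handler (that part is only sketched in the comments):

```python
import asyncio


async def ok(delay: float, value: int) -> int:
    await asyncio.sleep(delay)
    return value


async def boom(delay: float) -> None:
    await asyncio.sleep(delay)
    raise ZeroDivisionError("boom")


async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            survivor = tg.create_task(ok(1.0, 42))
            tg.create_task(boom(0.1))
    except* ZeroDivisionError:
        # Shared destiny: the failure at t=0.1s cancelled the sibling
        # and the whole block re-raised an ExceptionGroup.
        print("sibling cancelled?", survivor.cancelled())  # True
    # With a Supervisor, ok(1.0, 42) would run to completion and the
    # ZeroDivisionError would go to the exception handler instead of
    # being re-raised here.


asyncio.run(main())
```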
`gather_safe()` experimentation
To embrace result collection patterns, I’d like to add a wrapper around Supervisor as follows:
```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class GroupResult:
    results: list[Any] = field(default_factory=list)
    cancelled: int = 0


async def gather_safe(coros, group_result: GroupResult) -> GroupResult:
    errors = []
    ongoing_cancelled_count = 0

    def result_callback(t: asyncio.Task) -> None:
        nonlocal errors, ongoing_cancelled_count
        try:
            group_result.results.append(t.result())
        except asyncio.CancelledError:
            ongoing_cancelled_count += 1
        except Exception as e:
            errors.append(e)

    try:
        async with Supervisor() as supervisor:
            for coro in coros:
                t = supervisor.create_task(coro)
                t.add_done_callback(result_callback)
        return group_result
    except asyncio.CancelledError as e:
        # Cannot re-raise directly here because the finally block would override it.
        errors.append(e)
    finally:
        group_result.cancelled = ongoing_cancelled_count
        if errors:
            raise BaseExceptionGroup("unhandled exceptions in gather_safe()", errors)
```
Sample test code:
```python
import asyncio

import pytest


# Helpers matching the behavior described in the inline comments below.
async def do_job(delay: float, value: int) -> int:
    await asyncio.sleep(delay)
    return value


async def fail_job(delay: float) -> None:
    await asyncio.sleep(delay)
    1 / 0


@pytest.mark.asyncio
async def test_gather_safe_timeout():
    detected_exc_groups = set()
    group_result = GroupResult()
    try:
        async with asyncio.timeout(0.35):
            await gather_safe([
                do_job(0.1, 1),   # sleeps 0.1s and returns 1
                fail_job(0.2),    # raises ZeroDivisionError after 0.2s
                fail_job(0.25),   # raises ZeroDivisionError after 0.25s
                do_job(0.3, 3),   # sleeps 0.3s and returns 3
                # timeout occurs here
                do_job(0.4, 4),   # cancelled
                fail_job(0.5),    # cancelled
                fail_job(0.6),    # cancelled
            ], group_result)
    except* asyncio.TimeoutError:
        detected_exc_groups.add("timeout")
        # we should be able to access the partial results
        assert group_result.results == [1, 3]
        assert group_result.cancelled == 3
    except* ZeroDivisionError:
        detected_exc_groups.add("zerodiv")
        # we should be able to access the partial results
        assert group_result.results == [1, 3]
        assert group_result.cancelled == 3
    else:
        pytest.fail("inner exception was not re-raised")
    assert detected_exc_groups == {"timeout", "zerodiv"}
    assert group_result.results == [1, 3]
    assert group_result.cancelled == 3
```
Note that the above code only works with my suggested patch to `asyncio.timeouts.Timeout.__aexit__()` described in the other thread, because the current `asyncio.timeout()` context manager does not detect a `CancelledError` contained in a `BaseExceptionGroup`. I’d like to hear @yselivanov’s opinion on this point.
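For context, the kind of check the patched `__aexit__()` would need amounts to something like the helper below; this is my paraphrase of the idea for illustration only, not the actual patch:

```python
import asyncio


def contains_cancellation(exc: BaseException) -> bool:
    # True if exc is a CancelledError, or an exception group that
    # (transitively) contains one; the latter is the case that the
    # current asyncio.timeout() implementation does not recognize.
    if isinstance(exc, asyncio.CancelledError):
        return True
    if isinstance(exc, BaseExceptionGroup):
        cancelled, _rest = exc.split(asyncio.CancelledError)
        return cancelled is not None
    return False
```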
The above experimentation code is available in the aiotools GitHub repo.
A New Naming Idea
- “PersistentTaskGroup” is a bit too long. We already have one candidate: “Supervisor”, following Kotlin’s `SupervisorScope`.
- How about “TaskScope”, which aligns better with “TaskGroup” but emphasizes the difference between “Group” (sharing the destiny) and “Scope” (individuality)?