How to cancel all tasks created within same TaskGroup

TaskGroup is quite new thing and I didn’t find functionality for easily cancelling all other tasks that might be running in the same task group. Only method TaskGroup object has is create_task.

Is this the right way to go?

import asyncio
from asyncio import Task
from random import random


async def job_1(tasks: list[Task]):
    while True:
        await asyncio.sleep(1)  # Work that is being done
        if random() < 0.1:  # When every task should get cancelled
            for task in tasks:
                if task == asyncio.current_task():
                    print("Task 1 cancelling others first")
                    continue
                if task.done():
                    print("Some task was already done")
                task.cancel()
            return
        print("Task 1")


async def job_2(tasks: list[Task]):
    while True:
        await asyncio.sleep(1)
        if random() < 0.1:
            for task in tasks:
                if task == asyncio.current_task():
                    print("Task 2 cancelling others first")
                    continue
                if task.done():
                    print("Some task was already done")
                task.cancel()
            return
        print("Task 2")


async def daemon():
    while True:
        await asyncio.sleep(1)
        print("Daemon task")


async def main():
    async with asyncio.TaskGroup() as group:
        tasks: list[Task] = []
        task_1 = group.create_task(job_1(tasks))
        task_2 = group.create_task(job_2(tasks))
        daemon_task = group.create_task(daemon())
        tasks.append(task_1)
        tasks.append(task_2)
        tasks.append(daemon_task)


asyncio.run(main())

Trio and anyio afford a taskgroup.cancel_scope.cancel method for this.

Asyncio doesn’t have cancel scopes, and anyway the additional indirection doesn’t make much sense, thus I’m going to propose adding a taskgroup.cancel method for this.

Currently the only way seems to be to cancel the task itself, which is stored in the taskgroup as a private datum and thus really shouldn’t be used. Alternately you can pass the task itself around, which is a problem because the task might not even be running within the taskgroup’s scope any more – and cancelling an inactive taskgroup should be a no-op.

2 Likes

Matthias is right that you can cancel the task that the task group is running in (assuming that nothing else happens before/after the task group block, otherwise you’ll need to refactor it into its own function).

But another thing that will cancel all the tasks in a task group is when one of the tasks in the group raises an exception. That’s the whole point of task groups! So you could just manually inject an exception with TaskGroup.create_task() and then catch it just outside. Here’s your example reformatted to do this:

Code example using group.create_task(_raise_cancel_request())
import asyncio
from random import random

class CancelRequestException(Exception):
    pass

async def raise_cancel_request():
    raise CancelRequestException

async def job(i, group):
    while True:
        print(f"Task {i} before sleep")
        await asyncio.sleep(1)
        print(f"Task {i} after sleep")
        if random() < 0.1:
            print(f"Task {i} cancelling others")
            group.create_task(raise_cancel_request())
            return

async def daemon():
    while True:
        print("Daemon before sleep")
        await asyncio.sleep(1)
        print("Daemon after sleep")

async def main():
    try:
        async with asyncio.TaskGroup() as group:
            group.create_task(job(1, group))
            group.create_task(job(2, group))
            group.create_task(daemon())
    except* CancelRequestException:
        pass

asyncio.run(main())

(If you change random() < 0.1 to e.g. random() < 0.8 and run it a few times, you will see that sometimes task 1 will try to cancel but task 2 will then wake up and cancel too. That’s because create_task(_raise_cancel_request()) will schedule the _raise_cancel_request() but not run it straight away, so task 2 is not cancelled quite immediately. That’s not necessarily a problem but it’s certainly a difference from Matthias’s method.)

By the way, with this particular example, you can simply raise the exception directly from job()! No TaskGroup.create_task() or extra hypothetical TaskGroup.cancel() method needed. But I assume you’re talking about a situation where you might want to cancel a task group from a task running in some other task group.

It’s possible to wrap this technique up into a cancellable task group class if that’s useful. I’m not an expert on context managers so I’m not 100% sure this is right but it seems to work:

import asyncio

class _CancelRequestException(Exception):
    pass

async def _raise_cancel_request():
    raise _CancelRequestException

class CancellableTaskGroup(asyncio.TaskGroup):
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            result = await super().__aexit__(exc_type, exc_val, exc_tb)
            # This surely means that there was no exception, otherwise super().__aexit__() would
            # have wrapped it in an exception group and explicitly raised it.
            # (In principle, if the main body raised a SystemExit or KeyboardInterrupt then that
            # could follow this route, although in practice the current implementation of
            # TaskGroup always re-raises these rather than just returning True.)
            return result

        except* _CancelRequestException:
            pass  # Suppress cancel request exception

        # If we got here, the only exception in the ExceptionGroup was _CancelRequestException
        return True

    def cancel(self):
        self.create_task(_raise_cancel_request())
Example using CancellableTaskGroup
from random import random

async def job(i, group):
    while True:
        print(f"Task {i} before sleep")
        await asyncio.sleep(1)
        print(f"Task {i} after sleep")
        if random() < 0.1:
            print(f"Task {i} cancelling others")
            group.cancel()
            return

async def daemon():
    while True:
        print("Daemon before sleep")
        await asyncio.sleep(1)
        print("Daemon after sleep")

async def main():
    async with CancellableTaskGroup() as group:
        group.create_task(job(1, group))
        group.create_task(job(2, group))
        group.create_task(daemon())

asyncio.run(main())

My very basic diff with some code examples: Add a `taskgroup.stop` method · Issue #108951 · python/cpython · GitHub

There’s active discussion about this feature natively to task groups over on that github issue.

One thing I wanted to mention here is that I had the idea of cancel_called and cancel_caught attributes too, in a similar way that Trio does for cancellation scopes. Those tend to be quite useful so perhaps they would be useful here too, and they would be really easy to add to the implementation.

Then again isn’t this a feature to consider separately? It doesn’t seem tied to task groups (even if it is in trio).

There was this SO question python - How to cancel all tasks in a TaskGroup - Stack Overflow a while ago, to which there were two working answers (the first of which is mine):

  1. Use the undocumented TaskGroup._abort method, which does exactly what you want and which I think should be made public.
  2. Use another task to wrap the function that creates the TaskGroup and cancel that task instead.

Unfortunately it does not, as explained in the github issue, and as evident by the proposed implementation PR.

1 Like

Definitely cancel_called and cancel_caught could be considered separately after/if this one gets accepted. I just mentioned them here because someone on the github issue talked about detecting why a task group finished and their idea was much more complex (raising a different cancellation-like exception).

I was caught off guard by your comment those attributes not being tied to task groups. Maybe you were thinking of them raw asyncio.Task objects? They actually already have analogues of them: they’re asyncio.Task.cancelling() and asyncio.Task.cancelled() respectively.

I’m sorry, I was on my phone and didn’t follow the links. :slight_smile: I see now that they are tied to cancel scopes in Trio, and the closest equivalent asyncio has appears to be a task group. Nevertheless let’s open a separate Discourse topic or GitHub issue for that.

(Reposting my design idea from github here for design discussion.)

What if TaskGroup.cancel() is defined as explicitly raising CancelledError from TaskGroup.__aexit__ upon successful cancellation instead of exiting normally from the suite?

Raising an exception makes the unusual cancel-exit condition more explicit and lets callers use normal exception handling to detect, recover, or propagate to enclosing scopes.

Calling cancel() would act exactly as if the suite had itself raised CancelledError and has no effect at all if the context has already exited or if the suite or a task raises a different exception. Calling cancel() from the suite immediately raises CancelledError.

This design makes TaskGroup.cancel and Task.cancel more closely related. Both would raise CancelledError out of the scope being cancelled:

  • For TaskGroup.cancel() the cancelled thing is the context of the with-statement
  • For Task.cancel() the cancelled thing is the whole body of the coroutine

The need for a try statement to recover is a drawback of this approach. That said, being explicit seems better and programming errors that swallow parent cancel by mistake are probably less likely to be severe than programming errors that silently continue past an unhandled and unexpected TaskGroup cancellation.

Example:

try:
  async with asyncio.TaskGroup() as g:
    # arrange to call g.cancel upon some condition
    g.create_task(some_thing(g.cancel))
    ...
except asyncio.CancelledError:
  # propagate when enclosing task is cancelled
  if asyncio.current_task().cancelling():
    raise
  pass  # g.cancel() was called

There is so much wrong with this comment…

  • Your snippet at the end assumes that TaskGroup.cancel() (or TaskGroup.stop() or whatever it’s called) would propagate asyncio.CancelledError but leave current_task().cancelling() == 0. This is not correct; a propagating cancellation should always have cancelling() non-zero (specifically, incremented by one compared to its old value).
  • If you had assumed correct semantics for a propagating CancelledError (with cancelling() left incremented by one) then there would be no way to tell whether it was from TaskGroup.cancel() or some external source. That’s really a fundemental problem with your API idea.
  • If you had assumed correct semantics for a propagating CancelledError then someone wanting to catch the exception would also need to remember to call uncancel(). IMO, application authors shouldn’t need to do this. The whole benefit of this API is that it gives a controlled way to manually raise a cancellation and to uncancel it later without worrying about these internal details.
  • “being explicit seems better … unexpected TaskGroup cancellation” IMO, if you cancel a specific object (task or task group or anything else), then propagating that further is the thing that needs to be made explicit. Your API, where it automatically propagates futher, is less explicit.
  • You say that Task.cancel() “would raise CancelledError out of the scope being cancelled” but Task.cancel() does not do that: it’s await task that does this (and I would always recommend await asyncio.wait([task]) exactly so that it doesn’t mix up child and parent task cancellation). [Edit: Even more relevant here, cancelling an individual task in a task group would also not cancel the task group or its enclosing task, not even when it gets to __aexit__(). Again, only awaiting the task object would cause the propagation.]
  • Your API is not even consistent with TaskGroup itself! As you mentioned on Github, when it cancels its enclosing task due to a child task raising an exception, it does not propagate that out (unless there’s another cancellation for another reason). Surely this API should be consistent with that.

Ultimately, what you are suggesting is not terribly different from just cancelling the enclosing task. You can already do that. The API suggested here is genuinely useful in a way your suggestion just isn’t, as I think you would see if you genuinely had a need in your own application. Your comments have seriously railroaded this discussion, which is really frustrating there are actual subtleties to discuss (the name for the API, what it should do before/after the context manager, etc.).

Well, I did mean that. The exception signals cancel-exit, and is intended to be caught not propagated. Though if uncaught the enclosing task does exit with cancelled() == True.

Putting that aside, what should happen below if the TaskGroup is cancelled while suspended at one of the await expressions?

async with asyncio.TaskGroup() as g:
  something.add_callback(g.cancel)
  t1 = g.create_task(first_part(...))
  t2 = g.create_task(second_part(...))
  r = await t1
  r.merge(await t2)
return r

If some other task calls tg.cancel() then, this task would be resumed and for that await would raise asyncio.CancelledError [1]. This is completely standard; it’s exactly the same as for cancelling the whole task and for aborting a task group (due to a non-cancel exception from a child task).

When the asyncio.CancelledError exception reaches tg.__aexit__(), it will be swallowed and the cancellation count decremented back down to 0. All child tasks are already cancelled by this point (that happened in tg.cancel()) but may still be running, and tg.__aexit__() will wait for those tasks to finish. It will then return without exception.

All eventualities should compose together nicely (task group is cancelled; non-cancel exception raised from child task or body of task group; cancellation happens from outside) regardless of the order that they happen.

Maybe a longer example would help?

async def taskgroup_example():
    async with asyncio.TaskGroup() as outer_tg:
        async with asyncio.TaskGroup() as middle_tg:
            async with asyncio.TaskGroup() as inner_tg:
                middle_tg.cancel()
                print("We DO reach here: tg.cancel() schedules CancelledError but does not raise it directly")
                await asyncio.sleep(0)
                print("We do NOT reach here: sleep(0) raises CancelledError")
            print("We do NOT reach here: inner_tg.__aexit__() propagates cancellation")
        print("We DO reach here: middle_tg was cancelled but is now complete")
    print("We DO reach here: outer_tg is unaffected")

  1. There is a slight race here. The await expression might already have been scheduled to resume with some other raturn value or exception. For example, if the body of the task group is waiting on await my_queue.get(), and another task calls my_queue.put_nowait(foo) immediately followed by tg.cancel(), then the await expression will return the value you pushed. But then the next await expression will actually raise the cancellation. But, again, this is just the usual behaviour for asyncio cancellation. ↩︎