How to cancel all tasks created within the same TaskGroup

TaskGroup is quite new, and I couldn’t find any functionality for easily cancelling all the other tasks that might be running in the same task group. The only method a TaskGroup object has is create_task.

Is this the right way to go?

import asyncio
from asyncio import Task
from random import random


async def job_1(tasks: list[Task]):
    while True:
        await asyncio.sleep(1)  # Work that is being done
        if random() < 0.1:  # When every task should get cancelled
            for task in tasks:
                if task == asyncio.current_task():
                    print("Task 1 cancelling others first")
                    continue
                if task.done():
                    print("Some task was already done")
                task.cancel()
            return
        print("Task 1")


async def job_2(tasks: list[Task]):
    while True:
        await asyncio.sleep(1)
        if random() < 0.1:
            for task in tasks:
                if task == asyncio.current_task():
                    print("Task 2 cancelling others first")
                    continue
                if task.done():
                    print("Some task was already done")
                task.cancel()
            return
        print("Task 2")


async def daemon():
    while True:
        await asyncio.sleep(1)
        print("Daemon task")


async def main():
    async with asyncio.TaskGroup() as group:
        tasks: list[Task] = []
        task_1 = group.create_task(job_1(tasks))
        task_2 = group.create_task(job_2(tasks))
        daemon_task = group.create_task(daemon())
        tasks.append(task_1)
        tasks.append(task_2)
        tasks.append(daemon_task)


asyncio.run(main())

Trio and anyio afford a taskgroup.cancel_scope.cancel method for this.

Asyncio doesn’t have cancel scopes, and the additional indirection doesn’t make much sense anyway, so I’m going to propose adding a taskgroup.cancel method for this.

Currently the only way seems to be to cancel the task that the taskgroup is running in, which the taskgroup stores as a private attribute that really shouldn’t be used. Alternatively, you can pass that task around yourself, which is a problem because the task might no longer be running within the taskgroup’s scope, and cancelling an inactive taskgroup should be a no-op.

Matthias is right that you can cancel the task that the task group is running in (assuming that nothing else happens before/after the task group block, otherwise you’ll need to refactor it into its own function).

But another thing that will cancel all the tasks in a task group is when one of the tasks in the group raises an exception. That’s the whole point of task groups! So you could just manually inject an exception with TaskGroup.create_task() and then catch it just outside. Here’s your example reformatted to do this:

Code example using group.create_task(raise_cancel_request()):
import asyncio
from random import random

class CancelRequestException(Exception):
    pass

async def raise_cancel_request():
    raise CancelRequestException

async def job(i, group):
    while True:
        print(f"Task {i} before sleep")
        await asyncio.sleep(1)
        print(f"Task {i} after sleep")
        if random() < 0.1:
            print(f"Task {i} cancelling others")
            group.create_task(raise_cancel_request())
            return

async def daemon():
    while True:
        print("Daemon before sleep")
        await asyncio.sleep(1)
        print("Daemon after sleep")

async def main():
    try:
        async with asyncio.TaskGroup() as group:
            group.create_task(job(1, group))
            group.create_task(job(2, group))
            group.create_task(daemon())
    except* CancelRequestException:
        pass

asyncio.run(main())

(If you change random() < 0.1 to e.g. random() < 0.8 and run it a few times, you will see that sometimes task 1 tries to cancel but task 2 then wakes up and tries to cancel too. That’s because create_task(raise_cancel_request()) only schedules raise_cancel_request(); it does not run it straight away, so task 2 is not cancelled immediately. That’s not necessarily a problem, but it is a difference from Matthias’s method.)

By the way, with this particular example, you can simply raise the exception directly from job()! No TaskGroup.create_task() or extra hypothetical TaskGroup.cancel() method needed. But I assume you’re talking about a situation where you might want to cancel a task group from a task running in some other task group.

It’s possible to wrap this technique up into a cancellable task group class if that’s useful. I’m not an expert on context managers so I’m not 100% sure this is right but it seems to work:

import asyncio

class _CancelRequestException(Exception):
    pass

async def _raise_cancel_request():
    raise _CancelRequestException

class CancellableTaskGroup(asyncio.TaskGroup):
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            result = await super().__aexit__(exc_type, exc_val, exc_tb)
            # This surely means that there was no exception, otherwise super().__aexit__() would
            # have wrapped it in an exception group and explicitly raised it.
            # (In principle, if the main body raised a SystemExit or KeyboardInterrupt then that
            # could follow this route, although in practice the current implementation of
            # TaskGroup always re-raises these rather than just returning True.)
            return result

        except* _CancelRequestException:
            pass  # Suppress cancel request exception

        # If we got here, the only exception in the ExceptionGroup was _CancelRequestException
        return True

    def cancel(self):
        self.create_task(_raise_cancel_request())

Example using CancellableTaskGroup:
import asyncio
from random import random

# Uses the CancellableTaskGroup class defined above.

async def job(i, group):
    while True:
        print(f"Task {i} before sleep")
        await asyncio.sleep(1)
        print(f"Task {i} after sleep")
        if random() < 0.1:
            print(f"Task {i} cancelling others")
            group.cancel()
            return

async def daemon():
    while True:
        print("Daemon before sleep")
        await asyncio.sleep(1)
        print("Daemon after sleep")

async def main():
    async with CancellableTaskGroup() as group:
        group.create_task(job(1, group))
        group.create_task(job(2, group))
        group.create_task(daemon())

asyncio.run(main())