Context manager for asyncio tasks

When setting up long running asyncio tasks, you want to make sure to cancel the task when the code that originally started the task goes away.

To do this, I have the following contextmanager to ensure this happens correctly: (I’ve needed to copy and paste this code to every project I work on that uses asyncio)

from asyncio import CancelledError, Future, ensure_future
from contextlib import asynccontextmanager
from typing import Coroutine, Union

async def task_context(coro_or_future: Union[Coroutine, Future]):
    task = ensure_future(coro_or_future)
        yield task
        if not task.done():
                await task
            except CancelledError:

Here is an example of it’s use:

from asyncio import run, sleep

async def background():
    while True:
        await sleep(10)
        print('Background task stuff...')

async def main():
    async with task_context(background):


When run_main_server finishes, the background task will be canceled and wait for us.

Here is another example: aiterx/ at f74da6cad7e1cc0bda4e922b3fd208eec51b9fa5 · garyvdm/aiterx · GitHub


I guess async with task_context(background()): instead of async with task_context(background):.

Also, AsyncExitStack can be useful for multiple background tasks.

1 Like