Asyncio, tasks, and exception handling - recommended idioms?

In trying to debug asyncio code, I’ve run into various issues, and ended up settling on this idiom for task spawning. I’m curious how other people do things, and whether there should be a recommended way in the docs. Conceptually, I want to spawn a task the same way you’d spin off a thread or start an asynchronous subprocess: I never want to await it, so any exceptions should be logged to the console.

import asyncio
import traceback

all_tasks = [] # kinda like threading.enumerate(), but for tasks
def task_done(task):
	all_tasks.remove(task)
	if task.cancelled():
		return # .exception() would raise CancelledError for a cancelled task
	exc = task.exception() # Also marks that the exception has been handled
	if exc:
		traceback.print_exception(exc) # single-argument form requires Python 3.10+
def spawn(awaitable):
	"""Spawn an awaitable as a stand-alone task"""
	task = asyncio.create_task(awaitable)
	all_tasks.append(task) # retain a reference so the task can't be GC'd mid-flight
	task.add_done_callback(task_done)
	return task

Usage would be something like this:

async def task1():
	print("Starting task 1")
	await asyncio.sleep(1)
	print("Ending task 1")

async def task2():
	print("Starting task 2")
	await asyncio.slep(2) # oopsie typo: raises AttributeError, which should get reported

async def main():
	...
	print("blah blah stuff goes here")
	...
	spawn(task1())
	...
	spawn(task2())
	...
	await asyncio.sleep(3)
	print("Shutting down") # No hard feelings. I don't blame you. Critical error.

if __name__ == "__main__":
	# Not asyncio.run(), which would cancel any still-pending tasks on exit
	loop = asyncio.new_event_loop()
	asyncio.set_event_loop(loop)
	loop.run_until_complete(main())
	print("Unfinished tasks:", all_tasks)

Critically, every spawned task is retained (see the note in the create_task() docs, which wasn’t there in Python 3.7 or 3.8), and when a task is done, any exception is immediately reported.

I’d very much like for this to have been more discoverable. Can something be added to the docs, or maybe even have an asyncio.spawn() function that behaves like this?

Hi Chris,

Clearly you’re trying to start a discussion, but I’m not sure about what.

Are you hoping to get a discussion started about the thing that hit Hacker News over the weekend (create_task() not keeping tasks alive)?

Is this your first foray into asyncio, or have you been debugging asyncio code for a long time and are you proposing this approach based on your cumulative experience? I can’t tell from your posting here.

I presume you’re not proposing that we literally hardcode a call to traceback.print_exception() in the asyncio library (it uses logging everywhere).

I personally feel that creating “background tasks” using create_task() is probably an anti-pattern, much more than creating “background threads”. Ideally almost all tasks should be waited for, e.g. using TaskGroup (new in 3.11). I certainly wouldn’t want to have a “spawn” function in the stdlib.
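For example (3.11+), using the task1/task2 coroutines from above:

async def main():
	async with asyncio.TaskGroup() as tg:
		tg.create_task(task1())
		tg.create_task(task2())
	# when the async with block exits, both tasks have finished,
	# and any failures are raised together as an ExceptionGroup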

Instead of doing exception logging in a done-callback, why not wrap the task in a try/except?


Not the Hacker News thing; I don’t read Hacker News, so consider the timing a coincidence.

This is an ongoing project and my first really serious foray into asyncio, so what you see above is the result of a number of iterations. It’s not something I have a huge amount of experience with, so it’s entirely possible I’ve missed something blatantly obvious to someone who spends more time with asyncio.

And no, I’m not thinking that a hardcoded print_exception is a good idea, but we have sys.excepthook and threading.excepthook, so maybe asyncio.excepthook could be the way forward? My apologies if this already exists; it doesn’t under that exact name, but I may have missed it elsewhere.

Interesting that you consider create_task an anti-pattern. I’m curious how you would go about these sorts of things: would you have a TaskGroup that you dynamically add more tasks to over time, or would there be many such groups? The most common case in this app is needing to push some data out over an SSH connection (a subprocess: write to its stdin) without waiting for it to finish.
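To make the question concrete, the only dynamic shape I can picture is holding one group open for the app’s whole lifetime, something like this sketch (TaskGroup is 3.11+; app.spawn and run_gtk_main() are made-up placeholders):

import asyncio

async def main():
	async with asyncio.TaskGroup() as tg:
		app.spawn = tg.create_task # hand the group to the GUI layer
		await run_gtk_main()       # returns when the user quits the app
	# leaving the async with block waits for any tasks still in flight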

(Also, since TaskGroup is so new, it’s going to be a while before it can be used; this project needs to be able to run on an RPi, and I believe it’s currently using Python 3.8. That’s not a fundamental problem as we could just build CPython from source if necessary, but I’d prefer to stick to the system Python if possible.)

Wrapping the task in a try/except is definitely an option. I suspect the trouble I had stems from not retaining references to spawned tasks, which I didn’t know was a problem when this project started. So it’s possible that simply wrapping everything in a try/except would have been sufficient, although we’d still have needed the task retention, so it wouldn’t end up materially simpler than the current approach; it would just be spelled differently (probably an async wrapper function that just goes “try: thing() except: print_traceback”).
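Spelled out, that wrapper would be something like this (a sketch; report_exceptions is my name for it):

import traceback

async def report_exceptions(awaitable):
	"""Await the given awaitable, printing any traceback instead of losing it."""
	try:
		return await awaitable
	except Exception:
		traceback.print_exc()

# spawning still needs the reference retention from above:
# task = asyncio.create_task(report_exceptions(task2()))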

So mainly, I’m hoping to hear from people who’ve been using asyncio actively for years, and how they (you) go about handling the unexpected. I’ve used asynchronous I/O in a wide variety of languages, and part of the art of setting up a usable system is having a robust error-trapping framework so you KNOW what you’ve done wrong :slight_smile: And I’d much rather use a standard or well-known idiom than try to invent my own.

In asyncio you can set an exception handler with loop.set_exception_handler() (see Event Loop — Python 3.12.1 documentation).
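For example, a minimal sketch; the handler fires when asyncio detects a task whose exception was never retrieved (in CPython, typically as soon as the last reference to the task is dropped):

import asyncio

def handle_exception(loop, context):
	# context is a dict; "message" is always present, and keys like
	# "exception" and "task" are included when applicable
	print("caught:", context["message"], context.get("exception"))

async def boom():
	raise RuntimeError("unhandled in background task")

async def main():
	asyncio.get_running_loop().set_exception_handler(handle_exception)
	task = asyncio.create_task(boom())
	await asyncio.sleep(0) # let the task run and fail
	del task # dropping the last reference triggers the "never retrieved" report

asyncio.run(main())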

To add some clarity here: the app uses GTK for its UI, which means that any user-triggered actions come in via callback functions (connect a signal to a function). These are not async functions, so they have to do their work and then return. Writing to a subprocess’s stdin is usually fast, but it is technically asynchronous and could take a while (if the other end has stalled, say), so we can’t just wait for it before returning. Hence the need to spin off an asynchronous task and return.
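To sketch that shape (handler and helper names are made up, and this assumes the GLib and asyncio event loops are already integrated, e.g. via gbulb or asyncio-glib):

def on_send_clicked(button): # an ordinary GTK signal handler, not async
	data = collect_pending_data() # hypothetical helper
	# Can't await here: the handler must return promptly to keep the UI
	# responsive, so the write gets spun off as a fire-and-forget task.
	spawn(write_to_ssh_stdin(data)) # write_to_ssh_stdin is an async def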

TaskGroup looks really nice for the situations where there’s a well-defined group (“do all these things and don’t return till they’re all done”), but I’m not sure how I would use it here.

If you want to keep the user-triggered tasks running when one of them raises an unhandled exception, you should NOT use the vanilla TaskGroup. It will cancel all others immediately.
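A quick illustration (3.11+); the failing task takes its siblings down with it:

import asyncio

async def worker(n):
	try:
		await asyncio.sleep(n)
		print(f"worker {n} finished")
	except asyncio.CancelledError:
		print(f"worker {n} cancelled") # this is what actually gets printed
		raise

async def failing():
	raise RuntimeError("boom")

async def main():
	try:
		async with asyncio.TaskGroup() as tg:
			tg.create_task(worker(1))
			tg.create_task(worker(2))
			tg.create_task(failing())
	except* RuntimeError as eg:
		print("group failed:", eg.exceptions)

asyncio.run(main())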

I’m going to propose another API which works like asyncio.gather(..., return_exceptions=True), but with a safety guarantee, and which can have tasks added at any time during its lifetime. Currently it’s being worked on here: Asyncio supervisor by DontPanicO · Pull Request #31 · achimnol/cpython · GitHub

Taken from the comment; the table compares guaranteed subtask cleanup, adding subtasks at any time during the lifespan, cancelling siblings upon an unhandled subexception, allowing an explicit shutdown() from other tasks, and result collection:

  • asyncio.as_completed(): iterate over results as they become available
  • asyncio.gather() with return_exceptions=True: results collected as a list
  • asyncio.TaskGroup: read task.result() after exit
  • aiotools.PersistentTaskGroup: read task.result() after exit
  • asyncio.Supervisor (achimnol/cpython#31): read task.result() after exit
  • aiotools.as_completed_safe() based on PersistentTaskGroup: iterate over results as they become available; no sibling cancellation (intended, made possible by additional task wrapping with futures)
  • aiotools.as_completed_safe() based on Supervisor: iterate over results as they become available; no sibling cancellation (an unintended side-effect)
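The intended usage shape is roughly this (a sketch against the draft PR, so the details may still change; accept() and handle() are made-up server plumbing):

async def main():
	async with Supervisor() as sup: # asyncio.Supervisor in the draft
		while True:
			conn = await accept()
			sup.create_task(handle(conn)) # a failure here won't cancel siblings
	# a signal or event handler elsewhere can request: await sup.shutdown()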

@ambv @guido @yselivanov This is what I was going to talk with you about at the sprint. Unfortunately I didn’t have a chance to chat with you in person, and now I’m back in Korea.
Please take a look and let’s discuss what new APIs should take care of.

As @yselivanov has mentioned, as_completed_safe() doesn’t need to be part of standard asyncio; it could just be in a 3rd-party library like aiotools. But its behavior differs depending on whether it builds on PersistentTaskGroup (my custom impl.) or Supervisor (DontPanicO’s standard-ish impl.), so I think we need some clarification before fixing the final design for the standard Supervisor.

As @guido has mentioned, the early asyncio APIs were somewhat inspired by Java’s concurrent.futures. Maybe now we can draw on more examples, since async/await syntax has become prevalent in many languages such as Kotlin, C#, JavaScript, and Rust, as well as the higher-level API design patterns of trio, anyio, and Golang.

@kumaraditya303 I heard that you are now maintaining asyncio along with Guido. Please also take a look at the discussion above.

Sorry we didn’t get more time at the conference! I hope you nevertheless had a good time.

I agree that as_completed is not an API we want to promote; you can do what it does by registering “done callbacks” for the tasks.
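For instance, a rough equivalent using a queue fed by done callbacks (a sketch, not a drop-in replacement):

import asyncio

async def as_tasks_complete(coros):
	"""Yield each task as it finishes, without using as_completed()."""
	queue = asyncio.Queue()
	tasks = [asyncio.ensure_future(c) for c in coros]
	for t in tasks:
		t.add_done_callback(queue.put_nowait) # the callback receives the finished task
	for _ in tasks:
		yield await queue.get() # caller checks task.result() / task.exception()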

While I appreciate the table, I’d like to see a more detailed comparison between PersistentTaskGroup and Supervisor. AFAIK the former is just like TaskGroup but doesn’t cancel siblings when a task exits with an exception or is cancelled. This can be done by adding a new keyword argument to TaskGroup. How is Supervisor different?


I really enjoyed PyCon, don’t worry! :wink:

I’m going to experiment a little more with Supervisor to implement what I’ve been thinking of for as_completed_safe(). If it goes well, I think we could settle on it and explore how to implement it as an option to TaskGroup.

Difference between Supervisor and PersistentTaskGroup

@guido @DontPanicO
Conceptually the goal is the same. The implementation has some differences, however.

  • Task result wrapped as future: PersistentTaskGroup wraps each task’s result in a future, separating in-body exception handling from taskgroup failure; in Supervisor, tasks are not wrapped. My opinion on reconciliation: we should follow the Supervisor way.
  • Exception handling: PersistentTaskGroup raises BaseExceptionGroup from the collected errors and also allows setting an explicit exception handler; Supervisor only raises BaseExceptionGroup from the collected errors. My opinion on reconciliation: we should keep only the explicit exception handler to prevent a memory leak, and leave the responsibility for raising exceptions or exception groups to the caller.

I think “task result wrapping” is not a strictly necessary detail. I realized it was fine to remove after experimenting with as_completed_safe() and gather_safe(), as described in this thread.

So, we can reconcile both implementations into one.
You can check out the reconciled version of Supervisor at Experiment with DontPanicO's Supervisor impl. by achimnol · Pull Request #53 · achimnol/aiotools · GitHub.

(Thanks for the write-up. I am currently behind on complex topics like this, I will get back to it!)

Okay, looking at your Supervisor implementation in aiotools v2.0, it’s mostly a clone of TaskGroup, except that:

  • It doesn’t keep a list of errors from failed tasks
  • If a task fails it doesn’t abort the remaining tasks, unless (in certain cases) the error is SystemExit or KeyboardInterrupt
  • There’s a new async shutdown() method that aborts with a distinct message and waits for all tasks to exit

It seems then that when a task fails, no error will be logged unless the task itself catches the error, or an additional done-callback is registered for the purpose by the user.

Now I have a question: what are the use cases where you would still want to use async with Supervisor() as sv? I would imagine the supervisor being stored as a (conceptually) global variable, calling its create_task() method whenever you create a new background task, and using await sv.shutdown() when your app or server needs to shut down. It would be a little awkward to also have that async with sitting somewhere, and without it the implementation could probably become much simpler. What use case did I miss?

The location of the async with makes it clear who the parent task is. The parent is always the task where __aenter__() is called. It also ensures that the Supervisor terminates within the parent.

For instance, if we just create a Supervisor object globally, it has no parent task, which would cause undefined behavior when handling nested cancellation. In such a case, I’d prefer to put it in the server’s topmost task, e.g. async def main().

It is also important to have an explicit, separate shutdown() because it may be triggered from a different tree of tasks, such as signal or event handlers.
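So the pattern I’d suggest is roughly this (a sketch with made-up names; register_handlers() and stop_event stand in for the app’s real plumbing):

import asyncio

stop_event = asyncio.Event() # set by a signal/event handler to request shutdown

async def main():
	async with Supervisor() as sv: # main() is unambiguously the parent task
		register_handlers(sv) # handlers may call sv.create_task() at any time
		await stop_event.wait()
	# the Supervisor terminates within main(), its parent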

Okay, that makes sense. Maybe the next step is to attempt a refactoring of TaskGroup and reuse as much as possible in Supervisor, as a draft PR.

We should probably also continue the API design in a CPython issue.
