Cancelable tasks cannot safely use semaphores

I’m trying to write a fairly basic library in async Python, based on a pattern that I have used successfully in other languages.

The pattern is, essentially:

  • Acquire a semaphore token.
  • Start a concurrent task that performs some work and then releases the semaphore token.
  • Wait for all of the semaphore tokes to be returned, guaranteeing that all tasks are complete.

This sort of workflow is at the core of APIs like Go’s errgroup.

Unfortunately, the pattern seems to be nearly impossible to implement correctly in Python due to an API oversight (asyncio.Task.cancel documentation is inaccurate · Issue #98275 · python/cpython · GitHub): since a Task doesn’t run at all when cancelled, it has no way to reliably return the semaphore token when it completes.

It seems like in order to work around this I will have to find some way to prevent the task from being canceled until it reaches its tryfinally block, which will greatly complicate the synchronization in my library.

This is closely related to some bugs and/or counterintuitive behaviors in asyncio.TaskGroup, such as asyncio.TaskGroup may silently discard request to run a task · Issue #116048 · python/cpython · GitHub.

If you merge the first two of these to “Start a task that acquires, performs work, then releases” This works fine.

TaskGroups seem to have had several issues that come down to semantic differences between asyncio and trio, where it was essentially ported from. I’ve had no problems just using asyncio.gather on a set of tasks when needing to block on completion of a set of tasks, if this is the only reason for your semaphore, you can simplify this even further as:

create a bunch of tasks, await the completion of the set of tasks.

If you merge the first two of these to “Start a task that acquires, performs work, then releases” This works fine.

That generally defeats the purpose of the semaphore. The point of the semaphore is to limit the amount of in-flight work, and thus the memory footprint of that work. If you move the “acquire semaphore tokens” portion of the work into the task itself, you now have an unbounded number of tasks consuming memory in the event loop’s queue.

Moreover, often you want to monitor the work to be performed: for example, increment metrics for spans or mark work items with their results. If you pass a batch of work to a Task with the expectation that the Task will dispatch that work in some way, and then the Task disappears without ever running, the results of that work will not be recorded. So, again, you end up needing more synchronization: say, repossessing the work items from the Task before you cancel it. And lots of extra complexity from that synchronization.

If your goal is reducing the maximum number of tasks alive, even ones that haven’t done anything to consume memory other than the minimal footprint of a task because all of their work hasn’t started yet, you should switch this to a producer/consumer pattern and have a fixed number of workers that pull jobs from a queue, and set a max queue size so that the producer end doesn’t eagerly fill the queue too quickly

2 Likes

you should switch this to a producer/consumer pattern and have a fixed number of workers that pull jobs from a queue

Having a fixed number of workers (a) makes it more likely that those workers themselves will become a memory leak, and (b) makes it more difficult to debug a running program due to the number of idle workers.

I went into those problems in much more detail in the context of Go concurrency patterns in my talk at GopherCon 2018: https://www.youtube.com/watch?v=5zXAHh5tJqQ

As far as I can tell, the same concerns apply for async tasks in Python.

and set a max queue size so that the producer end doesn’t eagerly fill the queue too quickly

Yes, bounding the footprint of the items in a queue is one of the most common uses of a semaphore. But you can’t always rely on the length of the queue for that: for example, if the queue items have heterogeneous sizes, you may want to use a weighted semaphore rather than simply counting the number of items.

If one part of the code is acquiring a semaphore and starting a task, then why doesn’t the code that’s cancelling a task release the semaphore?

If one part of the code is acquiring a semaphore and starting a task, then why doesn’t the code that’s cancelling a task release the semaphore?

If the task has actually been started, it will release the semaphore token itself. So now you have another synchronization problem: you have to either figure out whether the task has started, or figure out how to revoke its token without introducing a race condition.

Again, possible to do — but very much at odds with Python’s usual philosophy of simple, readable code.

Not particularly, no. I’ve been meaning to do a more in-depth writeup about asyncio, threading, and safe concurrency in Python based on my experiences using it in a highly distributed industrial setting at some point.

@Liz is correct here, as long as both sides have predictable bounds, there’s no runaway memory use or leak possible here, but this doesn’t address your ideas on a weighted semaphore. It might help to see this in a structured example to convince yourself of it.

async def producer(q: asyncio.Queue):
    while check_not_shutting_down():  # implement this however you want
       job = None
       try: 
           job = ...
           await q.put(job)  # backpressure here keeps producer holding a maximum of 1 job
       except asyncio.CancelledError:
           if job is not None: # we still have a job to handle
               await q.put(job)
           raise

async def consumer(q: asyncio.Queue):
    while job := await q.get():
        await process_job(job)  # the total work of this function at any given time is bound by number of consumers and how much work any given job can be.
        q.task_done()

async def driver():
    q = asyncio.Queue(maxsize=5)
    producers = {asyncio.create_task(producer(q)) for _ in range(2)}
    consumers = {asyncio.create_task(consumer(q)) for _ in range(5)}
    try:
        await asyncio.gather(*producers)
    except SomeShutdown:
        signal_shutdown_to_producers()
    
    for task in producers:
        task.cancel()

    await asyncio.gather(*producers, return_exceptions=True)
    # handle exceptions here how you see fit, retrieve them from the task objects
    await q.join()  # queue is now empty, and safe to shutdown consumers
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    # handle exceptions here how you see fit, retrieve them from the task objects

The numbers there are obviously tweakable, and there’s other ways to do this as well, but this seems like a case where you’re too used to what works in go, and go has a slightly different concurrency model than asyncio does. asyncio tasks are just thin wrappers around coroutines scheduled to the event loop that don’t block execution in the current context. coroutines in python aren’t threads (but may wrap them) and are internally just state machines (technically, originally implemented as generators) If you have a fixed number of them and are tracking them, there’s fixed maximum overhead for them.

The hard part here in python isn’t ensuring that there isn’t a memory leak, but in proper signal handling for graceful shutdown of an asyncio application, as without a lot more work than some people would consider reasonable, python can interrupt anywhere, including in critical sections.

To do this reliably in a cross-platform manner, you need to use signal.set_wakeup_fd and handle this yourself, and this often means your event loop should actually be in a thread, with the main thread existing only to handle signals and interact with any potential console, tui, or gui

As for implementing a weighted semaphore or a queue with job weights, you can implement it yourself if you want, asyncio’s synchronization primitives aren’t truly primitives and you can bring your own replacements, but there’s more complication there that you’ll have to judge the worth of.

1 Like

There’s an example at Coroutines and Tasks — Python 3.13.0 documentation that might help.

Basically, have a set of the tasks that are active. If a task is cancelled and it isn’t in the set, it hasn’t started yet, so the semaphore needs to be released.

The hard part here in python isn’t ensuring that there isn’t a memory leak, but in proper signal handling for graceful shutdown of an asyncio application, as without a lot more work than some people would consider reasonable, python can interrupt anywhere, including in critical sections.

It may be that no Python-managed memory leaks — the use of weak references in the event loop queue presumably helps a lot with that — but it is certainly possible in your example to trigger Task was destroyed but it is pending! warnings from the interpreter. For example, if a call to driver is cancelled during the call to q.join(), the consumers will be collected while they are still in the pending queue.

Are you saying that it Task was destroyed but it is pending! warnings from the interpreter are expected in normal operation?

Yes, that’s what I mean about adding extra complexity. The need to explicitly track the set of active tasks makes the program much more complicated than an alternative in which all tasks run up to the first await.

No, I’m saying driver should never be canceled, only signaled to gracefully stop the work process. Hence pointing out that you have to do a lot more work to get graceful shutdown in python.

You can see an example in some personal hobby code which in turn uses another thing that I’ve begun working on making more generalizable for other people to use without reinventing it.

Cancellation shouldn’t be something arbitrary that happens anywhere, but something communicated to things that can handle it in a way that lets pending work be either recovered and stored or finished, and then shutdown.

No, I’m saying driver should never be canceled. Hence pointing out that you have to do a lot more work to get graceful shutdown in python.

So, what does that imply for library code? All of the options seem quite bad.

  • Should libraries be written to handle a CancellationError everywhere? (That adds a lot of complexity to the libraries, as discussed above.)

  • Should callers be expected to asyncio.shield all of their calls to libraries to prevent them from seeing an unexpected CancelledError? (That adds a lot of noise on the caller side, and a lot of fragility due to the risk of accidentally-missed shield calls.)

  • Should applications that use asyncio avoid using libraries entirely? (That seems laugable in a realistic software ecosystem, but I guess it’s an option…?)

  • Should applications be written so to avoid propagating cancellation down the call tree in general? (That seems like it makes asyncio cancellation much less useful than corresponding facilities in other languages, such as like Go’s context package — if you can’t propagate cancellation, you end up wasting a lot of capacity and latency on cancelled operations, which makes services a lot less stable under load.)

Even a task cancellation before the task start activates the corresponding “done” callbacks (if any). Demo below. The semaphore operation could be moved into a callback.

import asyncio

async def coro():
    print("task running")   # won't be printed
    await asyncio.sleep(5)

def done(t):
    print("done", t)        # WILL be printed

async def main():
    task = asyncio.create_task(coro())
    task.add_done_callback(done)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

Also the relatively new (3.12+) “eager tasks” feature probably could ensure that a new task was reliably started. I haven’t used it yet.

1 Like

Following the below should get you there:

  • ensure os signals are handled rather than thrown into tasks where they can interrupt critical sections such as finally blocks or __aexit__
  • avoid cancelling (via task.cancel, cancelscopes, or anything similar) tasks that aren’t designed to handle cancellation
  • ensure frequent enough context switching and signaling capacity to have your top level driving functions communicate cancellation downwards cooperatively rather than by interrupting, at least for sub-tasks that have state or side effects that requires graceful closure.
  • limit your use of libraries to those that don’t arbitrarily cancel tasks that aren’t designed to handle cancellation.
  • Use async context managers to manage async resources

Not many libraries cancel tasks that don’t belong to them, and I can’t say that I’ve even seen any that are designed to cancel while calling an arbitrary callback (they might exist though)

  • ensure os signals are handled rather than thrown into tasks where they can interrupt critical sections such as finally blocks or __aexit__

To be clear, I’m not talking about OS signals at all. Cancellation is a general mechanism, not tied to a specific trigger.

  • avoid cancelling (via task.cancel, cancelscopes, or anything similar) tasks that aren’t designed to handle cancellation
  • ensure frequent enough context switching and signaling capacity to have your top level driving functions communicate cancellation downwards cooperatively rather than by interrupting, at least for sub-tasks that have state or side effects that requires graceful closure.

So, to me that sounds like you’re recommending “avoid propagating cancellation down the call tree in general”, or at least avoiding the built-in asyncio cancellation for that purpose? (How else is one supposed to know whether a given library is “designed to handle cancellation”, since that doesn’t seem to be documented consistently?)

Which, again: seems like a pretty serious design flaw in asyncio. If the cancellation mechanism it provides isn’t actually suitable for user code to manage cancellation, why is that such a big part of the asyncio API to begin with?

  • Use async context managers to manage async resources

That’s kind of begging the question. If you can’t implement a correct async context manager, then you can’t use it to manage resources — and that’s exactly the sort of library I’m talking about.

  • limit your use of libraries to those that don’t arbitrarily cancel tasks that aren’t designed to handle cancellation.

It appears that at least some ASGI library implementations intentionally cancel the handler tasks on disconnect. For example, quart cancels the call to the application’s handler when the receiver disconnects: quart/src/quart/asgi.py at ef23e8b063ee0b1ae95ba5828570552685ad89bb · pallets/quart · GitHub


To put it another way: if asyncio programs are not intended to actually use the asyncio cancellation mechanism to propagate cancellation, that could be made a lot clearer in the asyncio documentation. Coroutines and Tasks — Python 3.13.0 documentation currently says:

Tasks can easily and safely be cancelled. When a task is cancelled, asyncio.CancelledError will be raised in the task at the next opportunity.

It is recommended that coroutines use try/finally blocks to robustly perform clean-up logic. In case asyncio.CancelledError is explicitly caught, it should generally be propagated when clean-up is complete. asyncio.CancelledError directly subclasses BaseException so most code will not need to be aware of it.

What I am saying here is:

  • Tasks cannot “easily and safely” be cancelled, because the subtlety of the API makes things much more complicated, and less safe as a result.

  • try/finally blocks do not “robustly perform clean-up logic”, because they are not necessarily ever even executed.

Those observations are in direct opposition to the official documentation.

1 Like

You can, so long as arbitrary interrupts and cancelation in this scope can’t happen. This requires using your own signal handling instead of python’s where python would turn it into an exception that can be raised at any point, including critical sections where it cannot be safely handled (why I mentioned it even though you didn’t bring it up) There’s even relatively recent examples that show KeyboardInterrupt can break python’s locks and semaphores (leaving them permanently acquired) This is even worse if you are holding external resources like a posix named semaphore.

My experience is that the official documentation is in fact wrong here, that asyncio cancellation is poorly understood, that asyncio task groups as well as uncanceling, and trio cancel scopes all introduce additional new problems because they are designed around the idea that anything can be safely cancelled or resumed. This isn’t generally true. The moment you are holding a stateful object or in the process of work with side-effects, you should not interrupt this without ensuring you have designed the semantics for what will happen to that stateful object or that you just don’t care if work or a resource is dropped or left in an incoherent state during shutdown.

More and more layers on top of it such as PEP 789 – Preventing task-cancellation bugs by limiting yield in async generators | peps.python.org won’t actually fix this, though it will reduce the number of places people will be surprised by it. People need to fundamentally change how they think about tasks and only cancel things they know are safe to cancel.

2 Likes

I don’t know about this. Anything can be cancelled if someone upstack is using asyncio.timeout or a TaskGroup, so it’s not really realistic to expect you will never be cancelled. Cancellation is also a very useful tool, and very useful patterns like deadlines depend on them. A better strategy would be to educate people how to write async code that can handle cancellation, and develop tools and patterns to better support it.

1 Like

I’d highly discourage people from using TaskGroups in general, they aren’t better than using gather properly precisely because of cancelation semantics, and that’s not just a strongly opionated view, but also reflected in all of the issues that have come up regarding cancellation on the issue tracker since they were included where it becomes clearer to more people that asyncio wasn’t made for this model in all the ways that it breaks their existing expectations by upholding all of the things that existed before TaskGroups.

I’d also say that just like you shouldn’t cancel arbitrary things that can’t handle it, you shouldn’t wrap arbitrary things that need to be gracefully stopped in a timeout.

For various reasons, this isn’t possible from an information theory standpoint when considering arbitrary interrupts and cancelation.

To the extent that it is possible in only a single-threaded application with only cancellation and run on a system that will not send an os interrupt[1], you can get “most of the way there” with asyncio.shield + try/except, but you would need to wrap every single yield point during stateful work, including down stack in library code that may not know it is part of your stateful work, such as a network request to another service.

This doesn’t actually work because stateful work can require multiple yields and reraising at each level wouldn’t result in the right layer shutting things down, you’d need to suppress cancelation without re-raising at every low level yield within your stateful work, and then still communicate to the thing driving it to stop taking new jobs. It’s truly better to just design to signal when workers should stop taking new jobs, and close everything down cooperatively when this matters.

You can write deadlines, but I don’t actually find the concept of deadlines to interact with stateful work in an asyncio application. Once stateful work has started, the assumption should be that it was in flight before the deadline and shouldn’t be stopped. There’s a lot of prior existing art in other languages about this, as well as decades of research in general about concurrency and distributed systems. A search term that might help is “at least once delivery”, which is what most distributed networking systems aim for, paired with a method of idempotency and synchronization in places where that matters, resulting in effectively “exactly once” for those places

I don’t know where along the line people stopped remembering that async was for cooperative user-space scheduling of work, but part of that when you care about total system resiliency involves not throwing arbitrary exceptions into things not capable of being designed to handle them.

I’m very selective about the async libraries I use as a result of this, and I do think that the majority of the async ecosystem in python (not just asyncio, trio, curio, quattro, and anyio here too) get this wrong.


  1. Asyncio Locks wrap possible cancellation in multiple nested levels of attempting to catch cancellation internally without calling other libraries, and can still end up broken by a KeyboardInterrupt at the wrong time. ↩︎

1 Like

This sounds like an interesting problem. Could you put together a minimal, toy example so I can take a look at it and think about it?