Asyncio - add shield scope context manager for improved control flow in sensitive code sections

See the GitHub issue here.

Proof of Concept Code

I'm opening this thread to gauge whether anyone is interested in this feature.
Please reply if you are interested in this feature.

Personally, the lack of this feature is the reason I haven't used asyncio before: shutdown code was too painful and complicated to write.

Feature or enhancement

Add a shield_scope context manager that shields a block of code from cancellation, so the task keeps running the block until it completes. A cancellation requested while the block is running would stay pending and be raised at the next non-shielded await, or at the end of the task.

await cancellable_work()
with shield_scope():
    state = 2
    await work_to_complete_after_state_change()
    await other_important_work()
print('sync code, cannot be cancelled')
await maybe_cancelled_work()
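To make the proposed semantics more concrete, here is a minimal sketch of one way they could be approximated on top of today's asyncio, assuming every task that uses the scope is created through a custom task factory. ShieldableTask, shieldable_task_factory and this shield_scope are hypothetical names used only for this sketch; they are not the proof-of-concept code and not an asyncio API. While a scope is active, cancel() only records the request; when the outermost scope exits, the request is re-issued, so CancelledError surfaces at the next await that suspends, or the task ends up cancelled if it finishes without suspending again.

```python
import asyncio
import contextlib


class ShieldableTask(asyncio.Task):
    """Hypothetical Task subclass that defers cancel() while a shield scope is active."""

    def __init__(self, *args, **kwargs):
        # State is set before Task.__init__, which may start the coroutine
        # immediately when eager task start is used (Python 3.12+).
        self._shield_depth = 0
        self._deferred_cancel = False
        self._deferred_msg = None
        super().__init__(*args, **kwargs)

    def cancel(self, msg=None):
        if self._shield_depth > 0:
            # Inside a shield scope: record the request instead of acting on it.
            self._deferred_cancel = True
            self._deferred_msg = msg
            return True
        return super().cancel(msg)

    def _exit_shield_scope(self):
        self._shield_depth -= 1
        if self._shield_depth == 0 and self._deferred_cancel:
            self._deferred_cancel = False
            # The task is running its own code right now, so Task.cancel() only
            # flags it: CancelledError is raised at the next await that suspends,
            # or the task ends up cancelled if it finishes without suspending.
            super().cancel(self._deferred_msg)


def shieldable_task_factory(loop, coro, **kwargs):
    # Signature expected by loop.set_task_factory(); extra keyword arguments
    # (such as context) are passed straight through to the Task.
    return ShieldableTask(coro, loop=loop, **kwargs)


@contextlib.contextmanager
def shield_scope():
    task = asyncio.current_task()
    if not isinstance(task, ShieldableTask):
        raise RuntimeError("shield_scope() needs a task from shieldable_task_factory")
    task._shield_depth += 1
    try:
        yield
    finally:
        task._exit_shield_scope()


async def work(log):
    with shield_scope():
        await asyncio.sleep(0.2)  # cancel() arrives during this sleep
        log.append("shielded block finished")
    try:
        await asyncio.sleep(1)  # first unshielded await: the cancellation lands here
    except asyncio.CancelledError:
        log.append("cancelled at next unshielded await")
        raise


async def main():
    asyncio.get_running_loop().set_task_factory(shieldable_task_factory)
    log = []
    task = asyncio.create_task(work(log))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task reported cancelled")
    return log


result = asyncio.run(main())
print(result)
```

Nested scopes are handled by the depth counter. Two limitations of this approach: tasks not created through the factory cannot use the scope, and cancel() on a shielded task returns True even though nothing happens until the scope exits.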

Pitch

The current shielding mechanism, asyncio.shield, works by wrapping one future in another, so the cancellation applies to the outer future while the inner future keeps running. This protects the awaitable you pass in, but control flow becomes muddled: the CancelledError bubbles up through the caller while the inner future keeps running, detached, in the background. To avoid control-flow issues, the code under the shield often has to grow to cover other resources as well, which limits the value of asyncio's cancellation features.
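A small sketch of the detachment described above, using only asyncio.shield: the caller sees CancelledError and finishes first, while the shielded coroutine keeps running in the background. important_work is a hypothetical stand-in, and the delays are arbitrary.

```python
import asyncio

log = []


async def important_work():
    await asyncio.sleep(0.2)
    log.append("inner work finished")


async def caller():
    try:
        await asyncio.shield(important_work())
    except asyncio.CancelledError:
        # Only the outer future was cancelled; important_work() is still running.
        log.append("caller saw CancelledError")
        raise


async def main():
    task = asyncio.create_task(caller())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("caller task finished (cancelled)")
    await asyncio.sleep(0.3)  # give the detached inner task time to finish


asyncio.run(main())
print(log)
```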

The simplest current workaround for maintaining control flow with asyncio.shield seems to be wrapping the coroutine in a task and awaiting that task again after the cancellation occurs. This adds a lot of overhead to every shielded call (a Task object, plus a loop iteration before the task can start), which can add up quickly in hot code paths. If shielded functions are nested, deciding where to re-raise the cancellation also becomes a challenge, because it requires contextual information about whether we are still inside a shielded coroutine.

import asyncio


async def work():
    print("task started")
    task = asyncio.create_task(asyncio.sleep(5))
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        print("task cancellation caught")
        await task
        print("task ran to completion")
        raise


async def main():
    task = asyncio.create_task(work())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main: work() was cancelled")


asyncio.run(main())
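The workaround above can be generalized into a reusable helper. This is a sketch under assumptions: shield_and_wait is a hypothetical name, and it loops so that a second cancel() while waiting does not abandon the inner task either. It still pays for one extra Task object per call, it does nothing to answer the nesting question, and if the inner work raises after a cancellation was caught, that exception propagates and the cancellation is lost.

```python
import asyncio


async def shield_and_wait(aw):
    """Hypothetical helper: run `aw` to completion even if the caller is
    cancelled, then re-raise the cancellation once `aw` has finished."""
    inner = asyncio.ensure_future(aw)  # one extra Task object per call
    cancelled = False
    while True:
        try:
            result = await asyncio.shield(inner)
            break
        except asyncio.CancelledError:
            if inner.cancelled():
                raise  # the inner task itself was cancelled, so propagate
            # Only the outer shield future was cancelled: remember it and keep
            # waiting for the inner task instead of leaving it detached.
            cancelled = True
    if cancelled:
        raise asyncio.CancelledError()
    return result


async def demo():
    log = []

    async def publish():
        await asyncio.sleep(0.2)
        log.append("published")

    async def caller():
        try:
            await shield_and_wait(publish())
        except asyncio.CancelledError:
            log.append("caller cancelled after publish")
            raise

    task = asyncio.create_task(caller())
    await asyncio.sleep(0.05)
    task.cancel()
    await asyncio.sleep(0.05)
    task.cancel()  # a second cancel while waiting does not abandon publish()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task reported cancelled")
    return log


demo_log = asyncio.run(demo())
print(demo_log)
```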

The following example hopefully shows more concretely how a shield scope could meet this shielding and control-flow need more efficiently, and be easier to work with.

The following shows how a shield_scope might be used. If the same code were run with asyncio.shield wrapping the publish coroutines, the resource_lock acquired by create_resource would be released before its publish completed, and delete_resource would finish publishing before create_resource did.

import asyncio

# Let's say we have a collection of resources and a pub/sub system
# to which we want to send all signals related to those resources.
# Let's also say that we want all messages (created, deleted, updated, etc.)
# to be sequential for each resource. To do that, we use a lock for each
# resource. There may be some contention around each lock, as
# each operation on a resource may take a while, so we want to keep
# the ability to cancel waiting on the lock at any time (as a client of a web server might do).


async def publish(msg, delay):
    print(f"starting publish: {msg}")
    await asyncio.sleep(delay)
    print(f"ending publish: {msg}")


async def create_resource(resource_lock):
    print("create - wait lock")
    await resource_lock.acquire()
    print("create - acquire lock")
    try:
        print("create - resource")
        # Once we have created the resource, we have to finish publishing,
        # or the other parts of the system won't know about it.
        # We have to hold the lock, or publishing of the created message
        # may interleave with publishing of the deleted message.
        with shield_scope():
            await publish("create - resource", delay=3)
    finally:
        resource_lock.release()
        print("create - release lock")


async def delete_resource(resource_lock):
    # The lock is outside the shield, so we can still cancel waiting on it
    # if there is contention for the lock. Maybe we come back later.
    print("delete - wait lock")
    await resource_lock.acquire()
    print("delete - acquire lock")
    try:
        print("delete - resource")
        with shield_scope():
            await publish("delete - resource", delay=2)
    finally:
        resource_lock.release()
        print("delete - release lock")


async def main():
    resource_lock = asyncio.Lock()
    creator = asyncio.create_task(create_resource(resource_lock))
    await asyncio.sleep(0)
    deleter = asyncio.create_task(delete_resource(resource_lock))
    await asyncio.sleep(0)
    creator.cancel()
    await asyncio.sleep(5)


asyncio.run(main())
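For comparison, here is a runnable sketch of the same flow with asyncio.shield wrapping each publish call instead of the proposed shield_scope. The events list and the shorter delays are changes made only for this sketch. The recorded order of events shows the problem described above: the creator's lock is released as soon as it is cancelled, and the delete message finishes publishing before the create message.

```python
import asyncio

events = []


async def publish(msg, delay):
    events.append(f"starting publish: {msg}")
    await asyncio.sleep(delay)
    events.append(f"ending publish: {msg}")


async def create_resource(resource_lock):
    await resource_lock.acquire()
    try:
        # asyncio.shield keeps publish() running, but CancelledError still
        # reaches this coroutine at once, so the finally block releases the lock.
        await asyncio.shield(publish("create - resource", delay=0.3))
    finally:
        resource_lock.release()
        events.append("create - release lock")


async def delete_resource(resource_lock):
    await resource_lock.acquire()
    events.append("delete - acquire lock")
    try:
        await asyncio.shield(publish("delete - resource", delay=0.2))
    finally:
        resource_lock.release()
        events.append("delete - release lock")


async def main():
    resource_lock = asyncio.Lock()
    creator = asyncio.create_task(create_resource(resource_lock))
    await asyncio.sleep(0)
    deleter = asyncio.create_task(delete_resource(resource_lock))
    await asyncio.sleep(0)
    creator.cancel()
    await asyncio.sleep(0.5)  # let both detached publishes finish
    await deleter


asyncio.run(main())
for line in events:
    print(line)
```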

Previous Discussions: removed because I don’t have permission to post that many links.

Let’s continue the discussion in the issue rather than here.