TL;DR: Let’s allow async with
to accept both sync and async context managers in a single statement, as long as there’s at least one async context manager present. This would eliminate the nested indentation that currently plagues mixed context manager code. GitHub issue here.
The Problem
If you write async Python with context managers, you’ve hit this pain point: every time you need to mix sync and async context managers, you get punished with another level of indentation:
# status quo: the staircase of doom
async with acquire_lock() as lock:
with open('config.json') as f:
async with connect_to_db() as db:
with temp_directory() as tmpdir:
# Your actual code is now 16 spaces deep;
# you and your colleagues are sad.
You can get around this with contextlib.AsyncExitStack()
, but only at the cost of some serious footguns for the unwary (e.g. mis-nesting contexts, the sharp edges in .pop_all()
, etc).
The Proposal
Allow async with
to accept both sync and async context managers:
async with (
acquire_lock() as lock,
open('config.json') as f,
connect_to_db() as db,
temp_directory() as tmpdir,
):
# Your code starts at a reasonable indentation
# Your colleagues might actually review your PR
The rule is simple: if there’s at least one async context manager in the group, you can use async with
for the whole thing. This is semantically identical to a safe use of AsyncExitStack
, but much prettier code and harder to get wrong.
Unsatisfying workarounds
Add checkpoints
Wrapping sync context managers in async helpers adds await points where your async framework can switch tasks. Context manager boundaries are terrible places for task switches - they often handle timeouts, locks, or error recovery - and so this can introduce bugs, as well as the performance overhead.
def as_acm(ctx):
@asynccontextmanager
def inner():
with ctx as val:
await sleep(0)
yield val
return inner()
async with (
acquire_lock() as lock,
as_acm(open('config.json')) as f,
connect_to_db() as db,
as_acm(temp_directory()) as tmpdir,
):
# hope you didn't need an atomic section there...
An AsyncExitStack
helper function
The semantics of a async with multicontext(...)
helper are better than the as_acm()
trick, but you lose the locality of as
clauses, and with that the code becomes uglier and harder to understand:
@asynccontextmanager
async def multicontext(*cms):
assert any(hasattr(cm, "__aenter__") for cm in cms)
async with AsyncExitStack() as stack:
enter, aenter = stack.enter_context, stack.enter_async_context
yield tuple(
(enter(cm) if hasattr(cm, "__enter__") else await aenter(cm))
for cm in cms
)
async with (
acquire_lock(),
open('config.json'),
connect_to_db(),
temp_directory(),
) as (lock, db, f, tmpdir):
# Wait, which order were those in again?
Implementation Considerations
The change should be straightforward:
- Runtime check that at least one async context manager is present
- Proper handling of
__enter__
/__exit__
vs__aenter__
/__aexit__
The semantics are clear: sync context managers use their normal protocol, async ones use the async protocol, execution order remains left-to-right. AsyncExitStack
already does this; I’m just proposing to support it directly at the statement level where code can be as clean and simple as possible.
Prior Art
I’ve been using both of the “unsatisfying workarounds” above in production Trio code; they do indeed work and are indeed unsatisfying. We would be delighted to switch over to using the async with
statement as soon as possible.
Backwards Compatibility
This is purely additive. All existing code continues to work. The only “breaking” change is that code that was previously a RuntimeError
would now work, with the only reasonable non-error semantics.
What do you think?
Am I missing some crucial consideration here? Is there a technical blocker I’m not seeing? For those using structured concurrency with asyncio.TaskGroup
or Trio or AnyIO, this would be particularly nice - you could finally group your timeout/cancellation contexts naturally with your resource contexts without the indentation penalty.