Support concurrent task execution with async context managers

I’ve long conceptualized the with statement as syntactic sugar for a try/finally inside a function that calls its argument:

def inner_code(context_value):
    # Do something with context_value.
    # Notably, in a normal synchronous context it doesn't make a ton of
    # sense to try to allow concurrent operations in a with statement.
    return

from contextlib import contextmanager

# Sugared version.
@contextmanager
def with_example():
    context_value = do_some_setup()
    try:
        yield context_value
    finally:
        teardown(context_value)

with with_example() as context_value:
    inner_code(context_value)

# Desugared version.
def with_example_desugared(inner):
    context_value = do_some_setup()
    try:
        inner(context_value)
    finally:
        teardown(context_value)

# Equivalent to the above with statement, in vague terms.
with_example_desugared(inner_code)

It occurred to me, while attempting to simplify interactions with an asyncio subprocess, that we get more freedom with an async context manager. Say we want to interact with a subprocess using only StreamReaders (this doesn’t handle a bunch of edge cases, or a reader that stops reading, but bear with me); you can do that with something that resembles async with:

import asyncio
import io

# Minimal in-memory stand-in for a StreamReader-style source.
class AsyncBytesIO:
    def __init__(self, data):
        self._data = data

    async def read(self, size=-1):
        assert size > 0  # Simplicity.
        val = self._data[:size]
        self._data = self._data[size:]  # Inefficient, sure. Just demonstrating.
        return val

async def with_piped_subprocess(output_reader: asyncio.StreamReader):
    # Consume the stream, one line at a time.
    while not output_reader.at_eof():
        print(await output_reader.readline())

async def pipe_subprocess(input_reader: asyncio.StreamReader, inner):
    proc = await asyncio.create_subprocess_exec(
        "cat", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)

    async def feed():
        while True:
            data = await input_reader.read(io.DEFAULT_BUFFER_SIZE)
            if not data:
                break
            proc.stdin.write(data)
            await proc.stdin.drain()
        proc.stdin.close()

    try:
        await asyncio.gather(feed(), inner(proc.stdout))
    finally:
        await proc.wait()

async def main():
    values = AsyncBytesIO(b"line1\nline2\n")
    await pipe_subprocess(values, with_piped_subprocess)

asyncio.run(main())

If we continue to think about async with as sugar for a function that calls one of its arguments, it’s pretty clear that we might want an async context manager to run some background task alongside the inner code. With this model, the above abstraction might instead be represented as:

async def main():
    values = AsyncBytesIO(b"line1\nline2\n")
    async with pipe_subprocess(values) as output_stream:
        await with_piped_subprocess(output_stream)
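
For what it’s worth, that usage is roughly achievable today by spawning the feeder as a background task inside contextlib.asynccontextmanager. A minimal sketch (it would replace the callback-taking pipe_subprocess above, reuses the same feed logic, and glosses over exactly the cancellation and error-propagation questions raised at the end):

import contextlib

@contextlib.asynccontextmanager
async def pipe_subprocess(input_reader):
    proc = await asyncio.create_subprocess_exec(
        "cat", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)

    async def feed():
        while True:
            data = await input_reader.read(io.DEFAULT_BUFFER_SIZE)
            if not data:
                break
            proc.stdin.write(data)
            await proc.stdin.drain()
        proc.stdin.close()

    feed_task = asyncio.create_task(feed())
    try:
        yield proc.stdout
    finally:
        # Normally the feeder has already finished by the time the body
        # sees EOF, so this cancel is a no-op; if the body bailed out
        # early, it stops the feeder instead of leaving it dangling.
        feed_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await feed_task
        await proc.wait()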

As another example use case to think through, imagine a Lock context manager that holds a lock for as long as the inner code continues to execute. It might need to periodically renew the Lock’s underlying representation (refresh a lease, say) in case there’s some form of timeout or stall detection (sure, you might not always want this behavior, because maybe your code really did hang, but it’s an example).
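
In the desugared, function-taking style from the subprocess example, a rough sketch of that might look like the following. The lock object here is hypothetical (async acquire() and release(), plus a renew() that refreshes whatever lease or heartbeat backs the lock); the point is just to show where the background work would live.

import asyncio
import contextlib

async def hold_lock(lock, inner, renew_interval=5.0):
    # Hypothetical lock with async acquire(), release(), and renew();
    # renew() refreshes the lease so it doesn't expire mid-execution.
    await lock.acquire()

    async def keep_alive():
        while True:
            await asyncio.sleep(renew_interval)
            await lock.renew()

    keep_alive_task = asyncio.create_task(keep_alive())
    try:
        await inner()
    finally:
        keep_alive_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await keep_alive_task
        await lock.release()

Under the proposed model, keep_alive() would simply be the context manager’s background work, and the caller would write async with hold_lock(lock): ... instead of passing inner in.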

I realize this leaves a lot of open questions around how the async context manager’s background work gets cancelled, how this would interact with @asynccontextmanager, and how you’d model this in object form. I’m mostly curious if this line of thinking seems reasonable to other folks, and whether these seem like tractable problems.