PEP 828: Supporting 'yield from' in asynchronous generators

Hi friends! Just wanted to say that the Steering Council has appointed @yselivanov as PEP delegate for PEP 828 and he has accepted. Given that Yury authored the original async/await and async generator PEPs, he’s well-positioned to make the call here. He will make a recommendation to the SC as soon as he finishes deliberating.

Thank you all for your patience!


If return values aren’t added, then pluggy will be stuck using crutches for async.


I think it’s a little more complicated than that, because to ensure timely cleanup you generally need to write with closing(gen()) as g: yield from g (or async variant). Unfortunately the [async] yield from expression in particular makes it very attractive to write the more concise but often-incorrect inline version instead.

As usual, I want Trio users and flake8-async to be able to ensure that every async function transitively yields or awaits back into the framework, so that timeouts can fire, etc. Either the current proposal of distinguished async yield from and yield from expressions, or banning yield from <sync gen>-in-async-gen (lightly preferred, as I also have not seen uses for this), would work for me; but a yield from accepting either sync or async generators would not.


I’ve tried the reference implementation but I’m not sure the exception handling (athrow) is working right. Or maybe it is working but that indicates a mismatch between sync and async generators. Or maybe I’m missing something?

This version (all async) works:

async-test
"""async-test"""

import asyncio

async def get_nums():
    try:
        yield 1
    except EOFError:
        print("Don't care about EOF")
    yield 2
    yield 3
    print("get_nums finished")

async def my_gen():
    async yield from get_nums()
    print("my_gen done")

async def amain():
    g = my_gen()

    print(await anext(g))
    print(await g.athrow(EOFError()))
    print(await anext(g))
    try:
        print(await anext(g))
    except StopAsyncIteration:
        print("fin")

asyncio.run(amain())

This version (mixed sync async) doesn’t work:

sync-test
"""sync-test"""

import asyncio

def get_nums():
    try:
        yield 1
    except EOFError:
        print("Don't care about EOF")
    yield 2
    yield 3
    print("get_nums finished")

async def my_gen():
    yield from get_nums()
    print("my_gen done")

async def amain():
    g = my_gen()

    print(await anext(g))
    print(await g.athrow(EOFError())) # RuntimeError: Task got bad yield: 2
    print(await anext(g))
    try:
        print(await anext(g))
    except StopAsyncIteration:
        print("fin")

asyncio.run(amain())

I agree with


I’m a big fan of PEP 380 (yield from) in synchronous contexts. I’m glad it pushed back against the criticisms of allowing return values. I discovered yield from in the process of writing an incremental parser a couple of years ago. I was revisiting it a couple of days ago, and long story short I ended up discovering this PEP.

A very stripped down (and for a different task) version of the parser is below.

incremental-sync-parser
from typing import Generator, TypeAlias, TypeVar

T = TypeVar("T")

Returner: TypeAlias = Generator[None, None, T]

class IncrementalChunker:
    def __init__(self) -> None:
        self._results:   list[bytes]    = list()
        self._buffer:    bytes          = bytes()
        self._generator: Returner[None] = self._parse_forever()

        self._wakeup()

    def __iter__(self) -> Generator[bytes]:
        while self._results:
            yield self._results.pop(0)

    def feed(self, more_bytes: bytes) -> None:
        self._buffer += more_bytes
        self._wakeup()

    def _wakeup(self) -> None:
        self._generator.send(None)

    def _await_n_bytes(self, n: int) -> Returner[bytes]:
        while len(self._buffer) < n:
            yield

        ret          = self._buffer[:n]
        self._buffer = self._buffer[n:]

        return ret

    def _parse_forever(self) -> Returner[None]:
        while True:
            parsed = yield from self._parse_single()
            self._results.append(parsed)

    def _parse_single(self) -> Returner[bytes]:
        b_1 = yield from self._await_n_bytes(1)
        b_2 = yield from self._await_n_bytes(1)
        # could've used `_await_n_bytes(2)` but I'm
        # demonstrating a point of how we can wait

        return b_1 + b_2

decoder = IncrementalChunker()

decoder.feed(b"Hello")
for t in decoder:
    print("Round 1", t)
# Round 1 b'He'
# Round 1 b'll'

decoder.feed(b"")
for t in decoder:
    print("Round 2", t)

decoder.feed(b" ")
for t in decoder:
    print("Round 3", t)
# Round 3 b'o '

decoder.feed(b" World!")
for t in decoder:
    print("Round 4", t)
# Round 4 b' W'
# Round 4 b'or'
# Round 4 b'ld'

I’ll post later ( :sleeping_face: sorry, I’m really busy with a project right now) about trying to mimic asyncio (at least the basics of PEP 3156, based on PEP 380 but pre-PEP 492) using yield from, and trying to implement async generators as a subsequence of the yields. The aim is to see whether the formal semantics of yield from and/or async yield from can be spelled out in Python code, similar to PEP 380, rather than by browsing an intense reference implementation in C, and to compare yield from (sync) with await (async).

It looks like this case isn’t covered by our test suite, so I must have missed it. The bug is likely just a missing _PyAsyncGenValueWrapperNew call somewhere.

I have a branch where I started this, but I haven’t got around to finishing it yet. I think it’s less important for this PEP because the semantics are supposed to be exactly the same as PEP 380, so any differences should be immediately flagged as bugs.


For the async iteration case, it’s important to spell out exactly where the yields are versus the awaits, and that’s not something PEP 380 covers.

Synchronous iteration is literally identical, so that can refer back to the existing behaviour (and highlight that as part of the rationale for requiring different syntax to request async iteration).
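As a rough illustration of the distinction, the two forms can be written out long-hand on current Python. This is a simplified sketch that only covers plain iteration (no send, throw, close or return values), and the helper names `delegate_async`, `delegate_sync` and `ticks` are hypothetical. The async form has an await point before every yield; the sync form has none.

```python
import asyncio


async def delegate_async(source):
    # Async iteration: each step awaits __anext__() and then yields.
    it = source.__aiter__()
    while True:
        try:
            item = await it.__anext__()  # await point
        except StopAsyncIteration:
            return
        yield item  # yield point


async def delegate_sync(source):
    # Sync iteration: same loop as in a sync generator, with no await
    # between the yields.
    it = iter(source)
    while True:
        try:
            item = next(it)  # plain call, never suspends to the loop
        except StopIteration:
            return
        yield item  # yield point


async def ticks():
    for n in range(3):
        await asyncio.sleep(0)  # gives the event loop a chance to run
        yield n


async def main():
    from_async = [x async for x in delegate_async(ticks())]
    from_sync = [x async for x in delegate_sync(iter(range(3)))]
    return from_async, from_sync


result = asyncio.run(main())
print(result)
```

Both delegations produce the same items; the difference is only in where control can return to the event loop, which is the part that would need to be spelled out for the async case.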
