Ad-hoc __aiter__ with asynchronous generators (or pythonic alternatives)?

We can make a class iterable by simply defining an __iter__ method that uses yield from on the underlying data. I’ve read PEP 525, searched, and found threads like this one, but I’m still unsure:

  1. Is there an equivalent implementation for __aiter__, since “yield from” won’t work?
  2. If not:
    a. Are there any planned changes (PEP etc.) that will make the equivalent possible?
    b. How do you currently implement __aiter__ in your classes (or, what do you consider most pythonic)? Separate AsyncIterator class? AsyncGenerator?
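For reference, the synchronous pattern the question starts from might look like the sketch below. The `Playlist` class and its `_tracks` attribute are made up for illustration; the point is only that `yield from` turns `__iter__` into a generator function, so each call returns a fresh iterator.

```python
class Playlist:
    """Hypothetical container, used only to illustrate the sync pattern."""

    def __init__(self, tracks):
        self._tracks = list(tracks)

    def __iter__(self):
        # Delegating to the underlying list makes __iter__ a generator
        # function, so every call returns a new, independent iterator.
        yield from self._tracks


playlist = Playlist(["a", "b", "c"])
first_pass = list(playlist)
second_pass = list(playlist)  # works again: iteration state is not on self
```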

Thanks!

Yes, but there are some caveats. The example from the data model documentation should help here; I’m including it directly so I can say more about it below.

class Reader:
    async def readline(self):
        ...

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val

You asked (b) only for the “if not” case, but I think it is worth explaining a bit even though the answer is “if so”.

If you have any sort of statefulness that is hard to attach to a class, sometimes simply returning self and having a way to directly iterate is the wrong method of handling it. This is especially true if you want the iteration to be re-usable. In cases like this, the better approach is a dedicated iterator class or a function which is an async generator.

An example of this might be a wrapped db query that fetches lazily and, importantly, allows the result to be reused. (Reuse of a wrapped query could be desirable for debouncing if the wrapped queries are kept around in some sort of bounded cache with a TTL.)

In a case like this, you have more machinery involved and probably want to create a separate async iterator class to return from __aiter__.
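A minimal sketch of that split, assuming the rows are already in memory so the example stays runnable. `QueryResult` and `_QueryCursor` are hypothetical names, and `asyncio.sleep(0)` stands in for a real awaitable fetch. The iterable holds the data, and each call to `__aiter__` hands out a new cursor that owns the iteration state.

```python
import asyncio


class _QueryCursor:
    """Hypothetical per-loop state: one cursor per `async for`."""

    def __init__(self, rows):
        self._rows = rows
        self._index = 0

    def __aiter__(self):
        # An async iterator returns itself from __aiter__.
        return self

    async def __anext__(self):
        if self._index >= len(self._rows):
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for an awaitable fetch
        row = self._rows[self._index]
        self._index += 1
        return row


class QueryResult:
    """Hypothetical async iterable: holds data, no iteration state."""

    def __init__(self, rows):
        self._rows = list(rows)

    def __aiter__(self):
        # A fresh cursor per call keeps repeated loops independent.
        return _QueryCursor(self._rows)


async def demo():
    result = QueryResult(["r1", "r2"])
    first = [row async for row in result]
    second = [row async for row in result]
    return first, second
```

Calling `asyncio.run(demo())` iterates the same `QueryResult` twice; because the position lives on the cursor rather than on the result, the second pass starts from the beginning again.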

I read that as part of my search, but took it as an example (not showing the __init__) of how a separate asynchronous iterator might be implemented for use by other classes, not as a suggestion, even an implied one, of how a class could be made asynchronously iterable.

Still inclined that way. As you alluded to, making an iterable its own iterator can be trouble enough without concurrency, I don’t even want to imagine debugging with it…

That leaves the two options I mentioned at the end: a separate AsyncIterator or a [separate] AsyncGenerator. I was really hoping I had missed something like:

def __aiter__(self):
    async yield from self
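There is no `async yield from`, and plain `yield from` is a syntax error inside `async def`, so the delegation has to be spelled out with `async for` and `yield`. The closest working form is probably to declare `__aiter__` itself as an async generator, as in the sketch below. `Feed` and `fetch_pages` are hypothetical names, and `asyncio.sleep(0)` stands in for real I/O.

```python
import asyncio


async def fetch_pages():
    """Hypothetical async source standing in for real I/O."""
    for page in (["a", "b"], ["c"]):
        await asyncio.sleep(0)
        yield page


class Feed:
    async def __aiter__(self):
        # `yield` inside `async def` makes this an async generator function,
        # so calling it returns a new async iterator (not a coroutine).
        async for page in fetch_pages():
            for item in page:
                yield item


async def demo():
    feed = Feed()
    first = [item async for item in feed]
    second = [item async for item in feed]
    return first, second
```

Because every `async for` calls `__aiter__` again, each loop gets its own generator, which mirrors what `yield from` gives a synchronous `__iter__`.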

Well, here are two full program examples including the correct type information that would correspond with each. I’ll try and think about how the docs could be improved to reflect what’s actually needed later, as I can see how the current examples don’t quite do enough here.

Example program implementing this with reusability:
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Generic, TypeVar

T = TypeVar("T")

class X(Generic[T]):
    """
    Parameters
    ----------
    fill_func: Callable[[int, int], Awaitable[list[T]]]
        A callable to fill in values. should take two values, the first being
        the first element needed, the second being the last element needed, inclusive, 0-indexed.

        If the start is out of bounds, return an empty list
        If the end is out of bounds but the start is not, return the remaining elements

        eg: async def filler(start: int, stop: int) -> list:
    """

    def __init__(self, fill_func: Callable[[int, int], Awaitable[list[T]]]) -> None:
        self._values: list[T] = []
        self._fill_func: Callable[[int, int], Awaitable[list[T]]] = fill_func
        self._exhausted: bool = False


    async def _get_value(self, index: int) -> T:
        try:
            return self._values[index]
        except IndexError:
            if self._exhausted:
                raise

            vals = await self._fill_func(len(self._values), index)
            if not vals:
                self._exhausted = True
                raise
            self._values.extend(vals)
            return self._values[index]

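    def __aiter__(self) -> AsyncIterator[T]:
        # Each call builds a new async generator with its own index, so the
        # object can be iterated repeatedly while sharing the cached values.
        async def iterator() -> AsyncIterator[T]: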
            index = 0
            while index < len(self._values) or not self._exhausted:
                try:
                    yield (await self._get_value(index))
                except IndexError:
                    return
                index += 1

        return iterator()


async def filler(start: int, stop: int) -> list[int]:
    """Simulate an external paginated request API."""
    if start > 10:
        return []
    print(f"filling from {start} to {stop} inclusive")
    stop = min(11, stop + 1)
    return list(range(start, stop))


async def main() -> None:

    x = X(filler)
    async for i in x:
        print(i)

    async for i in x:
        print(i)


if __name__ == "__main__":
    asyncio.run(main())

The output of this is:

filling from 0 to 0 inclusive
0
filling from 1 to 1 inclusive
1
filling from 2 to 2 inclusive
2
filling from 3 to 3 inclusive
3
filling from 4 to 4 inclusive
4
filling from 5 to 5 inclusive
5
filling from 6 to 6 inclusive
6
filling from 7 to 7 inclusive
7
filling from 8 to 8 inclusive
8
filling from 9 to 9 inclusive
9
filling from 10 to 10 inclusive
10
0
1
2
3
4
5
6
7
8
9
10
And the equivalent for implementing it on the class itself, without reusability:
import asyncio
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

T = TypeVar("T")

class X(Generic[T]):
    """
    Parameters
    ----------
    fill_func: Callable[[int, int], Awaitable[list[T]]]
        A callable to fill in values. should take two values, the first being
        the first element needed, the second being the last element needed, inclusive, 0-indexed.

        If the start is out of bounds, return an empty list
        If the end is out of bounds but the start is not, return the remaining elements

        eg: async def filler(start: int, stop: int) -> list:
    """

    def __init__(self, fill_func: Callable[[int, int], Awaitable[list[T]]]) -> None:
        self._lock = asyncio.Lock()
        self._fill_func: Callable[[int, int], Awaitable[list[T]]] = fill_func
        self._exhausted: bool = False
        self._current_index: int = 0

    async def __anext__(self) -> T:
        async with self._lock:
            try:
                if self._exhausted:
                    raise StopAsyncIteration from None

                vals = await self._fill_func(self._current_index, self._current_index)
                if not vals:
                    self._exhausted = True
                    raise StopAsyncIteration from None
                return vals[0]
            finally:
                self._current_index += 1

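    def __aiter__(self):
        # The object is its own iterator: all iteration state lives on the
        # instance, so a second loop sees the already-exhausted state.
        return self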


async def filler(start: int, stop: int) -> list[int]:
    if start > 10:
        return []
    print(f"filling from {start} to {stop} inclusive")
    stop = min(11, stop + 1)
    return list(range(start, stop))


async def main() -> None:

    x = X(filler)
    async for i in x:
        print(i)

    async for i in x:
        print(i)  # Note: this will never print


if __name__ == "__main__":
    asyncio.run(main())

Each pattern has merits; which one is right for you will depend on your desired API. Depending on the actual use, you might want to fetch more than just the next value needed (such as the next n based on a paginated API). This also includes any necessary synchronization.
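As a rough sketch of the "next n" idea, the async generator below requests a whole page per call while still yielding one item at a time. It assumes the same fill_func contract as the examples above (both bounds inclusive, 0-indexed, empty list when the start is out of bounds). `iter_pages`, `page_size`, and the `calls` log are illustrative names, not part of the original programs.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def iter_pages(
    fill_func: Callable[[int, int], Awaitable[list[T]]], page_size: int
) -> AsyncIterator[T]:
    """Yield items one at a time, requesting page_size items per call."""
    start = 0
    while True:
        # Both bounds are inclusive, matching the fill_func contract.
        page = await fill_func(start, start + page_size - 1)
        if not page:
            return
        for item in page:
            yield item
        start += len(page)


calls: list[tuple[int, int]] = []  # records each request, for illustration


async def filler(start: int, stop: int) -> list[int]:
    """Stand-in for a paginated API holding the values 0..10."""
    calls.append((start, stop))
    if start > 10:
        return []
    return list(range(start, min(11, stop + 1)))


async def demo() -> list[int]:
    return [item async for item in iter_pages(filler, page_size=4)]
```

With a page size of 4, the eleven values arrive in three data requests plus one final empty one, instead of one request per value.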