Running generators in executors asynchronously

TL;DR

Allow generators to be run in async executors, returning an asynchronous generator.

Background

Currently, if I have a slow function that returns a single result and does not use async, and I want to call it from async code, I can use loop.run_in_executor() to run it in another thread and then await the result.
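For example, something like this (slow_lookup is just a stand-in for any blocking call):

import asyncio
import time

def slow_lookup(key):
    time.sleep(1)  # blocking work
    return f"value for {key}"

async def main():
    loop = asyncio.get_running_loop()
    # The call runs in the default ThreadPoolExecutor, so the event loop
    # is free to switch tasks while we wait for the result.
    result = await loop.run_in_executor(None, slow_lookup, "horse")
    print(result)

asyncio.run(main())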

If, however, I have a generator function that slowly yields results in a blocking manner, and I want to consume it from async code, there is no simple solution.

There are many large and useful libraries that do not support async functionality. Some may never do so; a pathological example of this is AWS’s botocore library, which has had an open issue for async support for nine years and counting. Many of these APIs support “streaming”, but the stream comes from a blocking generator which will block async code.

Proposed idea

It seems to me that it would be valuable to enhance loop.run_in_executor() (or offer another function) so that if the result of the wrapped function call is a generator, it returns an asynchronous generator (see PEP 525) rather than a Future. This would provide an easy way to bridge between blocking code that slowly trickles out results (such as calls to LLMs) and non-blocking code that wants to consume those results.

Example code

If I have the following slow synchronous generator:

import time

def slow_horses(n):
    for i in range(n):
        time.sleep(1)
        yield f"Horse {i}"

then from async code I would be able to use:

...
async for horse in loop.run_in_executor(None, slow_horses, 10):
    print(horse)
...

This async code would allow task switching while waiting for the results from the blocking generator.

You can try to implement this using an asyncio.Queue with unlimited size. Use loop.run_in_executor() to run a function that iterates the generator and puts results into the queue with put_nowait() (which should always succeed, since the queue is unbounded). Put a special stop value after the end of iteration, or in case of error. In the asynchronous code, just call get() until you get the stop value.
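A rough sketch of that approach (the gen_in_executor and _STOP names are just illustrative; note the thread-safety caveat below):

import asyncio

_STOP = object()  # sentinel marking the end of iteration (or an error)

async def gen_in_executor(loop, gen_func, *args):
    queue = asyncio.Queue()  # unbounded, so put_nowait() should never raise QueueFull

    def produce():
        # Runs in the executor thread and feeds the queue.
        try:
            for item in gen_func(*args):
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_STOP)

    future = loop.run_in_executor(None, produce)
    while (item := await queue.get()) is not _STOP:
        yield item
    await future  # re-raise any exception from the generator

With a helper like this, the earlier example becomes async for horse in gen_in_executor(loop, slow_horses, 10).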

The problem is that asyncio.Queue is not thread-safe. Looking at the code, it may work with a simple Queue and LifoQueue, but not with PriorityQueue. Do not use task_done() and join(); they will not work. shutdown() is probably not reliable either.

Thanks @storchaka. I had looked at using asyncio.Queue, but the fact that the documentation explicitly states that it’s not thread-safe, while executors are based on threads, made me worry that it would likely not be portable, reliable, future-proof, or performant.

Aside from the question of how one might hack this together, it seems like the capability is generically useful enough that this should be an extension or adjunct to loop.run_in_executor() rather than something that users need to implement for themselves.

It’s worth noting that even for the simple version of asyncio.Queue, although the underlying deque is thread-safe, putting something into the queue results in a set_result() call on a future for any waiter on the queue, and asyncio.Future is also explicitly not thread-safe.

You can use loop.call_soon_threadsafe() from inside the function you pass to asyncio.to_thread() (passing in both the loop and the queue) if you need to put items into an asyncio.Queue in a thread-safe manner:

import asyncio

def sync_thread_func(loop, queue):
    # Runs in a worker thread: schedule each put on the event loop thread.
    for data in infinite_stream():
        loop.call_soon_threadsafe(queue.put_nowait, data)


async def ex():
    queue = asyncio.Queue()
    # ideally, task group use here,
    # but showing without for more version coverage
    background = asyncio.to_thread(sync_thread_func, asyncio.get_running_loop(), queue)
    task = asyncio.create_task(background)

From there, you can consume the queue either in the same coroutine or in another one.
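For example, a minimal consumer that ex() could await after creating the task (the print is just a placeholder for real processing):

async def consume(queue):
    # All queue operations here run on the event loop thread,
    # so a plain get() is safe.
    while True:
        data = await queue.get()
        print(data)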


With that said, it’s actually been my experience when mixing “infinitely blocking” sync code and non-blocking async code that need to communicate with each other, that it’s often easier to put the asyncio event loop in a background thread than the other way around (especially when considering signal handling and shutdown). I’ve primarily experienced this when mixing simple GUI libraries that block the main thread with networking code that’s designed to asynchronously update the user display based on incoming events, but I would guess it applies to situations like boto’s streaming generators too, for similar reasons.
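A minimal sketch of that inverted arrangement (assuming Python 3.10+, where asyncio.Queue binds lazily to the running loop; all names are illustrative):

import asyncio
import threading

async def async_side(queue):
    # Lives entirely in the background thread's event loop.
    while True:
        event = await queue.get()
        print("async side got:", event)

def blocking_main():
    loop = asyncio.new_event_loop()
    queue = asyncio.Queue()

    # Run the event loop in a background thread.
    threading.Thread(
        target=loop.run_until_complete, args=(async_side(queue),), daemon=True
    ).start()

    # The blocking side (a GUI loop, a botocore stream, ...) hands events to
    # the async side with the documented thread-safe call.
    for i in range(3):
        asyncio.run_coroutine_threadsafe(queue.put(i), loop).result()

blocking_main()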

All of the necessary behavior to wrap this in a high-level API lives here; the file is 103 lines including comments, type hints that pass in strictly typed code bases, and a license header. It’s safe to vendor, or you can just use it as a starting point for ideas on how to combine some of the slightly lower-level async and threading options, if it leads to inspiration for something that feels better for your use.
