Calling coroutines from sync code

Suppose I want to call async code from a regular function and wait for its result. This function could be part of a library or could be far down a stack that is being iteratively converted to asyncio, so the fact that it’s using asyncio internally should be transparent to any callers. It might be called from an async context where an event loop is already running, or from plain sync code where there isn’t one. What’s currently the best way to call a coroutine in these contexts?

async def fetch_something(): ...

def sync_func():  # could be called from async or sync contexts
    ret = call_async(fetch_something())

Options I have considered for call_async:

  • asyncio.run() – This is wrong, according to the documentation: “This function cannot be called when another asyncio event loop is running in the same thread. […] It should be used as a main entry point for asyncio programs, and should ideally only be called once.”
  • asyncio.get_event_loop().run_until_complete() – This has started printing warnings when called from a synchronous context.
  • asyncio.new_event_loop().run_until_complete()?
  • Write a custom utility that calls get_running_loop() and falls back to new_event_loop() if that raises an exception?
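
To make the first option concrete, here is a small sketch of what happens when sync_func() relies on asyncio.run() and is reached from both kinds of caller. The body of fetch_something() and the main() wrapper are made up for illustration; only the asyncio calls are real.

```python
import asyncio

async def fetch_something():
    # Stand-in body for the coroutine from the question (hypothetical).
    await asyncio.sleep(0)
    return "data"

def sync_func():
    coro = fetch_something()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        # asyncio.run() refuses to start when a loop is already running in
        # this thread; close the coroutine to avoid a "never awaited" warning.
        coro.close()
        return f"failed: {exc}"

async def main():
    # Calls the sync function from inside a running event loop.
    return sync_func()

print(sync_func())          # sync caller: no loop is running, so this works
print(asyncio.run(main()))  # async caller: asyncio.run() raises RuntimeError
```

So asyncio.run() alone only covers the case where no loop is running in the current thread.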

Also, how can I handle the various housekeeping that asyncio.run() does?
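
As a rough illustration of the housekeeping in question, the sketch below runs a coroutine on a loop from asyncio.new_event_loop() and then does the kind of cleanup asyncio.run() performs: cancelling leftover tasks, shutting down async generators and the default executor, and closing the loop. The name run_in_fresh_loop is hypothetical, this is not the actual asyncio.run() implementation, it assumes Python 3.9+ (for shutdown_default_executor()), and it still only works when no loop is already running in the current thread.

```python
import asyncio

def run_in_fresh_loop(coro):
    """Hypothetical helper: run coro on a new loop, then clean up after it."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        try:
            # Cancel tasks that are still pending and let them finish unwinding.
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
            # Finalize async generators and the default thread pool executor.
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            loop.close()
```

In practice, calling asyncio.run() gives the same cleanup with less code, which is the main argument for preferring it over a bare new_event_loop().run_until_complete().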


Another solution might be a custom function like this:

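def call_async(coro):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: asyncio.run() creates one and cleans up.
        return asyncio.run(coro)
    else:
        return loop.run_until_complete(coro)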

This should take care of all necessary cleanup in case we’re not running in an event loop. Would that make sense?
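
One caveat about the branch where a loop is already running: run_until_complete() raises RuntimeError when called on a loop that is currently running, so that path cannot block on the coroutine in place. A possible workaround, sketched below under the name call_async_threaded (hypothetical, not an asyncio API), is to run the coroutine with asyncio.run() in a helper thread and wait for it there.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def call_async_threaded(coro):
    """Hypothetical helper: block until coro finishes, from sync or async callers."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() can own one.
        return asyncio.run(coro)
    # A loop is already running here. Run the coroutine on a separate loop
    # in a worker thread and block the current thread until it is done.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```

The trade-offs are significant: the calling event loop is blocked while the worker thread runs, the coroutine executes on a different loop (so it cannot use futures, locks, or connections bound to the outer loop), and it will deadlock if the coroutine waits on something that only the blocked outer loop can provide.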

I tend to do something similar to your call_async(), though I’m also not sure if it’s the ‘right way’.