Why might asyncio.get_running_loop fail to find an extant event loop?

(Apologies in advance if Python Help is the wrong forum, I wasn’t sure if I should post here or in Async-SIG.)

In an API I’m working on, we are adding in statistics on registered webhooks so consumers can check the health of their webhook subscriptions. Computing these statistics is far from fast, but since most of the time is just spent waiting on timeseries data to be gathered and sent over the network, it’s a good candidate for threading or asyncio, and we decided to try asyncio first.

Our application does not have any additional async code, so drawing from this Stack Overflow answer, our bridge into the async world was as follows:

import asyncio
import logging

def run_from_sync(coro):
    try:
        # Check if event loop already exists
        loop = asyncio.get_running_loop()
        logging.info("Event loop found")
        return loop.run_until_complete(coro)
    except RuntimeError:
        # No event loop running
        logging.info("Creating new loop for async coroutine")
        return asyncio.run(coro)

And it was called like so:

import asyncio
from asyncio_wrapper import run_from_sync
# ...

class WebhooksService:
    def __init__(self, ..., webhook_stats_svc):
        # ...
        self.webhook_stats = webhook_stats_svc

    def get_webhooks(self, request):
        # ...
        # return self.__add_health_stats_to_webhooks_sync(webhooks)
        return run_from_sync(
            self.__add_health_stats_to_webhooks_async(webhooks)
        )

    def get_webhook(self, request):
        # ...
        # return self.__add_health_stats_to_webhook_sync(webhook)
        return run_from_sync(
            self.__add_health_stats_to_webhook_async(webhook)
        )

    async def __add_health_stats_to_webhooks_async(self, webhooks):
        tasks = [
            self.__add_health_stats_to_webhook_async(webhook)
            for webhook in webhooks
        ]
        return await asyncio.gather(*tasks)

    async def __add_health_stats_to_webhook_async(self, webhook):
        (num_acked, num_timed_out, oldest_unacked, ...) = (
            await asyncio.gather(
                self.webhook_stats.get_acked_in_past_week_async(
                    webhook.subscription_id
                ),
                self.webhook_stats.get_timeouts_in_past_week_async(
                    webhook.subscription_id
                ),
                self.webhook_stats.get_oldest_unacked_message_age_async(
                    webhook.subscription_id
                ),
                # 2 more async method calls...
            )
        )
        webhook.past_week_acks = num_acked
        webhook.past_week_timeouts = num_timed_out
        webhook.oldest_unacked_message = oldest_unacked
        # ...
        return webhook

This worked fine in my local testing. However, I ran into problems after deploying it. We use Google PubSub to power our webhooks, and the Google Cloud Monitoring async client library requires that the current thread already have an event loop when we call the MetricServiceAsyncClient constructor:

# ...
  File "/workspace/main.py", line 135, in get_webhook_stats_svc
    MetricServiceClient(), MetricServiceAsyncClient()
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/monitoring_v3/services/metric_service/async_client.py", line 281, in __init__
    self._client = MetricServiceClient(
                   ^^^^^^^^^^^^^^^^^^^^
# ...
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi", line 183, in grpc._cython.cygrpc._loop_policy_try_to_get_default_loop
  File "/layers/google.python.runtime/python/lib/python3.11/asyncio/events.py", line 681, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

Since we call the constructor at the start of the request, well before asyncio_wrapper.run_from_sync, I modified our main file like so:

import asyncio
from google.cloud.monitoring_v3 import MetricServiceClient, MetricServiceAsyncClient
# ...

def process_request(request):
    # ...
    webhook_stats_service = get_webhook_stats_svc()
    webhooks_service = WebhooksService(..., webhook_stats_service)
    request_handler = RequestHandler(..., webhooks_service)
    return request_handler.process_request(request)

# ...

def get_webhook_stats_svc():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())
    # modification: added this try-except block^
    return WebhookStatsService(
        MetricServiceClient(), MetricServiceAsyncClient()
    )

And I feel like this should have worked, but after deploying it, we somehow ended up with multiple event loops:

Error when processing request from app "<app ID>":
Traceback (most recent call last):
# ...
  File "/workspace/services/webhooks_service.py", line 104, in get_webhooks
    return run_from_sync(
           ^^^^^^^^^^^^^^
  File "/workspace/asyncio_wrapper.py", line 26, in run_from_sync
    return asyncio.run(coro)
           ^^^^^^^^^^^^^^^^^
  File "/layers/google.python.runtime/python/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
# ...
  File "/workspace/services/webhooks_service.py", line 763, in __add_health_stats_to_webhooks_async
    return await asyncio.gather(*tasks)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/services/webhooks_service.py", line 796, in __add_health_stats_to_webhook_async
    await asyncio.gather(
  File "/workspace/services/webhook_stats_service.py", line 55, in get_acked_in_past_week_async
    return await self.get_time_series_total_async(metric, aggregator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/services/webhook_stats_service.py", line 272, in get_time_series_total_async
    await self.metric_client_async.list_time_series(
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/monitoring_v3/services/metric_service/async_client.py", line 1225, in list_time_series
    response = await rpc(
               ^^^^^^^^^^
# ...
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 86, in __await__
    response = yield from self._call.__await__()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/grpc/aio/_interceptor.py", line 473, in __await__
    call = yield from self._interceptors_task.__await__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Task <Task pending name='Task-5' coro=<WebhookStatsService.get_time_series_total_async() running at /workspace/services/webhook_stats_service.py:55> cb=[gather.<locals>._done_callback() at /layers/google.python.runtime/python/lib/python3.11/asyncio/tasks.py:764]> got Future <Task pending name='Task-20' coro=<InterceptedUnaryUnaryCall._invoke() running at /layers/google.python.pip/pip/lib/python3.11/site-packages/grpc/aio/_interceptor.py:656> cb=[InterceptedCall._fire_or_add_pending_done_callbacks()]> attached to a different loop

My first thought was: what if asyncio.loop.run_until_complete raises RuntimeError when the loop is busy? If that were the case, asyncio_wrapper.run_from_sync would fall into its except block and spawn a new event loop, which could cause this. I couldn’t find anything in the documentation to support this, but this Stack Overflow comment seems to suggest that a loop already running other async code could do this.
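That guess can be checked in isolation. Below is a minimal sketch, assuming plain CPython asyncio with no framework involved; `inner` and `outer` are made-up names for illustration. It calls run_until_complete on a loop that is already running and captures the error:

```python
import asyncio


async def inner():
    # Trivial coroutine standing in for real async work.
    return 42


async def outer():
    # We are inside asyncio.run(), so this loop is running right now.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        # Close the coroutine so Python does not warn that it was never awaited.
        coro.close()
        return str(exc)
    return None


message = asyncio.run(outer())
print(message)
```

If this behaves as I expect, the message says the event loop is already running. That would mean the "Event loop found" branch of run_from_sync can never succeed: whenever get_running_loop returns a loop, run_until_complete raises RuntimeError, and the except block then tries asyncio.run, which also refuses to start from inside a running loop.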

But after checking the logs, I didn’t see any "Event loop found" preceding the "Creating new loop for async coroutine" entry, so it seems no event loop was returned by asyncio.get_running_loop.

We run our service in Google Cloud Functions, which uses gunicorn under the hood (and we cannot control or configure it). So I began to wonder if there was some weirdness in how it handles threads, such that the event loop created in main.py somehow ended up in a different thread from the code in asyncio_wrapper.py. But after looking at our logs again, I realized it was able to fetch all the statistics for the first two webhooks I had set up; it was only when it started gathering stats on my third webhook that it lost sight of the event loop. It’s also not a one-off caused by a random GCP instance-specific heisenbug: I can re-deploy the code and it’ll produce the same error every time, failing after the first 2 webhooks.

At a loss, I started looking at the other functions available and decided to try asyncio.get_event_loop, and it worked, as long as I call it in asyncio_wrapper.run_from_sync. That is, main.get_webhook_stats_svc can use either asyncio.get_event_loop or asyncio.get_running_loop; all that matters is that asyncio_wrapper.run_from_sync uses asyncio.get_event_loop.

Which leaves me with a question: why does asyncio.get_event_loop work here when asyncio.get_running_loop does not? The asyncio.get_event_loop documentation doesn’t go into a ton of detail about how the two functions differ except to say asyncio.get_event_loop is much more complicated, and I do not know C well enough to parse the CPython implementation to figure this out on my own, much less determine how the difference matters to the code I have written.
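One observable difference can be shown without reading any C. The sketch below (`probe` is a made-up name) registers a loop for the current thread with asyncio.set_event_loop but never starts it, then asks both functions for a loop:

```python
import asyncio


def probe():
    # Create a loop and register it as this thread's current loop,
    # but do not start it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        try:
            asyncio.get_running_loop()
            running_found = True
        except RuntimeError:
            # get_running_loop only succeeds while a loop is actually running.
            running_found = False
        # get_event_loop returns the loop registered for this thread.
        current_is_ours = asyncio.get_event_loop() is loop
        return running_found, current_is_ours
    finally:
        asyncio.set_event_loop(None)
        loop.close()


result = probe()
print(result)
```

As far as I can tell, get_running_loop only answers "is a loop executing in this thread right now?", while get_event_loop answers "which loop is registered for this thread?". A loop that was set in get_webhook_stats_svc but not yet started is visible to the second and invisible to the first.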

Any insight would be appreciated.

Edit: I forgot to mention we are still on the Google Cloud Functions Python 3.11 runtime, though that’s more due to us not having had the time to upgrade and read through the changelogs for our dependencies. So for now, we could technically use asyncio.get_event_loop as a fallback (i.e. if asyncio.get_running_loop raises RuntimeError in asyncio_wrapper.run_from_sync, call asyncio.get_event_loop in the except block), but given the 3.14 change to its behavior when no loop is present, I decided to treat the two functions the same in preparation for our eventual upgrade.
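For reference, the fallback described in this edit might look roughly like the following. This is only a sketch under my own assumptions, not tested against the Google client libraries: it treats a RuntimeError from asyncio.get_event_loop (the 3.14 behavior when no loop is set) the same as "no loop yet" and creates one for the thread.

```python
import asyncio
import logging


def run_from_sync(coro):
    # Refuse to block if we are already inside a running loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # expected: nothing is running in this thread
    else:
        coro.close()
        raise RuntimeError("run_from_sync() called from a running event loop")

    # Reuse the loop registered for this thread, if there is one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None  # no loop set for this thread
    if loop is None or loop.is_closed():
        logging.info("Creating new loop for this thread")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Unlike asyncio.run(), this leaves the loop open afterwards.
    return loop.run_until_complete(coro)
```

The point of the design is that repeated calls from the same thread reuse one loop, so objects created against that loop earlier in the request stay usable.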

It looks like your framework is running tasks in worker threads. Each thread has its own current event loop (which is why you are getting multiple loops), and asyncio tasks and futures can only be awaited on the loop that created them.

I don’t think your setup is intended to be compatible with asyncio unfortunately. You should try threading instead, for example a thread pool.
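A thread pool version could be sketched like this. The function and fetcher names are hypothetical stand-ins; in the real service the fetchers would be the synchronous methods that query the monitoring API.

```python
from concurrent.futures import ThreadPoolExecutor


def gather_stats_threaded(fetchers, subscription_id, max_workers=5):
    # Run each blocking fetcher in its own worker thread and
    # return the results in the same order as the fetchers.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fetch, subscription_id) for fetch in fetchers]
        return [future.result() for future in futures]


# Hypothetical stand-ins for the real blocking calls.
def fake_acked(subscription_id):
    return ("acked", subscription_id)


def fake_timeouts(subscription_id):
    return ("timeouts", subscription_id)


stats = gather_stats_threaded([fake_acked, fake_timeouts], "sub-1")
print(stats)
```

Since the work is mostly waiting on the network, threads should give a similar speedup without needing an event loop at all.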

Well, that’s unfortunate. Confusingly, the workaround I used works (it provides a large speedup in prod over the synchronous code I had previously written), but I have to wonder if that’s just due to a CPython implementation detail which may change.

Actually, I think I see your problem. The asyncio.run() function creates a new event loop each time it is called and closes that loop when it is done, which is bad if anything else is bound to a different loop or relies on the loop staying open. You should just use get_event_loop(), which will leave the current thread’s event loop open for the next task to use.
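To illustrate, here is a small sketch (`compare` and `current_loop` are made-up names) that registers a loop the way get_webhook_stats_svc does and then checks which loop asyncio.run() actually uses:

```python
import asyncio


async def current_loop():
    # Report the loop this coroutine is running on.
    return asyncio.get_running_loop()


def compare():
    # Mimic the setup step: register a loop for this thread in advance.
    preset = asyncio.new_event_loop()
    asyncio.set_event_loop(preset)
    try:
        used = asyncio.run(current_loop())
        # Is it the same loop? Is each loop closed afterwards?
        return used is preset, used.is_closed(), preset.is_closed()
    finally:
        asyncio.set_event_loop(None)
        preset.close()


result = compare()
print(result)
```

The loop used by asyncio.run() is not the preset one, and it is closed on return while the preset loop is left untouched. A client that was constructed against the preset loop would therefore see its futures "attached to a different loop" once asyncio.run() drives the coroutines, which matches the traceback above.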