(Apologies in advance if Python Help is the wrong forum, I wasn’t sure if I should post here or in Async-SIG.)
In an API I’m working on, we are adding in statistics on registered webhooks so consumers can check the health of their webhook subscriptions. Computing these statistics is far from fast, but since most of the time is just spent waiting on timeseries data to be gathered and sent over the network, it’s a good candidate for threading or asyncio, and we decided to try asyncio first.
Our application does not have any additional async code, so drawing from this Stack Overflow answer, our bridge into the async world was as follows:
import asyncio
import logging


def run_from_sync(coro):
    try:
        # Check if event loop already exists
        loop = asyncio.get_running_loop()
        logging.info("Event loop found")
        return loop.run_until_complete(coro)
    except RuntimeError:
        # No event loop running
        logging.info("Creating new loop for async coroutine")
        return asyncio.run(coro)
And it was called like so:
import asyncio
from asyncio_wrapper import run_from_sync
# ...

class WebhooksService:
    def __init__(self, ..., webhook_stats_svc):
        # ...
        self.webhook_stats = webhook_stats_svc

    def get_webhooks(self, request):
        # ...
        # return self.__add_health_stats_to_webhooks_sync(webhooks)
        return run_from_sync(
            self.__add_health_stats_to_webhooks_async(webhooks)
        )

    def get_webhook(self, request):
        # ...
        # return self.__add_health_stats_to_webhook_sync(webhook)
        return run_from_sync(
            self.__add_health_stats_to_webhook_async(webhook)
        )

    async def __add_health_stats_to_webhooks_async(self, webhooks):
        tasks = [
            self.__add_health_stats_to_webhook_async(webhook)
            for webhook in webhooks
        ]
        return await asyncio.gather(*tasks)

    async def __add_health_stats_to_webhook_async(self, webhook):
        (num_acked, num_timed_out, oldest_unacked, ...) = (
            await asyncio.gather(
                self.webhook_stats.get_acked_in_past_week_async(
                    webhook.subscription_id
                ),
                self.webhook_stats.get_timeouts_in_past_week_async(
                    webhook.subscription_id
                ),
                self.webhook_stats.get_oldest_unacked_message_age_async(
                    webhook.subscription_id
                ),
                # 2 more async method calls...
            )
        )
        webhook.past_week_acks = num_acked
        webhook.past_week_timeouts = num_timed_out
        webhook.oldest_unacked_message = oldest_unacked
        # ...
        return webhook
This worked fine in my local testing. However, I ran into problems after deploying it: we use Google Pub/Sub to power our webhooks, and the Google Cloud Monitoring async client library requires that the current thread already have an event loop when we call the MetricServiceAsyncClient constructor:
# ...
  File "/workspace/main.py", line 135, in get_webhook_stats_svc
    MetricServiceClient(), MetricServiceAsyncClient()
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/monitoring_v3/services/metric_service/async_client.py", line 281, in __init__
    self._client = MetricServiceClient(
                   ^^^^^^^^^^^^^^^^^^^^
# ...
  File "src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi", line 183, in grpc._cython.cygrpc._loop_policy_try_to_get_default_loop
  File "/layers/google.python.runtime/python/lib/python3.11/asyncio/events.py", line 681, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
Since we call the constructor at the start of the request, well before asyncio_wrapper.run_from_sync, I modified our main file like so:
import asyncio
from google.cloud.monitoring_v3 import MetricServiceClient, MetricServiceAsyncClient
# ...

def process_request(request):
    # ...
    webhook_stats_service = get_webhook_stats_svc()
    webhooks_service = WebhooksService(..., webhook_stats_service)
    request_handler = RequestHandler(..., webhooks_service)
    return request_handler.process_request(request)

# ...

def get_webhook_stats_svc():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())
    # modification: added this try-except block^
    return WebhookStatsService(
        MetricServiceClient(), MetricServiceAsyncClient()
    )
And I feel like this should have worked, but after deploying it, we somehow ended up with multiple event loops:
Error when processing request from app "<app ID>":
Traceback (most recent call last):
# ...
  File "/workspace/services/webhooks_service.py", line 104, in get_webhooks
    return run_from_sync(
           ^^^^^^^^^^^^^^
  File "/workspace/asyncio_wrapper.py", line 26, in run_from_sync
    return asyncio.run(coro)
           ^^^^^^^^^^^^^^^^^
  File "/layers/google.python.runtime/python/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
# ...
  File "/workspace/services/webhooks_service.py", line 763, in __add_health_stats_to_webhooks_async
    return await asyncio.gather(*tasks)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/services/webhooks_service.py", line 796, in __add_health_stats_to_webhook_async
    await asyncio.gather(
  File "/workspace/services/webhook_stats_service.py", line 55, in get_acked_in_past_week_async
    return await self.get_time_series_total_async(metric, aggregator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/services/webhook_stats_service.py", line 272, in get_time_series_total_async
    await self.metric_client_async.list_time_series(
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/monitoring_v3/services/metric_service/async_client.py", line 1225, in list_time_series
    response = await rpc(
               ^^^^^^^^^^
# ...
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 86, in __await__
    response = yield from self._call.__await__()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/grpc/aio/_interceptor.py", line 473, in __await__
    call = yield from self._interceptors_task.__await__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Task <Task pending name='Task-5' coro=<WebhookStatsService.get_time_series_total_async() running at /workspace/services/webhook_stats_service.py:55> cb=[gather.<locals>._done_callback() at /layers/google.python.runtime/python/lib/python3.11/asyncio/tasks.py:764]> got Future <Task pending name='Task-20' coro=<InterceptedUnaryUnaryCall._invoke() running at /layers/google.python.pip/pip/lib/python3.11/site-packages/grpc/aio/_interceptor.py:656> cb=[InterceptedCall._fire_or_add_pending_done_callbacks()]> attached to a different loop
My first thought was: what if asyncio.loop.run_until_complete raises RuntimeError when the loop is busy? If that were the case, asyncio_wrapper.run_from_sync would fall into the except block and spawn a new event loop, which could cause this. I couldn’t find anything in the documentation to support this, but this Stack Overflow comment seems to suggest a loop already running other async code could do this.
But after checking the logs, I didn’t see any "Event loop found" preceding the "Creating new loop for async coroutine" entry, so it seems asyncio.get_running_loop did not return an event loop at all.
We run our service in Google Cloud Functions, which uses gunicorn under the hood (and we cannot control or configure it). So I began to wonder if there was some weirdness in how it handles threads, such that the event loop created in main.py somehow ended up in a different thread from asyncio_wrapper.py. But after looking at our logs again, I realized it was able to fetch all the statistics for the first two webhooks I had set up; it was only when it started gathering stats on my third webhook that it lost sight of the event loop. It’s also not a one-off caused by a random GCP instance-specific heisenbug: I can re-deploy the code and it’ll produce the same error every time, failing after the first 2 webhooks.
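For what it’s worth, that hypothesis is easy to check locally. The following is a minimal sketch, not our production code (`inner` and `outer` are made-up names); as far as I can tell, calling run_until_complete on a loop that is already running raises RuntimeError right away instead of waiting:

```python
import asyncio


async def inner():
    return 42


async def outer():
    # We are inside a running loop here, so get_running_loop() succeeds.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # Re-entering the loop that is currently executing us is rejected.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return None


message = asyncio.run(outer())
print(message)
```

If that holds, the `except RuntimeError` branch in run_from_sync would catch this error as well as the "no running event loop" one, so the two cases cannot be told apart without the log lines.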
At a loss, I started looking at the other functions available and decided to try asyncio.get_event_loop, and…it worked, as long as I call it in asyncio_wrapper.run_from_sync. That is, I can use either asyncio.get_event_loop or asyncio.get_running_loop in main.get_webhook_stats_svc; as long as asyncio_wrapper.run_from_sync uses asyncio.get_event_loop, that’s all that matters.
Which leaves me with a question: why does asyncio.get_event_loop work here when asyncio.get_running_loop does not? The asyncio.get_event_loop documentation doesn’t go into a ton of detail about how the two functions differ except to say asyncio.get_event_loop is much more complicated, and I do not know C well enough to parse the CPython implementation to figure this out on my own, much less determine how the difference matters to the code I have written.
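To make the question concrete, here is a small standalone sketch of the difference as I understand it from the docs. It assumes a loop has been installed with asyncio.set_event_loop but never started, which is what main.get_webhook_stats_svc does; the names `installed` and `which_loop` are illustrative only:

```python
import asyncio

# Install a loop for this thread without running it.
installed = asyncio.new_event_loop()
asyncio.set_event_loop(installed)

# get_running_loop() only succeeds while a loop is actually executing.
try:
    asyncio.get_running_loop()
    running_found = True
except RuntimeError:
    running_found = False

# get_event_loop() returns the loop that was installed for this thread.
same_loop = asyncio.get_event_loop() is installed


async def which_loop():
    return asyncio.get_running_loop()


# asyncio.run() creates its own fresh loop instead of reusing `installed`.
run_reused_loop = asyncio.run(which_loop()) is installed

installed.close()
print(running_found, same_loop, run_reused_loop)
```

In this sketch, get_running_loop does not see the idle loop, get_event_loop does, and asyncio.run ends up on a different loop from the one that was current when any earlier objects were constructed. Whether that fully explains the "attached to a different loop" error above is an assumption on my part.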
Any insight would be appreciated.
Edit: I forgot to mention we are still on the Google Cloud Functions Python 3.11 runtime, though that’s more due to us not having had the time to upgrade and read through the changelogs for our dependencies. So for now, we could technically use asyncio.get_event_loop as a fallback (i.e. if asyncio.get_running_loop raises RuntimeError in asyncio_wrapper.run_from_sync, call asyncio.get_event_loop in the except block). But given the 3.14 change to its behavior when no loop is present (it raises RuntimeError instead of creating a loop), I decided to treat the two functions the same in preparation for our eventual upgrade.
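One version-independent variant I have been considering is sketched below. It is only a sketch under assumptions: `_state` and `get_thread_loop` are hypothetical names, and it assumes each worker thread handles one request at a time. It avoids relying on asyncio.get_event_loop creating a loop implicitly by keeping one explicitly created loop per thread and always running coroutines on it:

```python
import asyncio
import threading

# Hypothetical per-thread storage for the loop this bridge owns.
_state = threading.local()


def get_thread_loop():
    """Return this thread's bridge loop, creating and installing it if needed."""
    loop = getattr(_state, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _state.loop = loop
    return loop


def run_from_sync(coro):
    # Always run on the same per-thread loop rather than asyncio.run(),
    # which would create a new loop on every call.
    return get_thread_loop().run_until_complete(coro)
```

The idea would be to call get_thread_loop() in main.get_webhook_stats_svc before constructing the async client, so that the client and every later run_from_sync call in that thread share one loop.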