I wish to understand how I can design a “simple” toy example where, from the main thread of a long-running server such as Starlette/FastAPI, and before awaiting the server itself, I can spawn a separate threadpool with its own asyncio event loop.
The idea of the code provided below is to import it before the main thread starts serving, so that the threadpool and its configuration are defined up front. Each worker thread would then spawn its own asyncio event loop and execute the intended async coroutine. What I don't know is whether I really need one event loop per thread, i.e. a 1:1 correspondence between threads and loops, or whether a single extra event loop can serve a whole threadpool, i.e. 1 to N (see the sketch below).
The goal is to understand how I can design and implement a simple alternative for integrating long-running background tasks into my application using only a single process. Using other technologies like Celery is not an option here.
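To make the 1-to-N variant concrete, here is a minimal sketch under my own naming (background_loop and submit_coroutine are placeholders, nothing framework-specific): a single event loop runs forever in one dedicated thread, and coroutines are handed to it from any other thread with asyncio.run_coroutine_threadsafe.

import asyncio
import concurrent.futures
import threading

# One event loop owned by one dedicated thread (1 loop : N coroutines).
background_loop = asyncio.new_event_loop()

def _run_loop_forever(loop: asyncio.AbstractEventLoop) -> None:
    asyncio.set_event_loop(loop)
    loop.run_forever()

threading.Thread(target=_run_loop_forever, args=(background_loop,), daemon=True).start()

def submit_coroutine(coro) -> concurrent.futures.Future:
    # Thread-safe handoff from the caller's thread to the background loop.
    return asyncio.run_coroutine_threadsafe(coro, background_loop)

async def long_task(seconds: int) -> str:
    await asyncio.sleep(seconds)
    return f"done after {seconds}s"

if __name__ == "__main__":
    # The main thread stays free; only this demo call blocks on the result.
    future = submit_coroutine(long_task(2))
    print(future.result())

The module at the end of this post is, by contrast, my current attempt at the 1:1 variant, where every submitted task creates and closes a fresh event loop inside a worker thread of the pool.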
Some context about the server I am using and its features:
- FastAPI uses Starlette and wraps Starlette's background-tasks feature to implement its magic, but the internals are Starlette's.
- Starlette `await`s the `Callable` if it is an async coroutine; otherwise (a sync function) it runs it in another threadpool (a simplified paraphrase of that dispatch logic follows the links below).
- I wish to implement this alternative, or hook into this logic, by always executing the `Callable` objects in a separate threadpool with async support.
References:
- FastAPI background-tasks feature
- Starlette background tasks
- Starlette background task internals
- Starlette internals: how tasks (sync functions) are run in a separate threadpool
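For reference, the dispatch logic of Starlette's BackgroundTask (starlette/background.py) boils down to roughly the following. This is a simplified paraphrase, not the exact source; recent Starlette versions use an internal is_async_callable helper and anyio under the hood.

import asyncio
import typing

from starlette.concurrency import run_in_threadpool  # thin wrapper over anyio.to_thread.run_sync

class BackgroundTask:
    def __init__(self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # Starlette records whether the callable is a coroutine function.
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            # Async callables are awaited on the server's own event loop...
            await self.func(*self.args, **self.kwargs)
        else:
            # ...while sync callables are pushed to the threadpool.
            await run_in_threadpool(self.func, *self.args, **self.kwargs)

Below is my current code, which I import before starting the server.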
import asyncio
import concurrent.futures
from typing import Callable, Any
from src.config.loader import CONFIG
from src.utils.logger import get_logger
logger = get_logger(CONFIG.logging.name, CONFIG.logging.level)
# Define the global thread pool with a specified size
THREAD_POOL_SIZE = 10
THREAD_POOL_TASK_TIMEOUT = 60 * 10 # 10 minutes
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
# Global list to store Future objects
future_list = []
def run_async_in_thread(coroutine: Callable[..., Any], *args, timeout: int = THREAD_POOL_TASK_TIMEOUT, **kwargs) -> Any:
    """
    Run an async coroutine in a background thread and return the result.

    The idea is to run the coroutine in a separate thread to avoid blocking the main thread of FastAPI
    when the background (async) coroutine blocks in some parts of its execution.
    Without this, since it is an async coroutine, FastAPI would run it on its main event loop.
    The idea is to exploit the BackgroundTasks feature of FastAPI to run this function in the background,
    and to use this function to handle the run of the original async coroutine in a separate thread,
    as if it were a sync function.

    :param coroutine: The async coroutine function to run.
    :param args: Positional arguments to pass to the coroutine.
    :param timeout: Maximum time in seconds to wait for the coroutine to complete.
    :param kwargs: Keyword arguments to pass to the coroutine.
    :return: The result of the coroutine.
    """
    def run_coroutine():
        # Each submitted task gets its own event loop, bound to the worker thread that runs it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coroutine(*args, **kwargs))
        finally:
            loop.close()

    # Start the execution of the coroutine in a separate thread
    future = thread_pool.submit(run_coroutine)
    # Keep a reference to the Future object in the global list
    future_list.append(future)
    # If the coroutine does not complete within the timeout, cancel the future
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        name = coroutine.__name__
        logger.error(f"TimeoutError: Coroutine {name} with args {args} and kwargs {kwargs} did not complete within {timeout} seconds. Canceling it.")
        # Note: cancel() only has an effect if the task has not started running yet.
        future.cancel()
        raise
    except Exception as e:
        logger.error(f"Exception in future: {e}")
        raise
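For completeness, this is roughly how I intend to wire run_async_in_thread into an endpoint through FastAPI's BackgroundTasks. The module name thread_pool_runner, the endpoint, and heavy_job are illustrative placeholders, not part of the code above.

import asyncio

from fastapi import BackgroundTasks, FastAPI

# Hypothetical import path for the module above.
from thread_pool_runner import run_async_in_thread

app = FastAPI()

async def heavy_job(item_id: int) -> None:
    # Stand-in for a long-running coroutine that blocks in places.
    await asyncio.sleep(300)

@app.post("/items/{item_id}")
async def create_item(item_id: int, background_tasks: BackgroundTasks):
    # run_async_in_thread is a plain sync function, so Starlette schedules it
    # on its threadpool after the response is sent; inside, it drives heavy_job
    # on a fresh event loop in the custom thread pool.
    background_tasks.add_task(run_async_in_thread, heavy_job, item_id)
    return {"status": "scheduled"}

Note that run_async_in_thread blocks on future.result(...) until the coroutine finishes or times out, so the Starlette threadpool worker that executes it stays occupied for that whole time.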