How to create a threadpool with its own async loop separated from the main thread

I would like to understand how to design a “simple” toy example in which the main thread, assumed to be a long-running server such as Starlette/FastAPI, spawns a separate thread pool with its own asyncio event loop before the server itself is awaited.

The idea of the code below is to import it before starting the server, so that the thread pool is defined together with its configuration. Each thread creates its own event loop and runs the intended coroutine in it. I don’t know whether I need one event loop per thread (a 1:1 correspondence between threads and loops), or whether a single additional loop can be combined with a thread pool (1 to N).
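For illustration, the 1-to-N variant (one extra thread that owns one event loop, with many coroutines submitted to it) could look roughly like the sketch below. It uses only the standard library; BackgroundLoop and job are hypothetical names introduced for this example, not part of FastAPI or Starlette.

```python
import asyncio
import threading
from concurrent.futures import Future


class BackgroundLoop:
    """Hypothetical helper: one extra thread that owns one event loop."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        # run_forever() makes this loop the running loop of the new thread.
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def submit(self, coro) -> Future:
        # Thread-safe hand-off; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        # Ask the loop to stop from outside its thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


async def job(n: int) -> int:
    # Stand-in for a real background coroutine.
    await asyncio.sleep(0.01)
    return n * 2
```

All submitted coroutines share the single background loop, so a coroutine that blocks (for example with time.sleep) stalls the other background jobs, but not the loop running in the main thread.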

The goal is to understand how to design and implement a simple way of integrating long-running background tasks into my application using only a single process. Using other technologies such as Celery is not an option here.

Some context about the servers I am using and their features:

  • FastAPI uses Starlette and wraps the Starlette background-tasks feature to implement its magic, but the internals are from Starlette.
  • Starlette awaits the Callable if it is an async coroutine function; otherwise (sync function) it runs it in a thread pool.
  • I wish to implement this alternative, or hook into this logic, so that the Callable objects are always executed in a separate thread pool with async support.

FastAPI background-tasks feature
Starlette background tasks
Starlette background task internals
Starlette internals: how tasks (sync functions) are run in a separate threadpool

import asyncio
import concurrent.futures
from typing import Callable, Any

from src.config.loader import CONFIG
from src.utils.logger import get_logger

logger = get_logger(CONFIG.logging.name, CONFIG.logging.level)

# Define the global thread pool with a specified size
THREAD_POOL_SIZE = 10
THREAD_POOL_TASK_TIMEOUT = 60 * 10 # 10 minutes
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)

# Global list to store Future objects
future_list = []

def run_async_in_thread(coroutine: Callable[..., Any], *args, timeout: int = THREAD_POOL_TASK_TIMEOUT, **kwargs) -> Any:
    """
    Run an async coroutine in a background thread and return the result.

    The idea is to run the coroutine in a separate thread to avoid blocking the main thread of FastAPI
    when the background (async) coroutine blocks in some parts of its execution.
    Without this, since it is an async coroutine, FastAPI runs it in its main thread.
    The idea is to exploit the BackgroundTasks feature of FastAPI
    to run this function in the background
    and use this function to handle the run of the original async coroutine in a separate thread
    as if it was a sync function.

    :param coroutine: The async coroutine function to run.
    :param timeout: Seconds to wait for the result before giving up.
    :param args: Positional arguments to pass to the coroutine.
    :param kwargs: Keyword arguments to pass to the coroutine.
    :return: The result of the coroutine.
    """
    def run_coroutine():
        # asyncio.run() creates a fresh event loop for this worker thread
        # and closes it again, even if the coroutine raises.
        return asyncio.run(coroutine(*args, **kwargs))

    # Starts the execution of the coroutine in a separate thread
    future = thread_pool.submit(run_coroutine)

    # Add the Future object to the global list
    future_list.append(future)

    # Wait for the result. On timeout only the wait is abandoned: Future.cancel()
    # cannot stop a task that is already running, so the worker thread stays busy.
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        name = coroutine.__name__
        logger.error(f"TimeoutError: Coroutine {name} with args {args} and kwargs {kwargs} did not complete within {timeout} seconds.")
        future.cancel()  # only has an effect if the task has not started yet
        raise
    except Exception as e:
        logger.error(f"Exception in future: {e}")
        raise

Have you seen anyio’s BlockingPortal? If you’re using FastAPI/Starlette you’ll have it installed already. It lets you start tasks and collect results using concurrent.futures.Future objects, e.g.:

from anyio import sleep, TASK_STATUS_IGNORED
from anyio.from_thread import start_blocking_portal


async def service_task(*, task_status=TASK_STATUS_IGNORED):
    task_status.started('STARTED')
    await sleep(1)
    return 'DONE'


with start_blocking_portal() as portal:
    future, start_value = portal.start_task(service_task)
    print('Task has started with value', start_value)

    return_value = future.result()
    print('Task has finished with return value', return_value)

https://anyio.readthedocs.io/en/stable/threads.html

I’m not sure why you’d want to run multiple event loops in one process; you won’t currently see any performance benefit (maybe you will in the free-threaded 3.14t build).

You should probably just run a TaskGroup in your lifespan_context and submit background tasks to it, wrapped with @asyncer.asyncify.

@graingert Thanks for your reply. To clarify why I’m trying to test/design this: I am developing an app deployed on a single process/pod. I’m using FastAPI/Starlette as my server, and I want to be able to execute some “loads” that have all of the following properties:

  1. It is async, in the sense that it is defined with the keyword async, i.e. it is a coroutine.
  2. It is long-running, in the sense that it takes a “long time” to complete, where by “long” I mean that I don’t want it to degrade the UX by making the user wait when new requests are submitted to the event loop (if that is possible: I am not that knowledgeable about async programming).
  3. The load consists of complex logic in which some parts do network/file/computational work that actually blocks, while other parts are “correctly” designed with async programming. As an example, think about this sequence of work, run in the background once the request has returned: i) read files, ii) upload files to remote storage, iii) process files, i.e. parsing/chunking/embeddings, iv) save intermediate outputs to remote storage, v) request results from AI models (as a service). The UX I wish to implement is: i) the user uploads the file and the request returns, then ii) the load is performed in the background while the user is still able to use the interface.

As far as I understand, the FastAPI background tasks feature uses the server’s event loop to handle async coroutine loads. The feature was designed to offload basic background tasks (the documentation mentions processing files or sending email, for instance).
Another reference states that the intended time window for such work is at most 30 s.

For sync functions, by contrast, the background tasks feature uses anyio’s to_thread.run_sync() to run the work in another thread. Here I was trying to investigate whether running other threads (each with its own event loop) would actually decouple the UX from the background work.

I’m not sure where you found a recommendation for a 30 second limit, but you can just submit async functions of any duration as background tasks, as long as they don’t perform blocking IO
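As a rough sketch of what that means for a job that mixes blocking and async steps: the blocking step can be pushed to a worker thread from inside the coroutine, so the event loop stays free. The example uses the standard library’s asyncio.to_thread (Python 3.9+) rather than anyio, and parse_file_blocking, background_job and heartbeat are hypothetical names for illustration.

```python
import asyncio
import time


def parse_file_blocking(path: str) -> str:
    # Stand-in for blocking file/CPU work that would stall the event loop.
    time.sleep(0.2)
    return f"parsed:{path}"


async def background_job(path: str) -> str:
    # Offload only the blocking step; the loop keeps serving other tasks.
    parsed = await asyncio.to_thread(parse_file_blocking, path)
    await asyncio.sleep(0)  # stand-in for a properly async step (e.g. upload)
    return parsed


async def heartbeat(stop: asyncio.Event) -> int:
    # Counts how often the loop gets to run while the job is in progress.
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def main():
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    result = await background_job("report.pdf")
    stop.set()
    return result, await hb
```

If parse_file_blocking were called directly inside the coroutine, the heartbeat would not tick at all during those 0.2 seconds, which is the effect a user would notice as a frozen server.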