Back-propagation of contextvar changes from worker threads

The current status quo in async libraries regarding the propagation of PEP 567 context variables is that the context of the current task is copied to to worker thread (asyncio.to_thread(), trio.to_thread.run_sync(), anyio.to_thread.run_sync()) but any changes made to those variables are not propagated back. This has become a problem in application frameworks, and at least asgiref has this workaround in place in its AsyncToSync utility class. Issues (#953 and #4696) have been raised in FastAPI where synchronous dependencies in its DI system cannot manipulate the context variables of the current request because they’re being run in a thread pool.

The question here is: should I/O frameworks like asyncio do this for them? Is there a good reason why it wasn’t done in the first place?

2 Likes

Personally I think this is a situation where everyone (every library, framework and user) is being reasonable but when you put it all together things fall apart:

  1. It’s reasonable for a library like structlog to use contextvars
  2. It’s reasonable for a web framework to wrap user code in tasks (various example use cases linked above)

But then when you try to do both at the same time you end up leaking implementation details to users because of the interaction of tasks and contextvars and things simply don’t work as intended.

1 Like

The links provide so much detail that I cannot figure out what the problem is. I am not doubting there is a problem, I just need help understanding it. Could someone give a scenario that shows the need for back-propagation without going into too much detail of FastAPI? (I am not familiar with it so the linked issues are difficult for me to follow.)

Could someone give a scenario that shows the need for back-propagation without going into too much detail of FastAPI?

Suppose we have a web application with two endpoints, /sync and /async. These endpoints authenticate the user and set the current user in a contextvar (user: ContextVar[str] = ContextVar('user')). Then the web app has logging middleware which appends this information to log entries. The /async endpoint will work correctly, but the /sync endpoint will not because it’s run in a worker thread. Since the context variables set in the worker thread won’t propagate back, the value of the user contextvar will not be visible to the middleware, whereas when the /async endpoint sets it, it will be seen by the middleware because they run within the same Task.

1 Like

Here’s an attempt to summarize in <100LOC with no dependencies/web stuff:

import asyncio
from contextvars import ContextVar
from typing import Callable, Coroutine, Dict

user: ContextVar[str] = ContextVar("user")

Endpoint = Callable[..., Coroutine[None, None, None]]

class App:
    def __init__(self) -> None:
        self.endpoints: Dict[str, Endpoint] = {}

    async def __call__(self, path: str) -> None:
        await self.endpoints[path]()


async def async_endpoint() -> None:
    user.set("adrian")


def sync_endpoint() -> None:
    user.set("adrian")


def sync_to_async(endpoint: Callable[..., None]) -> Endpoint:
    """Simplification of asgiref's SyncToAsync or Starlette's wrapping of sync endpoints"""
    async def wrapped_endpoint() -> None:
        await asyncio.to_thread(endpoint)
    return wrapped_endpoint


def some_middleware_using_tasks(endpoint: Endpoint) -> Endpoint:
    """Simplification of Starlette's BaseHTTPMiddleware"""
    async def wrapped_endpoint() -> None:
        return await asyncio.create_task(endpoint())
    return wrapped_endpoint


def logging_middleware(endpoint: Endpoint) -> Endpoint:
    """Simplification of a middlware that logs stuff using contextvars"""
    async def wrapped_endpoint() -> None:
        await endpoint()
        print(f"user: {user.get()}")
    return wrapped_endpoint


async def main() -> None:
    app = App()
    app.endpoints["/async-logged"] = logging_middleware(async_endpoint)
    app.endpoints["/sync"] = sync_to_async(sync_endpoint)
    app.endpoints["/sync-logged"] = logging_middleware(sync_to_async(sync_endpoint))
    app.endpoints["/async-in-task-logged"] = logging_middleware(some_middleware_using_tasks(async_endpoint))

    await asyncio.create_task(app("/async-logged"))  # OK
    await asyncio.create_task(app("/sync"))  # OK because we never check the contexvar
    try:
        await asyncio.create_task(app("/sync-logged"))  # fails because contextvar is not propagated
    except LookupError:
        pass
    else:
        raise AssertionError("should have failed")

    try:
        await asyncio.create_task(app("/async-in-task-logged"))  # fails because contextvar is not propagated
    except LookupError:
        pass
    else:
        raise AssertionError("should have failed")

if __name__ == "__main__":
    asyncio.run(main())

As you can see the behavior changes based on what ideally would be implementation details of the web framework, logging library, etc.

I should probably bail out at this point because I have no useful understanding of middleware (I dropped out of the web development class before that topic came up :-).

I take it the middleware does not run in the same worker thread as the endpoint? But it would like to see the user (which is apparently parsed out of the request by the endpoint code) when logging the response.

It would seem that contextvars simply aren’t the right mechanism to pass the user from the endpoint to the middleware, even though it happens to work in the async version.

The semantics of contextvars are fixed by a PEP and I don’t see how we could make a change (either to contextvars or to asyncio) to automatically back-propagate changes across thread boundaries. (The Python runtime has no concept of “worker threads” vs. other types of threads.)

Assuming you agree with that assessment, do you have a specific proposal for an API to add that would address this problem?

PS. I don’t follow why you think this could have been asyncio’s problem (especially since the problem seems to be occurring in the sync case) – perhaps you can elaborate? Certainly I couldn’t help you answer the two questions at the end of your OP without more clarification.

The API I would like is contextvars.current_context() -> contextvars.Context.

Then whatever is using it can manually pass the context around (e.g. passing the object into a worker thread) thus giving it control of how the execution looks from the outside by composing current_context() with asyncio.to_thread() and asyncio.create_task()

This is already implemented in C, it just needs to be exposed: cpython/context.c at 2176898308acafedf87a48d33f29645e79b9af86 · python/cpython · GitHub

I don’t think anybody here is advocating a change to the semantics laid out by PEP 567. But I suppose the contextvars module could gain some additional functionality to support this use case? I just skimmed through the PEP but I didn’t find anything saying why there is no way to get the current context, outside of the C interface. If we could do that, it would be a simple matter of loop.run_in_executor(None, current_context().run, sync_func). No back and forth copying needed. And if for some reason this is unacceptable, some utility function/method to merge one context to another would be helpful.

I wonder if the implementation makes the concept of “current context” hard to use for your use case? IIRC the context is immutable (unlike e.g. threading.locals() or os.environ), represented by an immutable hash map (“HAMT”), and every time some code uses cvar.set() a new root for the tree is produced and stored internally as a thread-local. So even if you called current_context().run(sync_func, args), the context in the current thread would be unaffected by the changes made by sync_func(args).

To my understanding, context.run() can only run in one place at a time, yes? But nothing would prevent its use in a worker thread when the task’s coroutine is not being run, isn’t that right? So theoretically we should be able to do something along the lines of:

context = current_task().get_context()
await get_running_loop().run_in_executor(None, context.run, my_sync_func)

Here’s a little PoC snippet I just made using threads:

from contextvars import Context, ContextVar
from threading import Thread

cvar = ContextVar("cvar")


def run_in_thread():
    print("value in run_in_thread:", cvar.get())
    cvar.set(cvar.get()[::-1])


context = Context()

for arg in ("foo", "bar"):
    context.run(cvar.set, arg)
    thread = Thread(target=context.run, args=[run_in_thread])
    thread.start()
    thread.join()
    print("value back in main thread:", context.run(cvar.get))

You could write a function today:

def current_context():
    return current_task()._context

and experiment with that. You have to think like your own adversary though – try to come up with scenarios where it wouldn’t work.

Except that there is no attribute like that available?
AttributeError: '_asyncio.Task' object has no attribute '_context'

On a hunch, I tried this on PyPy and got a bit further. Still, I get RuntimeError: cannot enter context: <_contextvars.Context object at 0x00007f037b02b088> is already entered. I’m not sure why this happens, but it puts a damper on the plans to use the task context for worker threads.

I guess that the C accelerator code for asyncio doesn’t implement the non-public attributes. There’s a way to disable the accelerator, I think you do sys.modules['_asyncio'] = None before the first asyncio import. PyPy doesn’t have the accelerator module, so that’s why that worked for you.

I do think that what you want is fundamentally not possible and you should look into a different way to communicate information back to middleware, that doesn’t involve context vars.

I agree with you that designing a system like what we are describing is not a good idea. Within the context of web frameworks, there are indeed better ways to do this.

Unfortunately, this is not a single system that Alex or I get to design. The issue is interaction between a web framework using tasks/threads and something like a logging library, a database ORM or other libraries where it is quite reasonable to use contextvars. These need to work in web frameworks, we can’t just say they’re incompatible.

As it stands right now I feel that neither party (libraries using contextvars or web frameworks using tasks/threads) are doing anything unreasonable, it’s the interaction between them that creates problems, and there is no way to fix either of them can fix it.

Clearly this is not a simple problem. You know your requirements, hence I feel it is up to you to propose a specific solution. I and @yselivanov can help a little by explaining what is not possible, but in the end you have to investigate the implementation yourself before you can propose something that would be acceptable. And you would have to show how you convince existing frameworks to adopt your solution (or how applications can adopt it without the frameworks’ cooperation).

1 Like

I’ve designed contextvars to have just one direction of value propagation. Back propagation would complicate the design, implementation, and makes reasoning about the API harder in many cases.

That said, there’s a quick hack that can enable back propagation for you. You can put context variable value in a list, e.g. instead of storing 42 you’d store [42] in it. Then instead of setting a new value with cvar.set(new_value) you’d do container = cvar.get(); container[0] = new_value.

I’m using lists as an example, any mutable container would do.

Hi Yury, thank you for taking a look.

Does it make any difference if we think of this as “choosing which context you execute in” or “choosing if the context gets copied” instead of “back propagation”?

For the back propagation case, I wrote a helper function in Starlette that seems to work, although you can probably poke a bunch of holes in it.

You can put context variable value in a list, e.g. instead of storing 42 you’d store [42] in it. Then instead of setting a new value with cvar.set(new_value) you’d do container = cvar.get(); container[0] = new_value .

Unfortunately it’s not as simple as using a mutable container here: web framework developers don’t have control over how other libraries are using contextvars.

Why can’t frameworks define their own wrapper over contextlib.ContextVar that would use the mutable container trick? It could even be a PyPI module declaring some state.Variable API that would be implemented with the contextlib machinery and support the sync/async middleware use case? Users can be told to use this API instead of raw contextlib.ContextVar – that’s a documentation issue. Any framework (asyncio, trio) that supports contextlib would do the right thing and such API should just work with them.

What I’m trying to say here is that the fundamental machinery of context vars is pretty powerful and you can build primitives on top of it.

Does it make any difference if we think of this as “choosing which context you execute in” or “choosing if the context gets copied” instead of “back propagation”?

I think I understand the original problem: context changes made in a thread-pool won’t be visible to the caller. (FWIW same applies to the asyncio.create_task() boundary.) But I don’t understand what the quoted paragraph proposes. :confused:

Because said libraries might not even be aware of the framework around them? Or am I not understanding something?