Should using cache between Python coroutines be blocking?

I needed to use cache in a web application built in Python. Since I couldn’t directly use lru_cache in a coroutine, I built a simple decorator that would allow me to use it:

from asyncio import Lock, sleep


async def acquire_lock_async(lock: Lock) -> None:
    await lock.acquire()
    return
    

class AsyncCacheable:
    def __init__(self, coro_func: Awaitable) -> Any:
        self.coro_func = coro_func
        self.done = False
        self.result = None
        self.lock = Lock()

    def __await__(self):
        """
        A class that wraps a coroutine function and caches its result.
        Safe to be used in an asynchronous context.
        """
        while True:
            if self.done:
                return self.result
            if not self.lock.locked():
                try:
                    yield from acquire_lock_async(self.lock).__await__()
                    self.result = yield from self.coro_func().__await__()
                    self.done = True
                finally:
                    self.lock.release()
                return self.result
            else:
                yield from sleep(0.05)


def async_cacheable(coro_func: Awaitable) -> Awaitable:
    def wrapper(*args, **kwargs):
        return AsyncCacheable(lambda: coro_func(*args, **kwargs))
    return wrapper

@lru_cache(maxsize=8)
    @async_cacheable
    async def get_company_id(self, simulation_id: int):
        simulation_in_db = await self.get_by_id(_id=simulation_id)
        if not simulation_in_db:
            raise ValueError("Simulation not found")
        company_id = simulation_in_db["company_id"]
        return company_id

I tested it and it works fine. But now I have doubts about whether I have been on the right path. Does what I’ve done with the Lock make sense to make it safe between coroutines? It should be like that?

Thanks!

Did you consider using a regular cache and storing Future instances in it? That way, you don’t block accessing one key while the value for another is being computed.

1 Like

What do you mean by using regular cache? I can’t use lru_cache directly. In any case, I think that in my code access to a key is not blocked while another is calculated (unless it is for the same key).
How do you think I can improve it?

Why cannot you use lru_cache directly? According to the docs it’s threadsafe:

The cache is threadsafe so that the wrapped function can be used in multiple threads. This means that the underlying data structure will remain coherent during concurrent updates.

This would imply it’s also safe to use in coroutines.

1 Like

I can’t use it directly because when the program tries to access the cache I get:

RuntimeError: cannot reuse already awaited coroutine

I guess I know how you got the error: You used lru_cache as decorator on an async function,
then when the function is called again with exactly the same arguments, it will be triggered…
I was actually not thinking of this kind of pattern, but of using lru_cache inside coroutines.

1 Like

This is why I said to store a Future. You can await that multiple times. I think the simplest way to get one is with loop.create_task. You’ll need to not declare your cached function itself async.

1 Like

Well, I think what you are referring to is something like this, right?:

async def get_company_id(self, simulation_id: int):
        simulation_task = self.task_get_company_id(simulation_id)
        simulation_in_db = await simulation_task
        if not simulation_in_db:
            raise ValueError("Simulation not found")
        company_id = simulation_in_db["company_id"]
        return company_id
    
    @lru_cache(maxsize=8)
    def task_get_company_id(self, simulation_id: int):
        return create_task(self.get_by_id(_id=simulation_id))

In this way, in the first execution the task will be waited and completed, and the following times the result of the task will be available in such a way that the database is not called in each execution, right?

This implementation is much simpler than the one I had, I only have one question: Since lru_task is already threadsafe, I shouldn’t worry about two choirs calling the database, in case they are almost at the same time, am I correct?

@alicederyn @hansgeunsmeyer

I think the problem will be if the task fails. By default lru_cache does not save exception results, but in this case it will save the completed task with the value None. How could I address it?

Something as in your previous post was indeed what I was thinking of.

Yes, I think you might need sth special for that. How to do this depends I think on the kind of tasks you have and what you want to do/can do in case of failure: is there a real exception, or is it just a case of some object not being found in a db (and does that need to be turned into an exception), can you have an automatic retry, or does this not make sense. So, I would just set up my own conventions for this.

1 Like

asyncio is single threaded and not thread safe. This algorithm would be suitable for the “normal” way of using asyncio where you spin up exactly one event loop.

With the exception stored, actually. If you don’t want that, sadly lru_cache does not have a method for removing a single entry. I’d suggest using a third-party library - perhaps GitHub - aio-libs/async-lru: Simple LRU cache for asyncio meets your needs

2 Likes