How to discard pending waiters of asyncio.Lock?

General idea

I’m using a Lock and I want it to always have no more than waiter. If there’s some task waiting for the lock to be released and another task starts to wait for it too, I want the first waiter to exit waiting (this can only be done by raising an exception).

My use case

I have an asynchronous function that saves data to a file. It’s obvious that there must be at most one saving operation happening at the time, so I’m using a lock. Also there’s no point in maintaining a queue of saving operations, because as soon as there’s a need to save the new data to the file, the old data becomes outdated and we don’t need to save it (the function calls with outdated data is the ones that I want to discard). Lastly, I don’t think that my saving operation should be discarded if it already started executing. Therefore, only the tasks that are waiting for the async with lock need to be discarded, and not the ones which already stepped inside of it.

To illustrate what I want:

async def waiter(data):
    async with lock:
        await save_data(data)

await waiter(data)
# A few moments later...
await waiter(newer_data)
# If the first waiter has already started saving data, let it run. If not, discard it, because we don't care about the outdated data.

My attempt

The simplest solution I could think of was to hack the Lock a little. I know that under the hood it maintains a list of waiters which are Futures. This means I can access them and use .set_exception() on them:

async def waiter(data):
    for w in lock._waiters or []:
        if not w.done():
            w.set_exception(WaiterDiscarded())
    async with lock:
        await save_data(data)

This seems to do exactly what I want, but this is using a private attribute _waiters which can be dangerous for two reasons:

  1. I’m not sure if my code won’t break any internal behaviour of the Lock.
  2. Even if it doesn’t break anything in the current version of Python, it might do so when the implementation changes.

So, if I’m understanding you correctly, the discard behaviour happens when the third concurrent save is attempted.

  1. Attempt to save. All is quiet. Great! Begin saving.
  2. Attempt to save. Oh, someone else is saving. Hang tight. As soon as saving is done, save your work too.
  3. Attempt to save. Someone else is saving. Tell #2 to stop waiting, and we’ll wait instead.

Am I correct in my reading of your description? If so, here’s a slight change of perspective on the same algorithm:

  1. Attempt to save. All is quiet. Begin saving.
  2. Attempt to save. Another save is in progress. Hand your save data to the current saver.
  3. Attempt to save. Another save is in progress. Hand your save data to the current saver (which will override/overwrite/discard the previously-handed save data).
  4. #1 completes its original save operation. Oh hey, I’ve been handed something to save. Don’t release the lock, save again immediately.

I’m assuming three things here: you can gather the save data in advance of acquiring the lock; the save data will never be None; and the save data is not mutated between the gathering and saving phases. If that’s not true, one effective way of ensuring this is to JSONify the save data in advance, which will give you a string object independent of the core data structure.

Okay. So, here’s one way to code that up:

pending_save = None
async def save(data):
    global pending_save
    if lock.locked():
        pending_save = data
        return
    async with lock:
        while True:
            pending_save = None
            await save_data(data)
            if pending_save is None: break
            data = pending_save

This works because, in an asyncio context, it’s safe to simply ask if a lock is locked. In a threading context, you’d need a second guard mutex around that query, or use a try_lock() primitive, or something, but in asyncio, as long as there’s no await, you’re safe. Also, that was a lot of commas, and I’m not apologizing, sorry. :slight_smile:

1 Like

Yes, you understood everything correctly. Also I think your implementation will suit my needs. Thank you!

1 Like