Add non-`async` `try_acquire` method to `asyncio.Lock` to attempt locking immediately from normal functions

The asyncio.Lock class only provides an async def acquire() method. Because it is async, it’s very hard to acquire the lock from a normal function: acquire only locks when it is awaited, which is only possible in a coroutine.

StreamWriter.write isn’t async, so it can be used to send data immediately when called from a normal function. But in a non-coroutine it is very hard to protect access to the writer with an asyncio.Lock, which I would consider a common requirement.

I think something like a try_acquire() function, that is not async, would be a good addition.

I imagine the try_acquire function to be used like this:

writer: asyncio.StreamWriter = ...
command_in_flight_lock = asyncio.Lock()

async def wait_for_response():
    await writer.drain()
    ...
    command_in_flight_lock.release()

def send_command(cmd):
    if command_in_flight_lock.try_acquire():
        # The lock was free and is now held, so we can send immediately without waiting.
        writer.write(cmd)
        return asyncio.create_task(wait_for_response())
    # We couldn't acquire the lock immediately, so there is already a command in flight
    # and we have to wait for the lock before we can send another.
    async def wait_and_send():
        await command_in_flight_lock.acquire()
        writer.write(cmd)
        # Await the response here so the task only finishes once the lock is released.
        return await wait_for_response()
    return asyncio.create_task(wait_and_send())

This way the command can be sent immediately when the lock can be acquired, and awaiting the response can be delayed, which would not be possible if send_command were a coroutine:

r = send_command("command")
work() # doing something else
await r

I think this function could easily be implemented by moving these lines of asyncio.Lock.acquire into another function.
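To illustrate, here is a minimal sketch of what I have in mind, written as a subclass so it can be tried out today. The class name TryLock and the demo function are made up for this example, and the sketch relies on the private `_locked` and `_waiters` attributes of CPython's asyncio.Lock, which are implementation details and may change between versions.

```python
import asyncio


class TryLock(asyncio.Lock):
    """asyncio.Lock with a hypothetical non-async try_acquire() (not a real asyncio API)."""

    def try_acquire(self):
        # Assumption: this mirrors the fast path at the top of Lock.acquire()
        # and touches the private attributes _locked and _waiters.
        waiters = self._waiters
        nobody_waiting = waiters is None or all(w.cancelled() for w in waiters)
        if not self._locked and nobody_waiting:
            self._locked = True
            return True
        # The lock is held or someone is already queued; do not wait.
        return False


async def demo():
    lock = TryLock()
    first = lock.try_acquire()   # lock is free, so we take it
    second = lock.try_acquire()  # already held, so this fails without blocking
    lock.release()
    third = lock.try_acquire()   # free again after release()
    lock.release()
    return first, second, third
```

Calling `asyncio.run(demo())` exercises the three cases: acquiring a free lock, failing on a held one, and acquiring again after release.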

I’m not a fan of the code duplication, but I also can’t think of another way around the “coroutines actually don’t start until they are awaited” problem, so not using coroutines seems like the best option.

Another downside of this solution is that try_acquire doesn’t reserve a spot in the waiters list. If we run into the second case, the waiter is only registered once the await is reached, which might result in a different ordering. I’m not sure whether that is important.
That could maybe be solved with a different function that registers a waiter and returns an Optional<Future> that resolves when the Lock is acquired by the current caller?

Similar methods would probably also make sense for Condition and Semaphore.

What do you think?

The shown functionality looks like a small write-job queue. If you create a queue and a corresponding worker task, you could send write-jobs using Queue.put_nowait() which is non-async.
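A minimal sketch of that idea, under some assumptions: FakeWriter is a hypothetical stand-in for asyncio.StreamWriter so the example runs without a socket, and writer_worker, send_command and demo are names invented here. Waiting for the actual response is left out.

```python
import asyncio


class FakeWriter:
    """Hypothetical stand-in for asyncio.StreamWriter, for illustration only."""

    def __init__(self):
        self.sent = []

    def write(self, data):
        self.sent.append(data)

    async def drain(self):
        await asyncio.sleep(0)


async def writer_worker(queue, writer):
    # Single consumer: commands go out one at a time, in queue order.
    while True:
        cmd, done = await queue.get()
        writer.write(cmd)
        await writer.drain()
        # A real worker would wait for the response here before continuing.
        done.set_result(None)
        queue.task_done()


def send_command(queue, cmd):
    # Plain function: enqueue the job and hand back a future to await later.
    # It must be called while the event loop is running.
    done = asyncio.get_running_loop().create_future()
    queue.put_nowait((cmd, done))
    return done


async def demo():
    queue = asyncio.Queue()
    writer = FakeWriter()
    worker = asyncio.create_task(writer_worker(queue, writer))
    first = send_command(queue, b"a")
    second = send_command(queue, b"b")
    sent_before_yield = list(writer.sent)  # the worker has not run yet
    await first
    await second
    worker.cancel()
    return sent_before_yield, writer.sent
```

Note that nothing is written until control returns to the event loop and the worker picks up the job.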

The only easy way to interact with asyncio from non-async code is to schedule a task and forget about it. If you need a response, things get complicated. The main problem is what to do instead of waiting when something is not available. Asyncio can switch to another task. Non-async code can either poll at some interval or block, and the latter only if it is running in its own thread.

Queue.put_nowait() would only write to the stream after control flow is yielded to the event loop and the task is read from the Queue, if I understand your idea correctly.
But I want to write to the stream immediately, as StreamWriter.write is intended to do, since it already has an internal buffer and queues writes accordingly.
And that is achieved with the try_acquire example.

Sadly it looks like the StreamWriter queue also relies on event loop yielding to write the buffer to the socket instead of flushing it out of band? But that only happens if os.write would block, which shouldn’t happen all the time.

When would this hypothetical try_acquire sync method release the lock?

To me it looks like you’re only interested in checking if the lock is already taken or not, which you can already do with lock.locked().

The .write only appends data to a buffer.

If there is data in the buffer, the event loop checks the destination file descriptor/socket and if it is writeable, it writes one chunk from the buffer and carries on. The next check+write will happen at the next event loop iteration, i.e. every time a task switch is occurring (simplified).

The async .drain passively waits until the buffer gets empty.

try_acquire would not release the lock. When and how the lock is released depends on the case, but in the example it happens in the wait_for_response function, after the response is received and another command can be sent.

I don’t think just checking lock.locked() is enough, because the lock needs to stay locked until the response is received. Checking lock.locked() alone would only work if we could also release the lock again immediately, which isn’t always the case. In the example it has to stay locked to prevent another command from being sent until the previous command has been processed.

It is also not enough to check lock.locked() and then acquire the lock later with await lock.acquire() in the coroutine task (asyncio.create_task(wait_for_response())). There is no strong guarantee that our task will execute first, so another task that was waiting on something else could acquire the lock before us. That would be an error, because two commands could be sent.
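To make the ordering problem concrete, here is a small sketch using only the existing asyncio.Lock API. The names other, ours and demo are made up for illustration. The lock looks free when the plain code checks it, yet a task that was scheduled earlier acquires it first.

```python
import asyncio


async def demo():
    lock = asyncio.Lock()
    order = []

    async def other():
        async with lock:
            order.append("other")

    async def ours():
        async with lock:
            order.append("ours")

    # Some other task was already scheduled but has not run yet.
    other_task = asyncio.create_task(other())

    # Synchronous check: nobody holds the lock at this moment.
    was_free = not lock.locked()

    # We schedule our own task, assuming the lock will still be free.
    our_task = asyncio.create_task(ours())

    await asyncio.gather(other_task, our_task)
    return was_free, order
```

If the command had already been written after the locked() check, the other task would now be sending a second command while the first is still in flight.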

If you are talking about StreamWriter.write, that seems to be incorrect, at least according to the documentation:

The method attempts to write the data to the underlying socket immediately. If that fails, the data is queued in an internal write buffer until it can be sent.

So the event loop only gets involved when the data cannot be written immediately.
I assume the _SelectorSocketTransport is the default Transport and its write method is also consistent with the documentation:

I checked the source (of _SelectorSocketTransport.write) and indeed there is an optimized path that tries to write immediately. Other transports may do the same. My apologies for the incorrect information, which I gave from memory.