Asyncio Semaphore with support for priorities

Brief

Asyncio semaphores are commonly used in scenarios where resources are constrained or valuable, such as managing database connections, making API calls, or conducting AI model inference.

Prioritization in the same limiting semaphore can lead to better systems.

For example - By allowing batch jobs and realtime jobs with varying response time / priority requirements to share the same microservice instead of needing separate deployments.

Priority queues can be used, they can result in code that is less intuitive and harder to maintain.

Usage

Regular semaphore

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

OR

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

Priority semaphore

sem = asyncio.PrioritySemaphore(10)

# ... later
async with sem.priority(task_priority):
    # work with shared resource

OR

sem = asyncio.PrioritySemaphore(10)

# ... later
await sem.priority_acquire(task_priority)
try:
    # work with shared resource
finally:
    sem.release()

Where task_priority is a floating point number.

Implementation

I think that integrating priority management into asyncio semaphores works well with the way tasks are currently tracked in self._waiters. The main adjustment is relatively minor: instead of appending tasks to a deque, we would utilize bisect.insort_right to maintain a sorted deque ordered by priority. (also the priority needs to be hidden in __iter__, remove, popleft, append, etc…)

Sample implementation

import asyncio
from collections import deque
from bisect import insort_right

class PrioritySortedDeque(deque):

    def __init__(self):
        super().__init__()
        self.append_priority = 0.0

    def __iter__(self):
        return (i[1] for i in super().__iter__())

    def append(self, object):
        insort_right(self, (self.append_priority, object))

    def remove(self, value):
        del self[next(i for i, v in enumerate(self) if v is value)]

    def popleft(self):
        return super().popleft()[1]


class PrioritySemaphore(asyncio.Semaphore):

    def __init__(self, *_):
        super().__init__(*_)
        self._waiters = PrioritySortedDeque()

    def priority(self, priority):
        self._waiters.append_priority = priority
        return self

    async def priority_acquire(self, priority):
        self._waiters.append_priority = priority
        return await super().acquire()

Behavior

if __name__ == "__main__":

    sem, p_sem = None, None

    async def normal_semaphored_print(val):
        async with sem:
            await asyncio.sleep(0.0)
            print(val, end=", ")

    async def prioritized_print(val):
        async with p_sem.priority(val):
            await asyncio.sleep(0.0)
            print(val, end=", ")

    async def prioritized_print_2(val):
        await p_sem.priority_acquire(val)
        await asyncio.sleep(0.0)
        print(val, end=", ")
        p_sem.release()

    async def main():
        global sem, p_sem
        p_sem = PrioritySemaphore(4)
        sem = asyncio.Semaphore(4)
        await asyncio.gather(*map(normal_semaphored_print, range(20, 0, -1)))
        print()
        await asyncio.gather(*map(prioritized_print, range(20, 0, -1)))
        print()
        await asyncio.gather(*map(prioritized_print_2, range(20, 0, -1)))

    asyncio.run(main())

Outputs

20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 
20, 19, 18, 17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
20, 19, 18, 17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 

Performance

Based on just running the code above without the print it looks like < 50% slower and I think that is negligible.

I would think that the priority queue would be more clear for this case, not less. Submitting a job to be executed by a scheduler tends to be relatively intuitive as long as you already have good separation of concerns.

I’m not sure I see a strong benefit here over just using a priority queue + a semaphore in conjunction, but if you’re considering this, the underlying implementation should likely be done using heapq rather than using a deque + bisect

This seems like a great idea for a third-party library.

I find the semaphore much more natural. The whole queue/executor paradigm makes sense in multithreading, where the thread and the jobs are separate things with different costs, but that doesn’t apply to asyncio code: the executor is just a piece of accounting, adding overhead. Having direct control over my tasks whenever possible also lets me name them, and better understand task count metrics.

All subjective of course: I completely understand anyone who feels the other way!

I thought that as well, but when I tried to implement it this is what happened -

consider an async function

async def use_limited_resource(data_in):
    await asyncio.sleep(0.01)
    print(data_in, end=", ")
    return "data_out"

the priority semaphore implementation looks like this - where lower priority_order calls are executed first

p_sem = PrioritySemaphore(4)

async def limit_4_priority_semaphore(priority_order):
    async with p_sem.priority(priority_order):
        return await use_limited_resource(data_in=priority_order)

whereas the priority queue looks like this

p_queue = asyncio.PriorityQueue()

async def p_queue_worker(p_queue):
    while True:
        priority_order, (data_in, future) = await p_queue.get()
        p_queue.task_done()
        data_out = await use_limited_resource(data_in)
        future.set_result(data_out)

async def limit_4_priority_queue(priority_order):
    future = asyncio.get_event_loop().create_future()
    p_queue.put_nowait((priority_order, (priority_order, future)))  # data_in is the priority order for demo
    return await future

and additionally we need to manage the constantly running workers in the background and the number of workers is the limit, which is somewhat less intuitive.

for _ in range(4):
    asyncio.create_task(p_queue_worker(p_queue))  # requires starting of queue workers

Sample code -


async def main():
    for _ in range(4):
        asyncio.create_task(p_queue_worker(p_queue))  # requires starting of queue workers
    out = await asyncio.gather(*map(limit_4_priority_semaphore, range(20, 0, -1)))
    print("\n", out)
    out = await asyncio.gather(*map(limit_4_priority_queue, range(20, 0, -1)))
    print("\n", out)

asyncio.run(main())

outputs

20, 19, 18, 17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
 ['data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out']
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
 ['data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out', 'data_out']

which also indicates that the queue implementation is also “lagging” for some reason? (20, 19, 18, 17) are not instantly printed.

The current implementation of semaphore involves iteration which needs to be changed to iteration of all self._waiters in a sorted order, which I don’t think heapq can do in a good way.

given that

  1. asyncio already has a priority queue
  2. a priority semaphore implementation fits naturally with the existing semaphore implementation
  3. would not need significant maintainer effort
  4. might be a common usecase (it would be common for almost any microservice in any framework built with python asyncio that needs prioritisation of requests)

I think it should be a part of standard library. I think management of coroutines in a specific order should be something that asyncio should provide (and it already partially does through priorityqueue).

1 Like

I don’t disagree; however, it’s standard for new APIs and designs to be proven out externally before being accepted into Python, where the cost of mistakes is much higher.

1 Like

I’ve gone ahead and implemented and published a version of this that uses heapq. It can’t use the same logic as asyncio.Semaphore, but I didn’t constrain myself to creating a subclass of asyncio.Semaphore to implement it. It does provide a compatible API to be allowed to be used in a drop-in fashion, this meant placing the priority setting in a contextvar, which had other benefits.

This likely needs further thought though, as this is effectively a single priority for all PrioritySemaphores in that context. This seemed much more desirable in many of the cases I have as a library maintainer, as it allows setting a priority without exposing an internal library owned semaphore, but I could conceive of a few other ways to do this that are more closely connected to each individual semaphore. I decided that this was likely uneccessary.

it lives here for now (I may place some of this on pypi at some point granularly, but this is just a repo with various asyncio recipes that I believe are written well enough for others to use, and most of which I use myself in some fashion)

use looks something like:

sem = PrioritySemaphore(5)
...

with priority_context(10):  # set priority for the context, is reset when exiting, can be safely nested
    async with sem:
        # make prioritized network request here.
2 Likes

I had the same thought about using a contextvar, FWIW :slight_smile: Now all we need is for asyncio itself to support task prioritization…

I think for it to be applied to tasks as a whole, you’d want something akin to a full os scheduler with priority being used for biased scheduling rather than strict priority ordering for completion. While I think something like this would be a good candidate for asyncio, this can be DIYed too as an event loop, event loop policy, and task factory.

1 Like