Brief
Asyncio semaphores are commonly used in scenarios where resources are constrained or valuable, such as managing database connections, making API calls, or conducting AI model inference.
Supporting prioritization within a single limiting semaphore can lead to better systems.
For example, it allows batch jobs and realtime jobs with different response-time / priority requirements to share the same microservice instead of requiring separate deployments.
Priority queues can be used instead, but they can result in code that is less intuitive and harder to maintain.
Usage
Regular semaphore
sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource
OR
sem = asyncio.Semaphore(10)
# ... later
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()
Priority semaphore
sem = asyncio.PrioritySemaphore(10)
# ... later
async with sem.priority(task_priority):
# work with shared resource
OR
sem = asyncio.PrioritySemaphore(10)
# ... later
await sem.priority_acquire(task_priority)
try:
# work with shared resource
finally:
sem.release()
where `task_priority` is a floating-point number; waiters with lower values acquire the semaphore first (see the sample output below).
Implementation
I think that integrating priority management into asyncio semaphores works well with the way waiting tasks are currently tracked in `self._waiters`. The main adjustment is relatively minor: instead of appending tasks to a plain deque, we would use `bisect.insort_right` to keep the deque sorted by priority. (The priority also needs to be hidden from `__iter__`, `remove`, `popleft`, `append`, etc.)
Sample implementation
import asyncio
from collections import deque
from bisect import insort_right
class PrioritySortedDeque(deque):
    """Deque that keeps its items ordered by an externally supplied priority.

    Every item is stored internally as a ``(priority, sequence, item)`` entry.
    ``append``, iteration, ``remove`` and ``popleft`` hide that wrapper so code
    written against a plain deque of items keeps working.  Entries with a lower
    priority value sit closer to the left end; entries with equal priority keep
    their insertion order.

    Other inherited deque methods (indexing, ``pop``, ``in`` ...) are not
    overridden and still operate on the raw entries.
    """

    def __init__(self):
        super().__init__()
        # Priority attached to the next item passed to ``append``.
        self.append_priority = 0.0
        # Insertion counter used as a tie-breaker: without it, two entries
        # with the same priority would be ordered by comparing the items
        # themselves, which raises TypeError for non-orderable objects.
        self._sequence = 0

    def __iter__(self):
        """Iterate over the stored items (not the internal entries)."""
        return (entry[-1] for entry in super().__iter__())

    def append(self, object):
        """Insert *object* at the sorted position for ``append_priority``."""
        entry = (self.append_priority, self._sequence, object)
        self._sequence += 1
        insort_right(self, entry)

    def remove(self, value):
        """Remove the item that *is* ``value`` (identity comparison).

        Raises:
            ValueError: if ``value`` is not stored in the deque.
        """
        for index, item in enumerate(self):
            if item is value:
                break
        else:
            raise ValueError("PrioritySortedDeque.remove(x): x not in deque")
        # Delete only after the loop has finished with the iterator.
        del self[index]

    def popleft(self):
        """Remove and return the left-most (lowest priority value) item."""
        return super().popleft()[-1]
class PrioritySemaphore(asyncio.Semaphore):
    """Semaphore whose blocked waiters are queued by priority.

    The waiter queue of ``asyncio.Semaphore`` (the private ``_waiters``
    attribute) is replaced with a ``PrioritySortedDeque``, so waiters
    registered with a lower priority value are placed ahead of those with a
    higher one.

    NOTE(review): this depends on ``asyncio.Semaphore`` internals
    (``_waiters`` and how ``acquire`` appends to it) -- confirm against the
    Python versions you target.
    """

    # Priority used by acquisitions that did not request one explicitly.
    _DEFAULT_PRIORITY = 0.0

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``asyncio.Semaphore``."""
        super().__init__(*args, **kwargs)
        self._waiters = PrioritySortedDeque()
        # Priority requested for the next ``acquire`` call only.
        self._pending_priority = self._DEFAULT_PRIORITY

    def priority(self, priority):
        """Set the priority of the next acquisition and return ``self``.

        Intended for ``async with sem.priority(p):``.
        """
        self._pending_priority = priority
        return self

    async def acquire(self):
        """Acquire the semaphore using the pending priority, if any.

        The pending priority is consumed here, before any suspension point,
        so a later plain ``acquire()`` / ``async with sem:`` falls back to the
        default priority instead of inheriting one set by another task.
        """
        self._waiters.append_priority = self._pending_priority
        self._pending_priority = self._DEFAULT_PRIORITY
        return await super().acquire()

    async def priority_acquire(self, priority):
        """Acquire the semaphore, queueing with the given ``priority``."""
        self._pending_priority = priority
        return await self.acquire()
Behavior
if __name__ == "__main__":
    # Demo: run 20 tasks (values 20 down to 1) through a plain semaphore and
    # through a priority semaphore, printing the order in which they proceed.
    sem, p_sem = None, None

    async def normal_semaphored_print(val):
        """Print ``val`` while holding the plain semaphore."""
        async with sem:
            await asyncio.sleep(0.0)
            print(val, end=", ")

    async def prioritized_print(val):
        """Print ``val`` while holding the priority semaphore (``async with``)."""
        async with p_sem.priority(val):
            await asyncio.sleep(0.0)
            print(val, end=", ")

    async def prioritized_print_2(val):
        """Print ``val`` using the explicit acquire/release form."""
        await p_sem.priority_acquire(val)
        try:
            await asyncio.sleep(0.0)
            print(val, end=", ")
        finally:
            # Always hand the permit back, even if the body raises.
            p_sem.release()

    async def main():
        """Run the three demo rounds, one output line per round."""
        global sem, p_sem
        # NOTE(review): presumably created here so the semaphores are built
        # while the event loop is running -- confirm for older Pythons.
        p_sem = PrioritySemaphore(4)
        sem = asyncio.Semaphore(4)
        await asyncio.gather(*map(normal_semaphored_print, range(20, 0, -1)))
        print()
        await asyncio.gather(*map(prioritized_print, range(20, 0, -1)))
        print()
        await asyncio.gather(*map(prioritized_print_2, range(20, 0, -1)))

    asyncio.run(main())
Outputs
20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1,
20, 19, 18, 17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
20, 19, 18, 17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
Performance
Based on simply running the code above with the print calls removed, the priority semaphore appears to be less than 50% slower than the regular one, which I think is negligible.