Not particularly, no. I’ve been meaning to do a more in-depth writeup at some point about asyncio, threading, and safe concurrency in Python, based on my experience using it in a highly distributed industrial setting.
@Liz is correct here: as long as both sides have predictable bounds, there’s no runaway memory use or leak possible, though that doesn’t address your idea of a weighted semaphore. It might help to see this in a structured example to convince yourself of it.
import asyncio

async def producer(q: asyncio.Queue):
    while check_not_shutting_down():  # implement this however you want
        job = None
        try:
            job = ...
            await q.put(job)  # backpressure here keeps the producer holding a maximum of 1 job
        except asyncio.CancelledError:
            if job is not None:  # we still have a job to handle
                await q.put(job)
            raise

async def consumer(q: asyncio.Queue):
    while job := await q.get():
        await process_job(job)  # total in-flight work at any given time is bounded by the number of consumers and how much work any single job can be
        q.task_done()

async def driver():
    q = asyncio.Queue(maxsize=5)
    producers = {asyncio.create_task(producer(q)) for _ in range(2)}
    consumers = {asyncio.create_task(consumer(q)) for _ in range(5)}
    try:
        await asyncio.gather(*producers)
    except SomeShutdown:
        signal_shutdown_to_producers()
        for task in producers:
            task.cancel()
        await asyncio.gather(*producers, return_exceptions=True)
        # handle exceptions here how you see fit, retrieve them from the task objects
    await q.join()  # queue is now empty, and it's safe to shut down the consumers
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    # handle exceptions here how you see fit, retrieve them from the task objects
The numbers there are obviously tweakable, and there are other ways to do this as well, but this seems like a case where you’re too used to what works in Go, and Go has a slightly different concurrency model than asyncio does. asyncio tasks are just thin wrappers around coroutines scheduled onto the event loop that don’t block execution in the current context. Coroutines in Python aren’t threads (though they may wrap them) and are internally just state machines (technically, originally implemented as generators). If you have a fixed number of them and are tracking them, there’s a fixed maximum overhead for them.
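To make the state-machine point concrete, here’s a toy sketch (not how you’d write real asyncio code) that drives a coroutine by hand with send(), which is roughly what the event loop does for you at every suspension point:

import types

@types.coroutine
def _yield_once():
    yield  # a bare suspension point, like the ones hiding behind every real await

async def one_hop():
    await _yield_once()
    return 42

coro = one_hop()
print(coro.send(None))   # None: the coroutine ran to its first suspension and paused
try:
    coro.send(None)      # resume it; this time it runs to completion
except StopIteration as done:
    print(done.value)    # 42, delivered via StopIteration just like a generator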
The hard part here in Python isn’t ensuring that there isn’t a memory leak, but proper signal handling for graceful shutdown of an asyncio application: without a lot more work than some people would consider reasonable, Python can interrupt anywhere, including in critical sections.
To do this reliably in a cross-platform manner, you need to use signal.set_wakeup_fd and handle it yourself, and this often means your event loop should actually be in a thread, with the main thread existing only to handle signals and interact with any potential console, TUI, or GUI.
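A rough sketch of that shape, assuming Python 3.10+ (run_app and the socketpair plumbing here are illustrative, not anything asyncio hands you): the main thread owns the signals and a wakeup socket, and the event loop lives in a worker thread.

import asyncio
import signal
import socket
import threading

async def run_app(shutdown_event: asyncio.Event):
    # your producers, consumers, and driver would live here
    await shutdown_event.wait()

def main():
    # a socketpair gives the C-level signal handler somewhere to write the signal number
    rsock, wsock = socket.socketpair()
    wsock.setblocking(False)
    signal.set_wakeup_fd(wsock.fileno())
    # install no-op Python handlers so delivery happens via the wakeup fd
    # instead of killing the process or raising KeyboardInterrupt mid-await
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda *_: None)

    loop = asyncio.new_event_loop()
    shutdown_event = asyncio.Event()  # on 3.10+ this no longer binds to a loop at construction
    worker = threading.Thread(
        target=loop.run_until_complete, args=(run_app(shutdown_event),)
    )
    worker.start()

    # the main thread blocks here; any signal wakes it with the signal number as a byte
    signo = rsock.recv(1)[0]
    print(f"received signal {signo}, shutting down")
    loop.call_soon_threadsafe(shutdown_event.set)
    worker.join()

if __name__ == "__main__":
    main()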
As for implementing a weighted semaphore or a queue with job weights, you can implement one yourself if you want; asyncio’s synchronization primitives aren’t truly primitives, and you can bring your own replacements, but there’s more complication there, and you’ll have to judge whether it’s worth it.
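If you do go down that road, the skeleton is small. A minimal sketch of a weighted semaphore built on asyncio.Condition (the class name and interface are made up for illustration, not something asyncio ships):

import asyncio

class WeightedSemaphore:
    def __init__(self, capacity: int):
        self._capacity = capacity
        self._available = capacity
        self._cond = asyncio.Condition()

    async def acquire(self, weight: int) -> None:
        if weight > self._capacity:
            raise ValueError("weight exceeds total capacity")
        async with self._cond:
            # block until enough capacity is free; heavier jobs simply wait longer
            await self._cond.wait_for(lambda: self._available >= weight)
            self._available -= weight

    async def release(self, weight: int) -> None:
        async with self._cond:
            self._available += weight
            self._cond.notify_all()

The obvious caveat is fairness: a heavy acquire can be starved indefinitely by a steady stream of light ones, and dealing with that is exactly the kind of complication I meant.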