I’m trying to write some code that runs a subprocess, and yields the data from its stdout and stderr streams as it arrives, with the two streams “tagged” (so merging them in the subprocess call isn’t an option). I’m pretty sure (but I’m willing to be proved wrong!) that the best way of doing this is to read the streams in background tasks and add the data to a queue, which the driving function reads and yields values from. So I have the following code:
async def read_stream(stream, name, queue):
    while True:
        chunk = await stream.read(1024)
        if not chunk:
            break
        await queue.put((chunk, name))
async def run(*cmd):
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    queue = asyncio.Queue()

    async def watcher():
        await asyncio.gather(
            read_stream(proc.stdout, "stdout", queue),
            read_stream(proc.stderr, "stderr", queue),
        )
        await queue.put((None, None))

    watcher_task = asyncio.create_task(watcher())
    try:
        while True:
            chunk, name = await queue.get()
            if name is None:
                break
            yield (chunk, name)
    except asyncio.CancelledError:
        # Client disconnected — terminate subprocess
        proc.kill()
        raise
    finally:
        await watcher_task
        await proc.wait()
This works, mostly, but there are a number of problems with it. First of all, it’s pretty messy and I find it hard to get my head round the logic (I say this from direct experience, having come back to the code after a few weeks and spent time trying to remember how it works). In addition, there are some potential issues: the (None, None) sentinel makes the typing messy, and there are possible problems with exceptions (the docs for gather say that if an exception occurs, the other tasks are left running in the background, which seems like it’s not what I want…)
What’s particularly frustrating here is that the whole thing is implementing what feels to me like a fairly basic operation - take the two streams stdout.read(1024) and stderr.read(1024) and merge them. If this were synchronous code, I feel like I’d find this pretty easy to write. Essentially, all of this just feels like a version of asyncio.as_completed() but for streams rather than tasks. But there’s no such function available in asyncio, and it’s surprisingly tricky to write in user code.
Am I missing an obvious way of doing this? And if not, can anyone offer any help in cleaning up this code to produce something more robust? If it can be made sufficiently generic, I’d be happy to package it up as a PR adding an asyncio.merge_streams() function for the stdlib, if that seems like a reasonable option.
Using a task group could at least help with the exception handling. The semantics are described in the docs as:
The first time any of the tasks belonging to the group fails with an exception other than asyncio.CancelledError, the remaining tasks in the group are cancelled. No further tasks can then be added to the group. At this point, if the body of the async with statement is still active (i.e., __aexit__() hasn’t been called yet), the task directly containing the async with statement is also cancelled. The resulting asyncio.CancelledError will interrupt an await, but it will not bubble out of the containing async with statement.
I’m assuming there’s a subtle bug in the original, since you have two producers but stop consuming after only one of them sends a sentinel value. If I’m misunderstanding the actual intent, though, you can change it a bit.
So I had a very similar issue at work - I needed to merge several streams of events (one was producing text-to-speech data from OpenAI, another was producing some control events, a third was yielding ticks on a certain timer…).
You can rewrite it like:
async def watcher():
    async with TaskGroup() as tg:
        tg.create_task(read_stream(proc.stdout, "stdout", queue))
        tg.create_task(read_stream(proc.stderr, "stderr", queue))
    await queue.put((None, None))
Anyway, I don’t have a ready-to-go solution for you, but I can share my experience.
My solution was to spawn subtasks for each of these producers (using a TaskGroup, but never mind that), with either each subtask pushing events into a separate queue or being an instance of AsyncGenerator itself.
I ended up implementing a merge function that essentially takes a number of AsyncIterator[T] and merges them into an AsyncIterator[T1 | T2 | T3 | …]. This also required a small adapter that would take a queue and turn it into an AsyncIterator.
The end-of-stream was also a little tricky, like you mention. I ended up going with an Enum sentinel.
So the resulting code was something like:
async with TaskGroup() as tg:
    # Spawn all children, with their inbox and outbox queues
    async for event in merge(
        tg,
        make_iterator(queue1.get),
        make_iterator(queue2.get),
        make_iterator(queue3.get),
    ):
        # deal with events
        ...
It’s a tricky thing and the stdlib doesn’t help a lot at this point. I wanted to have a higher-level abstraction that’s composable, but if you’re looking for something simple, pushing into a shared queue like you’re doing already is fine. The TaskGroup is useful so that when the consumer decides to quit, the children don’t get abandoned (I used Quattro’s create_background_task() so they all got cancelled instead of awaited).
Wrapping the streams as generators if needed is something a user can do (and in your case you’d want the ability to do that, since each stream’s chunks are merged as a tuple that includes the name of the stream)
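For what it’s worth, here’s a minimal sketch of what I mean by the adapter and the Enum sentinel (the names Done and make_iterator are illustrative, not my actual code):

```python
import asyncio
import enum


class Done(enum.Enum):
    # Single-member Enum used as an end-of-stream sentinel;
    # unlike None, it keeps the queue's item type clean
    DONE = enum.auto()


async def make_iterator(get):
    # Adapter: turn an awaitable-returning callable (e.g. queue.get)
    # into an async iterator that stops at the sentinel
    while True:
        item = await get()
        if item is Done.DONE:
            return
        yield item


async def demo():
    queue = asyncio.Queue()
    for item in ("chunk1", "chunk2", Done.DONE):
        queue.put_nowait(item)
    return [item async for item in make_iterator(queue.get)]


print(asyncio.run(demo()))  # ['chunk1', 'chunk2']
```

Taking the callable rather than the queue itself keeps the adapter agnostic about where the items come from.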
It doesn’t use task groups, but will properly propagate exceptions and not leave hanging tasks.
It took me a second to realize why you were using the (None, None) sentinel and how the control flow of this actually worked. At first glance, I thought this would block until the streams were completely read and filled the queue before yielding any values from run, but working through it made me realize that that’s not the case.
As I understand it currently [1], run does the following (in execution order, assuming you haven’t set the event loop to eagerly start tasks and as best as I can guess with the jumping around nature of async)
Create a subprocess to run cmd and direct stdout and stderr from that subprocess to StreamReader instances on the returned Process instance, proc
proc starts running in the background
queue = asyncio.Queue()
Create a queue to use for combining the “tagged” streams
watcher_task = asyncio.create_task(watcher())
Create a task to run the watcher coroutine and schedule it to run later
try:
    while True:
        chunk, name = await queue.get()
        ...
Start a loop to actually start reading data and merging it through the queue. Since you haven’t yielded control of the event loop yet, watcher has not had a chance to start running yet, so queue is definitely empty at this point, so it will wait here until something is available. The event loop will move on to any other waiting jobs (in isolation, this is just going to be the task to run watcher)
watcher starts running and creates two new tasks (implicitly through asyncio.gather, which will schedule tasks for any coroutines it is passed) to read the data from the proc.stdout and proc.stderr StreamReader instances. await asyncio.gather(...) will wait for both of those tasks to finish before moving on to the rest of the body of watcher
async def read_stream(stream, name, queue):
    while True:
        chunk = await stream.read(1024)
        if not chunk:
            break
        await queue.put((chunk, name))
Two instances of read_stream are now on the event loop and can start running (since watcher is waiting for them to finish and queue is still empty). They each start reading data from their own StreamReader instance in chunks of (at most) 1024 bytes and put them in queue with the string name to tag them.
Abandoning the numbering of steps now, since now it’s into the actual meat of the code and it’ll be doing the async jumping around. There are 4 tasks on the event loop, one for each of the streams, one for the “main” task in run, and one for watcher (which is just waiting for the two read_stream tasks to finish, so won’t come into play).
A simpler version of the loop that run is doing now (ignoring the if name is None part since it won’t be True until the read_stream tasks finish)
while True:
    chunk, name = await queue.get()
    yield (chunk, name)
and each of the read_stream tasks is doing
while True:
    chunk = await stream.read(1024)
    if not chunk:
        break
    await queue.put((chunk, name))
chunk will be whatever bytes are available (up to 1024 bytes) in each stream, until EOF is received, at which point it will be an empty bytes object and trigger the if not chunk: suite, which will then finish the task.
(Focusing on happy path and ignoring the potential for any exceptions)
These 3 loops will do the actual work you care about, i.e. reading from stdout and stderr of the subprocess running cmd until both of those streams are complete and send EOF, merging them together in queue with a string tag, and yielding the tagged chunks from run.
Once both of the read_stream tasks complete, the event loop can yield control back to the task for watcher, which then runs its final line
await queue.put((None, None))
The (None, None) sentinel serves to indicate to run that both streams are finished and lets it break out of its queue reading loop
try:
    while True:
        chunk, name = await queue.get()
        if name is None:
            break
        yield (chunk, name)
which then completes the cleanup in the finally suite
finally:
    await watcher_task
    await proc.wait()
I made a few modifications to your code that I think make it a bit more robust/simpler/clearer: replacing the (None, None) sentinel with shutting down the queue, using the TaskGroup API to get better error handling for the read_stream tasks, and combining the cleanup waits for proc and watcher_task
import asyncio


async def read_stream(stream, name, queue):
    while True:
        chunk = await stream.read(1024)
        if not chunk:
            break
        await queue.put((chunk, name))


async def run(*cmd):
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    queue = asyncio.Queue()

    async def watcher():
        try:
            # Using the TaskGroup API:
            # an exception in one task will cancel the other
            async with asyncio.TaskGroup() as tg:
                stdout_task = tg.create_task(
                    read_stream(proc.stdout, "stdout", queue)
                )
                stderr_task = tg.create_task(
                    read_stream(proc.stderr, "stderr", queue)
                )
        except* Exception:
            # do whatever error handling is desired,
            # or just omit the except* entirely
            # (note: `except* ExceptionGroup` itself is a TypeError;
            # match the leaf exception types instead)
            ...
        finally:
            # Use queue.shutdown (3.13+) instead of a (None, None) sentinel
            queue.shutdown()

    watcher_task = asyncio.create_task(watcher())
    try:
        while True:
            try:
                chunk, name = await queue.get()
                yield (chunk, name)
            except asyncio.QueueShutDown:
                break
    except asyncio.CancelledError:
        # Client disconnected — terminate subprocess
        proc.kill()
        raise
    finally:
        # Combine these together in a `gather` so that they both will run,
        # even if `watcher_task` raises an exception
        await asyncio.gather(watcher_task, proc.wait())
Over-explaining every step here to make sure I’ve got it correct; anyone, please correct me if I’ve made any mistakes ↩︎
Nice! Yes, this is a standalone app and I can choose my Python version, so “the latest possible” is my preference. The use of a done callback to do the shutdown is something I wouldn’t have thought about, the docs emphasise the await style to the point where callbacks feel like something you should avoid. But it’s much cleaner here than the sentinel approach.
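For my own notes, the done-callback pattern as I understand it, reduced to a toy example (a sketch assuming Python 3.13’s Queue.shutdown; all the names here are mine):

```python
import asyncio


async def produce(queue: asyncio.Queue) -> None:
    for item in ("a", "b"):
        await queue.put(item)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(produce(queue))
    # Done callback: shut the queue down as soon as the producer
    # finishes, instead of pushing a sentinel value through it
    task.add_done_callback(lambda _t: queue.shutdown())
    items = []
    while True:
        try:
            # items already queued are still delivered after shutdown;
            # get() raises QueueShutDown only once the queue is drained
            items.append(await queue.get())
        except asyncio.QueueShutDown:
            break
    await task
    return items


print(asyncio.run(demo()))  # ['a', 'b']
```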
And yes, reading one sentinel was a bug. In the original code I extracted this from, the sentinel handling was slightly different and avoided that bug (I hope! Must check…)
While I don’t need this, it does have some interesting aspects that I need to study. The use of ensure_future isn’t clear to me - I have real trouble understanding the difference between coroutines, futures and tasks, and as a result I usually end up just changing things until something works, which is a terrible way of coding, as I never really understand what I end up with.
I really wish there was better documentation of the basic principles of Python async. It feels like there’s a mixture of low-level detail and high-level usage examples, but no real explanation of the architecture anywhere. And it doesn’t help that there’s a bunch of stuff that’s language-level (where the language ref is heavy on technical details but extremely light on motivation and usage) alongside a bunch of library stuff that is comparatively light on technical detail.
For example, one thing that struck me today while thinking about this is that at the language level, there’s no means for async code to ever run concurrently - you only ever get concurrency in the context of an event loop, and the event loop is implemented in asyncio. I noticed this when thinking about merging two async iterators - at the language level it makes no sense to say “yield whichever of these two iterators returns a value first” because at that level, control flow is completely linear, and there is no concurrency. You need an event loop to even think about one returning before the other. Again, this is something I wish the docs explained more clearly, because without that explanation, my whole intuition about async iterators was basically wrong. (And I still haven’t worked out how to fix my intuition to match my new understanding.)
Thanks. I almost came up with this, but I made a critical error. And again, it’s because my intuition was flawed. In some ways this goes back to the “pain points with async” discussion - somehow, the async docs for Python do a very bad job of giving people a good intuitive model of how async actually works. And as a result, you get people like me asking dumb questions and frustrating experienced users because we keep talking past each other as a result of having completely mismatched mental models. I wish I knew how to fix this - maybe when I become an expert, I’ll remember enough of the pain I went through to be able to write some beginner documentation that leads people to think in the right way. Who knows?
But let’s not get this thread diverted into the same endless debate the last thread suffered. The mistake I made was not making watcher into a named function, but trying to inline it into the calling function. Actually what I did was write the body inline, and never even considered breaking it out into a function, because it’s simple enough not to need factoring out. But the problem is, that’s not all an async function does. It’s not just a structuring mechanism, it’s also something that I don’t have a good name for, but I’ll call a “unit of concurrency”. You can run watcher as a background task while the queue consumer processes the data, whereas if you inline the code, it has to run before the consumer - destroying the concurrency you were trying to achieve.
Is there an official name for this “unit of concurrency” property? Because it feels like there should be, in order to emphasise the fact that async functions, unlike sync ones, are not simply ways of factoring out common code.
This is where I’ve been trying to go with my refactorings. As I mentioned above, I got very stuck because I expected such a function to be largely just procedural code, like a sync equivalent (something like an itertools.interleave) would be. But the fact that concurrency is involved means that in fact it’s intimately tied to the event loop (and hence asyncio functions).
Ideally, I’d like something like merge to be part of the stdlib - it’s clearly a worthwhile candidate, intuitively useful, with some fairly common use cases, and tricky enough that even an expert has to take care while implementing it. If not in the stdlib, then at least part of a well-known asynciotools utility library, so that people can easily find it. Because the whole “my intuition is flawed” problem means that it’s hard for inexperienced users like me to even frame a search for what they are after in a way that gives any useful results.
One of the really frustrating things about asyncio for me is the fact that it’s been around for a long time now, and we still don’t have any sort of “common utilities” library. There seems to be a very discouraging attitude in the community of “you can write that for yourself” - when there’s a bunch of evidence from people like me that sorry, but we really can’t…
It is, thanks. I’ll study that code from quattro. It’ll definitely help me in my learning. I still wish that more of this sort of stuff was available “off the shelf”, but I don’t want to imply that I’m blaming individuals for the fact that it isn’t - it’s just how things are, and while it’s a shame, it’s not any one person’s fault.
Oh, that’s fantastic! Thanks very much for this - I shall certainly steal it. I was heading vaguely in that direction myself, but there’s a huge amount of subtlety in your code that I had no idea was going to be needed, and I’m not sure I’d ever have worked it out on my own.
Personally, I’d love it if this was in the stdlib. It feels very much like a “useful building block” to me, which I feel is what the stdlib should be. I don’t know how much support you’ll get from others, and I don’t feel like I have the authority in this area to push it through myself, but I’ll definitely support a PR if you make one.
Yes, one of the things about that code that I don’t like is how non-obvious that is. I made the same mistake myself when re-reading the code, and I wrote it!
Your summary of the code flow looks right to me. One specific point:
I hadn’t completely appreciated that point - in my mind, based on experience with threads, I’d just vaguely thought of watcher as “running” at this point. Another area where I need to improve my intuition, I guess!
But yes, your analysis of the code is really helpful, and matches what I intended it to do, even if it’s possibly a bit of a happy accident that it actually does.
Thanks for the rewrite - @mikeshardmind had also mentioned queue.shutdown, and that’s a clear improvement. The use of gather to wait on the watcher and the proc seems like a good improvement as well, although I do find it hard to reason about - if wait raises an exception, will exceptions in watcher get lost? Will we even continue waiting for gather to complete? I was tempted to say “do I even care?” at that point, but that’s sort of the issue - if I’m even tempted to just give up and hope things will be OK, what does that say about reliability?
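To try to answer that for myself, a toy experiment (not the real code) suggests: with the default settings the first exception propagates out of gather immediately while the other awaitable keeps running unawaited, whereas return_exceptions=True waits for everything and hands exceptions back as results:

```python
import asyncio


async def fails() -> None:
    raise RuntimeError("boom")


async def slow(log: list) -> str:
    await asyncio.sleep(0.01)
    log.append("slow finished")  # runs even after gather has raised
    return "ok"


async def demo() -> tuple:
    log: list = []
    outcome = "no exception"
    try:
        # default: first exception propagates, the sibling keeps running
        await asyncio.gather(fails(), slow(log))
    except RuntimeError:
        outcome = "RuntimeError propagated"
    await asyncio.sleep(0.05)  # give the abandoned task time to finish

    # return_exceptions=True: wait for all, exceptions become results
    results = await asyncio.gather(fails(), slow(log), return_exceptions=True)
    return outcome, log, results


outcome, log, results = asyncio.run(demo())
print(outcome)                    # RuntimeError propagated
print(log)                        # ['slow finished', 'slow finished']
print(type(results[0]).__name__)  # RuntimeError
print(results[1])                 # ok
```

So with the plain gather in the finally clause, an exception from proc.wait() would propagate while watcher_task is left running unawaited - which is exactly the kind of thing that is hard to reason about.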
I’m unclear about the introduction of ExceptionGroup handling. I don’t really see how I could end up with a group - as the comment says, an exception in one task will cancel the other, so there should only ever be a single exception, surely? Groups feel like a complication I’d prefer to avoid, but I don’t know if I can. I don’t think I want to let them propagate upwards. From what I understand, someone in the chain has to have an except* clause for proper handling, and I don’t want to make that my caller’s responsibility - the whole business with task groups is an implementation detail of this function that shouldn’t be visible to the caller.
Thanks to everyone for the comments so far. They have been immensely useful.
I’ll want to iterate on the latest version slightly more before suggesting PRing it, but I agree that this is a building block that has wide application, but isn’t trivial.
Your comment about ExceptionGroups is interesting, because I realize there are plenty of reasons someone would want to consume everything possible, even if one of the generators raises. That also led me to think about whether or not a version for the stdlib should call aclose (effectively taking ownership of the generators).
By the time the event loop gets back to the right task to report back on which are done, there might be multiple done in an exception state. The core of the event loop can basically be thought of as a big while loop that just advances the current tasks scheduled forward to their next suspension point, doing book keeping and scheduling of new tasks and callbacks as required.
It does make me wonder what the UI of the run function should be, though. For my actual application, it doesn’t matter much - I just log exceptions and go back to the main request handling loop. But for applications which do care about exceptions, do I need to point out that run can raise an exception group? Because as I understand it, to catch (say) a ValueError, the caller needs to have code to handle both a bare ValueError and one embedded in an exception group. Conversely, if I don’t want users to have to be prepared for exception groups, it’s not at all clear what I should do with one. Neither option seems particularly attractive…
Maybe I should re-raise the first exception in a group, on the basis that it’s the one that directly caused execution to terminate. Any other exceptions were “just” encountered while returning to the caller, and can be discarded. That seems like bad logic, though.
In the context of my use case, this doesn’t really matter. But for library code, I imagine it’s a difficult design choice.
I know right off the top of my head of a couple of places in my $work codebase that could be simplified with that construct[1], so I fully agree that it’s a really nice-to-have building block (that I would definitely not have tried to implement myself).
It’d also work quite nicely with the proposal in PEP 828 to allow yield from and async yield from in async generators. Paul’s example could be factored down to just
This is a cool idea but I have reservations about the implementation. You’re essentially running each iteration of the provided generators in a fresh task, right? Task churn aside, imagine if the code inside the generator uses an asyncio.timeout() or a TaskGroup around the yield; or any sort of context-locals are used (for observability for whatever). It simply wouldn’t work properly.
I’m not concerned about task churn here, the overhead is actually less than that of the queue’s internals.
The only way this should break is if the code passed into it is already broken. There’s nothing preventing a user from just getting the next value from an async generator themselves, anext is and has been well-defined.
The issue here would be people using asyncio.timeout() or similar in a place it isn’t actually safe to do so. I’ve stated my distaste for it in other recent asyncio threads, but it shouldn’t have been added prior to fixing the issues with yielding across contexts. Any issue that arises from this would be in user code, and should be fixed by fixing the async generator function until PEP 789 is revisited. PEP 789 highlights that it’s currently the responsibility of the generator function to get this right.