As a non-expert, I’m not 100% clear what is meant by “structured concurrency” here. The way I’ve always understood it was as a fairly informal term, meaning basically “use high-level constructs rather than low-level primitives”. But the way you are using it suggests that it has a much more precise meaning - can you clarify?
No, it’s explicitly not what I’m after. Sometimes just logging to stderr and not crashing is the desired behavior, but you still want to be sure that the task is run to completion or cancelled.
Semantics like this are closer to what people already expect from the concurrent.futures APIs, while still maintaining the structured concurrency benefits of no task outliving its intended scope (in this case, the application lifetime) and errors being handled promptly. It’s just that in this case there’s a convenient API for logging and moving on at failure.
This other thread is more relevant to possible designs for that
In some ways I agree with you - my intention was never to design a library function here. But my problem is that I have code that doesn’t bother abstracting this, and when I came back to it after a few months, I found it difficult to understand and reason about. That’s not how I generally find Python code to be, so my immediate assumption was that I’d missed some existing abstractions which would express my application logic more clearly. “Merging two asynchronous streams” was the most obvious abstraction I could think of, and my original post was very much about asking whether such an abstraction existed (or if it didn’t, what other abstraction made sense here).
The fact that the discussion resulted in 3 suggested implementations for the logic of merging streams says a couple of things to me:
- Some fairly basic abstractions seem to be missing from the stdlib and from “well known” 3rd party libraries.
- Getting these abstractions right feels like a pretty difficult task (after all, the experts don’t seem to be agreeing on the best approach!) so I was right to want to find a pre-built solution rather than writing my own.
The secondary lesson seems to be that my ideal of writing async code at a level of abstraction similar to how I write other Python code may not actually be possible yet. That surprises me, as asyncio isn’t exactly a new Python feature any more.
Agreed, and that’s what I’ll probably do in the short term: use the implementation @mikeshardmind offered, as it seems the most understandable to me, and leave the experts to work out how to provide higher-level abstractions for general use. It was only when @agronholm offered another possible approach, based on anyio, that the discussion flared up again, because it seemed like anyio might have the abstraction I was originally hoping for. But as we dug into it, the same complexities started coming up and undermined what little confidence I’d gained. After all, @agronholm seems to be suggesting that async generators - in other words, use of yield in an async function - have all sorts of pitfalls that are well known in the async community, but which I’m only just beginning to understand, and I certainly can’t spot when I’m writing code that might trigger these issues.
And I was certainly expecting to use them in my application code…
This thread has been very useful to me, and I really don’t want to derail it by turning it into another “why I find asyncio confusing” thread. But I do want to point out that my aim is simply to write Python in the sort of high-level style that I’ve always used for my synchronous code and (via concurrent.futures) my threaded code. The fact that it’s so hard to understand how to do that with asyncio is definitely a contributing factor to my tendency to avoid it[1].
And in contrast, when I briefly had to do some work with JavaScript async code, I didn’t have anything like the conceptual difficulties I’m having with async Python - which is the exact opposite of my normal experience with the two languages… ↩︎
The background is explained in Nathaniel J. Smith’s original essay here, but in short it’s a design that requires all spawned tasks to remain tied to a running parent task, and enforces that the child task must exit before that parent can. That way there’s a place that exceptions in the child can bubble up to. Encapsulation is preserved better because you know that once a function you call returns, all its child tasks are also done.
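For the asyncio flavour of the same idea, here is a minimal sketch using asyncio.TaskGroup (3.11+); the child/parent names are purely illustrative:

```python
import asyncio

async def child(n: int) -> None:
    await asyncio.sleep(n)
    print(f"child {n} done")

async def parent() -> None:
    # The task group will not let parent() return until every task
    # spawned inside it has finished; if a child raises, the other
    # children are cancelled and the error propagates out of the
    # `async with` block (wrapped in an ExceptionGroup).
    async with asyncio.TaskGroup() as tg:
        tg.create_task(child(1))
        tg.create_task(child(2))
    print("all children are guaranteed to be done here")

asyncio.run(parent())
```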
Thanks. I’d read that a long time ago, but it was worth a refresher. I’d forgotten in particular that it tied the “structured concurrency” terminology specifically to the nursery concept of nesting task lifetimes.
In the context of trio (where, if I understand correctly, there’s no way to spawn a task outside of a nursery) this makes a lot of sense. Eliminate goto and you can rely on the abstractions structured constructs give you. I have some reservations about the need to pass nurseries around willy-nilly, exposing implementation details in the public API of functions, but those are a secondary issue.
The trouble is, trio isn’t really viable for me. There’s no way to use uvicorn with a trio event loop, for instance.
And because asyncio has (for backward compatibility reasons that I understand) the unrestricted asyncio.create_task() function, you don’t get the benefits of structured concurrency, even if you have task groups and the other mechanisms of structured concurrency. To use Nathaniel’s analogy, a function can never be a black box if the language supports unrestricted goto which means you can jump into or out of its implementation without any constraints. Promising never to use it in a “dangerous” way isn’t really scalable or practical[1].
I think where this leaves me is sticking with (relatively) low-level code and treating high-level abstractions (even things like task groups) with caution. I can’t say that I like this conclusion, but it’s significant that the very low-level implementation @mikeshardmind suggested actually feels the most understandable and robust to me…
@Liz - thanks for the reminder that maybe I am being too purist in thinking like this. Your post did remind me that sometimes “just get on with it and stop worrying about the big picture” is the right answer. But I’m trying to learn as well as develop a specific application, so I still think the big design questions are worthwhile (for me at least!)
Especially when you’re relying on 3rd party libraries for a bunch of your functionality ↩︎
The trouble is, trio isn’t really viable for me. There’s no way to use uvicorn with a trio event loop, for instance.
Both hypercorn and anycorn support this. But I understand not wanting to run Trio, as its asyncio interoperability is a bit janky. But this is precisely the reason I made AnyIO – to offer the benefits of structured concurrency to those who want to take advantage of them without having to switch to a completely foreign event loop ecosystem.
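For concreteness, a minimal sketch of what that looks like with AnyIO’s task groups on the default asyncio backend (the worker function here is illustrative):

```python
import anyio

async def worker(name: str) -> None:
    await anyio.sleep(0.1)
    print(f"{name} finished")

async def main() -> None:
    # Trio-style structured concurrency, but running on the stock
    # asyncio event loop (AnyIO's default backend).
    async with anyio.create_task_group() as tg:
        tg.start_soon(worker, "a")
        tg.start_soon(worker, "b")

anyio.run(main)
```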
And because asyncio has (for backward compatibility reasons that I understand) the unrestricted asyncio.create_task() function, you don’t get the benefits of structured concurrency, even if you have task groups and the other mechanisms of structured concurrency.
Are you saying that the mere existence of asyncio.create_task() nullifies all the benefits of SC?
I think where this leaves me is sticking with (relatively) low-level code and treating high-level abstractions (even things like task groups) with caution. I can’t say that I like this conclusion, but it’s significant that the very low-level implementation @mikeshardmind suggested actually feels the most understandable and robust to me…
I had some difficulties understanding the flow in the code myself and whether all error conditions were properly handled, including cancellation. But if you understand the implementation, then you should be fine.
Not so much me, I thought that was what you (and/or @mikeshardmind) were debating about, though.
And to be clear, when I was talking about “the benefits of SC”, I was meaning the wider encapsulation benefits. The need to wrap an async generator in a context manager is the sort of thing I mean here - I’d expect one of the benefits of SC to be that creating async generators could be done in a uniform manner, without any need for special measures depending on how the generator worked. You don’t need to wrap normal synchronous generators in context managers, after all.
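For anyone unfamiliar with the special measure being referred to, here’s a sketch using contextlib.aclosing (3.10+); the ticker generator is illustrative:

```python
from contextlib import aclosing

async def ticker():
    try:
        n = 0
        while True:
            yield n
            n += 1
    finally:
        pass  # time-sensitive cleanup would have to go here

async def consume() -> None:
    # Without aclosing(), breaking out of the loop leaves the
    # generator suspended, and its cleanup runs whenever the event
    # loop's asyncgen shutdown hooks get to it; aclosing() makes
    # the cleanup run right here instead.
    async with aclosing(ticker()) as gen:
        async for n in gen:
            if n >= 3:
                break
```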
Sorry, I’m starting to nitpick a bit too much here. Maybe my expectations are just unreasonably high. I’m happy to let this drop, as I don’t think it’s adding much to the original question (which has been pretty thoroughly answered by this point!)
I’ve got an additional consideration regarding which one is best for a library implementation:
The low-level version based on waiting on multiple anext calls handles backpressure better than any of the queue-based options.
In the queue-based versions, the tasks started to consume from the generators into a queue can outpace consumption of the queue.
This can be partially mitigated by setting a maxsize on the queue, but there is no way to get the exact behavior “get up to one available value from each generator, yield all available values at each opportunity” with a queue. The best available in that case is “get the next available value from any generator, then wait additional event loop iterations for the next one to become placeable into the queue”, with a maxsize of 1.
A maxsize equal to the number of generators doesn’t work either, because some generators may ready their second, third, and subsequent values before another generator readies its first.
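To make the comparison concrete, here is a minimal sketch of the anext-based shape (not the polished implementation linked below, and it omits the cancellation and error-handling details this thread has been wrestling with):

```python
import asyncio

async def merged(*gens):
    # One in-flight __anext__ per generator: a generator is only
    # advanced again after its previous value has been yielded to
    # the consumer, which preserves per-generator backpressure.
    pending = {asyncio.ensure_future(anext(g)): g for g in gens}
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            gen = pending.pop(fut)
            try:
                value = fut.result()
            except StopAsyncIteration:
                continue  # this generator is exhausted
            yield value
            pending[asyncio.ensure_future(anext(gen))] = gen
```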
A direct comparison of an approach utilizing a queue and one using anext, after polishing each a bit more, is available here, along with one more tweak that would change the usage in exchange for a specific benefit.
The key decisions in each implementation centered on the following design goals:
- Does not drop values that were already available when encountering an exception.
- Reraises underlying exceptions properly.
- Limits the number of excess context switches.
- Limits the number of intermediate objects required.
- Preserves underlying generator laziness and backpressure[1]
With those goals in mind, I had to make further tweaks to both approaches[2]. While I’m reasonably happy with where both landed, the version utilizing anext is superior in places where the two differ.
A further improvement here would be yielding a sequence of available values (up to one per generator) rather than one value at a time. While this would force the consumer of the merged generators to iterate each batch in a simple for loop, it would significantly cut down the number of context switches.
An implementation of that is in the file for comparison. While it’s implemented for the version with anext, the queue version could also accomplish this by awaiting the first value, then using queue.get_nowait and catching QueueEmpty to collect everything already available.
I could certainly use some opinions on whether the reduced context switches are appealing enough to others to justify yielding a sequence of values rather than one value at a time. I personally intend to replace some existing places where I used an inline solution with the batched version, after putting all of these competing implementations through their paces.
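For the queue side of that comparison, the drain pattern described above might look like this sketch (the batched helper name is hypothetical, and generator shutdown is ignored):

```python
import asyncio

async def batched(queue: asyncio.Queue):
    # Wait for one value, then collect everything else already
    # sitting in the queue, so each yielded batch contains all
    # values that were available at that opportunity.
    while True:
        batch = [await queue.get()]
        try:
            while True:
                batch.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            pass
        yield batch
```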
I thought it was neat that you wrote up both versions and put them on GitHub. Reading through, I have a question on line 43 (if (not cancelled) and f.cancelled():).
- If `f.cancelled()` and `cancelled`, won’t this raise on `elif exc := f.exception():`? IIUC, calling `.exception()` or `.result()` on a cancelled future raises `CancelledError`. Or is that intended? Or can that condition not happen?
Yes, I failed to keep that fix in sync with the actual library code; I’ve now resynced that implementation with what’s currently exported in the library.
The only time these can end up cancelled involves user code that fetches all tasks and then cancels them, which most people should view as a last resort during event loop shutdown, but it’s still something to handle correctly.
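For anyone following along, the ordering under discussion is roughly this (split_done is a hypothetical illustration, not the library code):

```python
import asyncio

def split_done(done: set[asyncio.Future]) -> tuple[list, list[BaseException]]:
    # Check .cancelled() before .exception(): calling .exception()
    # or .result() on a cancelled future raises CancelledError
    # rather than returning it.
    values: list = []
    errors: list[BaseException] = []
    for f in done:
        if f.cancelled():
            errors.append(asyncio.CancelledError())
        elif (exc := f.exception()) is not None:
            errors.append(exc)
        else:
            values.append(f.result())
    return values, errors
```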
I like this implementation, but I’m still unsure about your use of aclose. Isn’t that only for placing the error differently in the case of misuse, and can’t users avoid that misuse themselves by simply not reusing the generator, by using contextlib.aclosing, or with a linter rule?
I’m taking the time to give it more thought. Whatever I end up with, I do intend to propose adding this to asyncio via a PR, but it might be better to leave that as a documented edge case for users and not bother (as you suggested).
Another option is to change the API of this so that the use is:
```python
async with merged_generators(*gens, exceptions="behavior_choice") as gen:
    async for batch in gen:
        for value in batch:
            await do_something(value)
```
Multiple things give me pause about that:
- Nothing else in asyncio wraps this way; even where it might help users avoid issues, it is instead left open for users to compose themselves.
- It’s a lot of indentation, which users tend to be averse to, just to be able to propagate an error during aclose that asyncio has already chosen to silence for users in similar cases via the asyncgen hooks.
- It’s only relevant for async gens that have time-sensitive cleanup if broken out of iteration. Such generators should already be context managers to begin with, so adding yet another context manager just in case the code being passed in isn’t well written doesn’t sit right with me.