Discussion on a new streaming API for asyncio

The asyncio API could be improved by doing the following:

Combining StreamReader and StreamWriter to a single Stream class

This is useful because currently the user needs to know that he needs to close the writer and not the reader which is confusing.

I think that the cleanest way to make this work is that the class Stream will hold a reader and a writer and then it will handle the api calls by calling the corresponding reader / writer member functions.

Make the StreamWriter.write a coroutine and make it call drain

Currently the intended use to drain is as follows:

w.write(data)
await w.drain()

Why should there be 2 api functions when write could do all this in a single function call.

Adding an aclose coroutine to shutdown the stream gracefully

Currently there is only one implementation of close and it's not a coroutine, meaning that the only thing it can do it shutdown the socket forcefully.

This is how Trio does it: https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose

Conclusion

If you have more suggestions post them on this topic. Also if you have design ideas on how the new api should be implemented post them on this topic as well.
1 Like

I’ll be adding some thoughts later today or over the weekend. But the overall idea has my full support.

@njs Quick question: have you finalized the Trio streaming API design?

1 Like

No, that’s been stalled waiting on me to find more emotional energy to pick it up again. I really should do that though :slight_smile:

Not sure how this relates to https://bugs.python.org/issue41305 – from the discussion there it seems like this is supposed to be linked, but that issue is about readinto and there’s no mention of readinto here.

Anyway, I’ll bring it up: I’ve been very hesitant to add readinto to stream APIs, because I’ve never been able to figure out how to make use of it outside of a few very specific cases.

The big issue is that 99% of the time, protocol handlers need a kind of “sliding buffer”, where you repeatedly append bytes to one end, scan the buffer, and extract from the beginning. In CPython by far the simplest and most efficient way to do this is to use a bytearray, since it has efficient append-to-end and delete-from-beginning operations (amortized O(n)), and works with efficient string handling primitives like bytearray.find and re.search. But if you’re using a bytearray as your buffer structure, then readinto is useless, because bytearray doesn’t have a “reserve space at the end” primitive, and even if it did, then you’d still have to keep copying the buffer contents as you slide the buffer.

OTOH you could implement some kind of fancy chunked or circular buffer that can take advantage of readinto, but I’m pretty sure if you do this in Python then the bookkeeping overhead will overwhelm the savings you get from saving a few memcpy’s. memcpy is extremely fast; not free, but it’s only worth worrying about if everything else in your design is also super-fast.

I do think it might make sense to have a native code object that has similar semantics to bytearray but optimized for network processing, and then have a read_and_extend(buf) operation that acts like buf += read(), but can take advantage of knowledge of the internal representation to implement this using zero-copy and scatter/gather primitives. If we had a solid version of this primitive and used it throughout the ecosystem, that might be worth it. But that would require building the primitive and making it universally available and getting it universally adopted, which is a major roadblock to overcome.

Nathaniel, thanks for joining the discussion :slight_smile:

Tony, Nathaniel, To be frank I don’t really care about readinto at this stage, but I do care about the ABCs / method naming. Would be great if we could focus on the big picture and stuff we know must be there and then once that settled we can discuss nits like readinto. // my2c

I think most of the method names inherited in Trio’s Stream interface would work well for our purposes, notably:

receive_some()
send_all()
aclose()

(and send_eof() for HalfCloseableStream)

However, I’m really not a huge fan of the naming of wait_send_all_might_not_block(), and not entirely convinced that its functionality is immediately necessary in asyncio.

@njs Do you have any thoughts on how essential and widely used that specific member is at the moment? I think we may want to consider focusing on the more essential components of the streaming API, and for now stall on implementing similar congestion control/prioritization members (or adding them to the streaming ABCs) until we have a better idea of what it should look like.

Other than that, I think the distinction of a HalfCloseableStream and Stream makes sense for eventually providing TLS support in the high-level streams API (IIUC, SSL/TSL protocol does not allow for closing only the sending side). Also, I like the idea of having the socket-specific members (getsockopt and setsockopt) in a SocketStream class.

The listener class seems good to me.

As for AsyncResource, I do very much like the idea of having async context manager support for all of the relevant classes, and all of them supporting an aclose(). However, instead of having a distinct aclose() and aclose_forcefully(), I think this can be merged into a single aclose() member with a wait flag that defaults to true, similar to the approach in concurrent.futures.Executor.shudown() (or perhaps a force flag that defaults to false?).

I’ll have to spend some more time considering the Send/Receive channel classes, but I like the general concept of providing a simplistic means of passing Python objects in appropriate scenarios, instead of having to deal with raw bytes or memory views for everything. That might be something to consider later down the road though.

(See the trio docs for details on the members described above.)

The Trio ABCs are a good v1, but now that we have experience with them we have a bunch of tweaks queued up:

  • Since some versions of TLS now support send_eof, HalfCloseableStream has been deprecated in favor of adding a fallible send_eof to Stream: #823
  • the terminology needs some tweaking to cover message-oriented vs byte-oriented streams, plus receive turns out to be too wordy in practice (1208 and #1125)
  • Listener needs to be redone (#823). This is pretty straightforward, but I’ve hesitated a bit on actually doing it because the new API will have a function-that-takes-a-function, and we’re currently up in the air about what convention we want to use for handling kwargs in these cases

I’m also uncertain about wait_send_all_might_not_block (#371).

So I also considered that approach initially, and decided not to do it. What I realized is, even if you do obj.aclose(wait=True), then you still need to define and document what happens when that’s cancelled… and the answer is that it should act like obj.close(wait=False). So I figured, if everyone is going to have to implement special handling of cancellation anyway, there’s no point in making them implement a flag too – that’s just more opportunities for bugs.

In case it’s not clear, note that aclose_forcefully is not a method on the ABC. It’s a single global function that can be used on any AsyncResource.

However, part of the reason this makes sense for Trio is that we have a clean way to cancel a single operation. In asyncio, I’m not sure you actually can implement aclose_forcefully properly. You can push the aclose call into a child task, and call cancel on that task and wait for it to finish, but then once it’s finished and you get a CancelledError you have to figure out whether to catch it. If someone “outside” you cancelled you, then catching it will cause their cancellation to be silently lost, so you can’t do that. But if they didn’t, then you need to catch it. But there’s no way to know which case you’re in… unfortunately this kind of ambiguity is pretty much unresolvable within asyncio.

2 Likes

Ok, to make it happen, we need to discuss this thing.

I say creating the following:

Stream:

  • holds a stream reader
  • holds a stream writer
  • implement all read function currently implemented by just calling the stream reader’s corresponding functions
  • same with the write functions

StreamWriter:

  • Change the write function to be async
  • Remove drain (actually I don’t know how drastic it would be deleting an api that is widely used, maybe an experienced maintainer can tell me that it would be wrong)

BaseTransport:

  • Add an async aclose function
  • Maybe rename close to be close_forcefully

I know this is kind of a repeat of my suggestions at the start, but I do want to have concrete answers to each of these api changes.