I’ve been working on writing high-performance asyncio network code, and after trying all three of the available APIs, I’m wondering if I’m writing my code incorrectly.
The official docs suggest that working with socket objects directly is more convenient at the expense of performance, but my testing shows the opposite.
A simple benchmark using this server:
import socket

# Example values for the benchmark constants (assumed here; the real benchmark
# defines MESSAGE_SIZE and MESSAGE elsewhere).
MESSAGE_SIZE = 1024 * 1024
MESSAGE = b"x" * MESSAGE_SIZE


def socket_server():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('127.0.0.1', 1234))
    s.listen()
    while True:
        conn, _ = s.accept()
        bytes_read = 0
        mv = memoryview(bytearray(MESSAGE_SIZE))
        while bytes_read < MESSAGE_SIZE:
            read = conn.recv_into(mv[bytes_read:])
            if read == 0:
                raise OSError("Closed by peer")
            bytes_read += read
        conn.sendall(MESSAGE)
        conn.close()


socket_server()
Am I misunderstanding the use case of streams or using them in a poorly-performing manner?
I’ve also written a similar Protocol-based test client that slightly outperforms sockets as data size increases, but the lack of an awaitable write() method makes them a non-starter for my use case. Is there a way to ensure that a Transport completes sending its message over the wire before continuing execution?
asyncio streams are the slowest. The loop.sock_*() functions are not bad, but they imply many epoll register/unregister calls for the sockets being processed.
The most performant, and the hardest to implement, approach is using asyncio protocols. I wrap all the underlying logic into a class like Connection to provide a high-level API with an await conn.read() and await conn.write() interface.
The protocol-based way consists of two parts: reading and writing.
The writing requires correct handling of the pause_writing() / resume_writing() callbacks; you can look at ./Lib/asyncio/streams.py:FlowControlMixin for inspiration. The flow control allows the implementation of await conn.write() on top of this concept. From the performance perspective, the crucial thing is to use zero-copy where possible. For example, the asyncio transport has a tr.writelines() method; on Python 3.12+ the method calls sock.sendmsg() to avoid an extra memcpy().
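For example, a length-prefixed write could look roughly like this (the send_frame helper and the framing are only an illustration, not part of the code discussed here):

import asyncio
import struct


def send_frame(transport: asyncio.WriteTransport, payload: bytes) -> None:
    # Illustrative framing: a 4-byte big-endian length prefix, then the payload.
    # writelines() takes an iterable of buffer-like objects; on Python 3.12+ the
    # selector transport hands them to socket.sendmsg() in one call, so the
    # header and payload are never concatenated on the Python side.
    header = struct.pack("!I", len(payload))
    transport.writelines([header, memoryview(payload)])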
Efficient reading can be achieved by utilizing the zero-copy BufferedProtocol as well. The idea is to allocate a buffer (a bytearray) up front and read into it, instead of the alloc → read → process → free cycle implied by Protocol.data_received(). BufferedProtocol.get_buffer() returns memoryview(self._buf)[self._offset:], and BufferedProtocol.buffer_updated(n) does self._offset += n while the buffer still has enough free space, resetting self._offset = 0 otherwise. BufferedProtocol is most effective when the parser of incoming data also works with memoryviews, without actually copying data, and constructs Python objects only just before emitting them.
For example, an HTTP headers parser that converts a buffer into a Python string, splits the string into lines with s.split('\r\n'), splits each line into a header name and value, and emits the corresponding name: value pairs is not very fast: each str.split() creates a bunch of new Python strings (alloc + memcpy + free). A fast parser could first calculate the offsets of each header name and value and create the emitted Python strings only once. The best option is when a non-copying parser already exists as a third-party C library.
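A rough sketch of the offsets-first idea (not a complete HTTP parser; the function name, the start/end bounds, and the latin-1 decoding are assumptions for the example):

def parse_headers(raw: bytes, start: int, end: int) -> dict[str, str]:
    # Find the offsets of each line and of the ':' separator first, then decode
    # every header name and value exactly once; no intermediate split() lists.
    headers = {}
    pos = start
    while pos < end:
        eol = raw.find(b"\r\n", pos, end)
        if eol == -1 or eol == pos:  # incomplete line, or blank line ends the headers
            break
        sep = raw.find(b":", pos, eol)
        if sep == -1:
            raise ValueError("malformed header line")
        name = raw[pos:sep].decode("latin-1")
        value = raw[sep + 1:eol].strip().decode("latin-1")
        headers[name] = value
        pos = eol + 2
    return headers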
Sorry for the long text; I hope you’ll get my point. Please feel free to ask if you have any questions.
Thanks for the detailed breakdown, Andrew! It sounds like the approach of essentially cloning FlowControlMixin for write handling and implementing a simple BufferedProtocol should be highly performant. Am I misunderstanding the complexity of doing this? Here’s a small test example I wrote up:
import asyncio
import collections


class FastProtocol(asyncio.BufferedProtocol):
    def __init__(self):
        super().__init__()
        self._buffer = memoryview(bytearray(MESSAGE_SIZE))
        self._offset = 0
        self._done = None
        self._connection_lost = False
        self._paused = False
        self._drain_waiters = collections.deque()
        self._loop = asyncio.get_running_loop()

    def connection_made(self, transport):
        self.transport = transport

    async def write(self, message: bytes):
        self.transport.write(message)
        await self._drain_helper()

    async def read(self):
        # Wait until buffer_updated() has received a full message.
        self._done = self._loop.create_future()
        await self._done

    def get_buffer(self, sizehint: int):
        return self._buffer[self._offset:]

    def buffer_updated(self, nbytes: int):
        if not self._done.done():
            self._offset += nbytes
            if self._offset == MESSAGE_SIZE:
                self._done.set_result(True)

    def pause_writing(self):
        assert not self._paused
        self._paused = True

    def resume_writing(self):
        assert self._paused
        self._paused = False
        for waiter in self._drain_waiters:
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer(s) if currently paused.
        if not self._paused:
            return
        for waiter in self._drain_waiters:
            if not waiter.done():
                if exc is None:
                    waiter.set_result(None)
                else:
                    waiter.set_exception(exc)

    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._loop.create_future()
        self._drain_waiters.append(waiter)
        try:
            await waiter
        finally:
            self._drain_waiters.remove(waiter)

    def data(self):
        return self._buffer
Is there a reason asyncio streams aren’t implemented in this way? They already wrap transports/protocols internally, so speeding up their performance significantly seems to be mostly a matter of improving the underlying implementations. Are there other motivations behind the behavior of streams that preclude better native performance?
The snippet is good in general, but there is an important misunderstanding:
The message size usually varies, and the allocated buffer is usually much larger than the message size.
Say the message size is 1-500 KB, and the buffer size is 4MB.
The implementation needs two offsets: read_off and ready_off.
<------------------------- buffer -------------------------->
: <= ready_off                      : <= read_off
<-------- data to process --------> <---- reading space ---->
The async def FastProtocol.read() method should return a memoryview over buf[ready_off:read_off]; if nothing is available, it waits for data.
In turn, the protocol should keep asking for reads into buf[read_off:] until read_off reaches the right bound (or something close to it, e.g. right_bound - buf_size // 4, to avoid being forced into reads of only a few bytes).
After reaching the right bound, the reading space moves to the left: read_off = 0, and reads now go into buf[read_off:ready_off].
<---------------------------- buffer ---------------------------->
: <= read_off              : <= ready_off             : <= right_bound
<----- reading space ----> <---- data to process ---->
When all data is processed, ready_off is moved to the left (ready_off = 0), and everything starts from the beginning.
If the idea is still unclear, I can try to display Python code.
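Roughly, the bookkeeping could look like this (an untested sketch; the _wrapped / _data_end helpers and the pending() / consumed() names are just for illustration, and back-pressure when the buffer is completely full is ignored):

class RecvBuffer:
    def __init__(self, size=4 * 1024 * 1024):
        self._buf = bytearray(size)
        self._mv = memoryview(self._buf)
        self._ready_off = 0    # start of data waiting to be processed
        self._read_off = 0     # where the next read lands
        self._data_end = 0     # end of unprocessed data on the right side (when wrapped)
        self._wrapped = False
        # stop reading a bit before the physical end to avoid tiny reads
        self._right_bound = size - size // 4

    def get_buffer(self, sizehint):
        if not self._wrapped:
            if self._read_off < self._right_bound:
                # normal case: keep filling the space to the right
                return self._mv[self._read_off:]
            # reached the right bound: move the reading space to the left
            self._wrapped = True
            self._data_end = self._read_off
            self._read_off = 0
        # wrapped: fill the space before the still-unprocessed data
        return self._mv[self._read_off:self._ready_off]

    def buffer_updated(self, nbytes):
        self._read_off += nbytes

    def pending(self):
        # data handed to the parser (what an awaitable read() would return)
        end = self._data_end if self._wrapped else self._read_off
        return self._mv[self._ready_off:end]

    def consumed(self, nbytes):
        self._ready_off += nbytes
        if self._wrapped and self._ready_off == self._data_end:
            # the right-side chunk is fully processed; continue with the left part
            self._wrapped = False
            self._ready_off = 0
        elif not self._wrapped and self._ready_off == self._read_off:
            # everything is processed: start over from the beginning
            self._ready_off = self._read_off = 0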
I see, so the intent is to implement a semi-circular buffer inside the Protocol, wrapping around when the writing offset reaches the end?
Can you explain the difference between right_bound and the end of the buffer? From my testing so far, if you track offsets and messages carefully, a message could have a section both at the end of the buffer and at the start, having wrapped around with the writing offset. Does adding a separate right_bound simplify this?
I went ahead and tested out both stream alternatives. They were both faster than asyncio’s streams, but still slower than our implementation using sockets. We’d also prefer to avoid adding external dependencies to our library, even ones as popular as trio and anyio.
Our initial implementation used the asyncio loop.sock_*() methods for network I/O, since we use the synchronous socket versions in our synchronous API.
@NoahStapp Does adding a separate right_bound simplify this?
It depends on your message structure. Sometimes, the wrapping is undesirable, and proper tuning of right_bound could prevent it.
Alternatively, the parser could construct the message from two parts: the first at the right end of the buffer and the second at the left.
Even if you have such an advanced parser, reducing the right bound could help reduce the number of syscalls.
For example, len(buf) is 1024 (in reality, it should be 4MB or even 32MB for better network utilization), read_off is 1000, and the socket buffer has 200 unread bytes.
If proto.get_buffer() returns memoryview(buf)[1000:], reading the 200 ‘ready’ bytes requires two syscalls: read_into(buf[1000:]) to fetch the first 24 bytes, and then read_into(buf[0:ready_off]) to fetch the next 176 bytes from the socket buffer.
Fine-tuning of right_bound could avoid the first call and switch straight to read_into(buf[0:ready_off]), getting all the data from the socket read buffer in a single syscall.
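For the numbers above, a get_buffer() along these lines (again just a sketch, with assumed attribute names such as _min_tail) skips the 24-byte read:

def get_buffer(self, sizehint):
    # len(self._buf) == 1024 and self._read_off == 1000: only 24 bytes fit at
    # the right edge, so move the reading space to the left and let a single
    # read_into() syscall pick up everything waiting in the socket buffer.
    tail = len(self._buf) - self._read_off
    if tail < self._min_tail:                # threshold tuned per workload
        self._right_bound = self._read_off   # unprocessed data ends here
        self._read_off = 0
        return memoryview(self._buf)[:self._ready_off]
    return memoryview(self._buf)[self._read_off:]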
This can give a visible speedup. Also, having a buffer big enough to store multiple messages fetched by a single read_into() syscall is essential.
If message parsing is fast, reducing the number of syscalls is key. If not, the task is no longer I/O-bound, and network performance is not the bottleneck.