I have a Python server script that runs an asyncio event loop (started with run_forever), with two nested tasks defined.
The driver is a Java program that creates a pool of threads, one per Java task. Each Java task consists of 3 steps:
- Execute a call to a C code binary.
- Do an SCP to download a binary file.
- Execute another call to a different C code binary (in the same C codebase).
The C code calls establish a socket and send JSON-encoded messages to the Python server script.
The Python script receives each JSON message, decodes it, runs some small SQL against a Postgres DB, then sends back a JSON-encoded response. One key point is that at the end of step #1 the C code prints the results returned from Python to STDOUT. The Java code parses that STDOUT output, at which point the SCP of step #2 is initiated.
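To illustrate the wire protocol described above (an 8-byte hex length header followed by the JSON body), here is a sketch of what the C client does, written in Python; the function names and the `recv_all` helper are mine, not from the real codebase:

```python
import json
import socket

def recv_all(s, n):
    # Keep reading until exactly n bytes have arrived;
    # recv() may legally return fewer bytes than requested.
    data = b""
    while len(data) < n:
        chunk = s.recv(n - len(data))
        if not chunk:
            raise ConnectionError("peer closed connection mid-message")
        data += chunk
    return data

def send_request(host, port, obj):
    # Mirror of the C client's framing: 8 hex digits for the length,
    # then the JSON-encoded body.
    body = json.dumps(obj).encode("UTF-8")
    header = format(len(body), "08x").encode("UTF-8")
    with socket.create_connection((host, port)) as s:
        s.sendall(header + body)
        # The response uses the same framing in reverse.
        resp_len = int(recv_all(s, 8).decode("UTF-8"), 16)
        return json.loads(recv_all(s, resp_len))
```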
The key snippet of the Python code is:
```python
import asyncio
import socket

addr = ('MY_HOST', PORT_NO)          # define the host and port number (the port must be an int, not a string)
loop = asyncio.get_event_loop()      # define the asyncio loop
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)  # define the socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(addr)                      # bind to addr
sock.listen(N)                       # listen for C code requests; N is the backlog of pending connections
```
```python
async def sserver():
    # Accept a connection; sock_accept returns the tuple (conn, addr).
    conn, addr = await loop.sock_accept(sock)
    sr = SRequests(pool, loop, log)
    # All the work is done in ahandle.
    loop.create_task(sr.ahandle(conn))
```
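For comparison, the same accept-and-dispatch pattern can be expressed with asyncio's high-level `start_server` API, which keeps accepting connections and spawns one handler task per client. This is only a minimal sketch: the handler echoes the payload back in place of the real JSON/SQL response, and the host/port are illustrative:

```python
import asyncio

async def handle_client(reader, writer):
    # Read the 8-byte hex length header, then exactly that many payload bytes.
    header = await reader.readexactly(8)
    msg_length = int(header.decode("UTF-8"), 16)
    payload = await reader.readexactly(msg_length)
    # Stand-in for the real work: echo the payload back with the same framing.
    writer.write(format(len(payload), "08x").encode("UTF-8") + payload)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    # start_server loops on accept internally and creates a task per connection.
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()
```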
The key method in the custom class SRequests is ahandle, a snippet of which is:
```python
async def ahandle(self, conn):
    self.conn = conn
    msg_len = await self.loop.sock_recv(conn, 8)
    if len(msg_len) == 8:
        mlen_decoded = msg_len.decode('UTF-8')
        msgLength = int(mlen_decoded, 16)
        while 1:
            self.msg = await self.loop.sock_recv(conn, msgLength)
            if len(self.msg) == msgLength:
                break
        # ... decode the JSON message and generate a response ...
        # ... handle 2 more messages from the C code ...
```
The reason we use raw sockets at all in the Python server script is that the C code always sends the length of the message first, then the contents of the message second. We cannot change the order or design of the C code, so we must receive and decode the "length" message first to know how many bytes to read for the second message in order to decode it correctly.
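One subtlety with this framing when using `loop.sock_recv` is that it, like `recv`, may return fewer bytes than requested. An exact-read helper for the length-then-body protocol might look like this (a sketch; the function names are mine):

```python
import asyncio

async def recv_exactly(loop, conn, nbytes):
    # sock_recv may return fewer bytes than requested, so accumulate
    # chunks until the full nbytes have arrived.
    chunks = []
    remaining = nbytes
    while remaining > 0:
        chunk = await loop.sock_recv(conn, remaining)
        if not chunk:  # peer closed the socket mid-message
            raise ConnectionError("socket closed before full message arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

async def read_framed_message(loop, conn):
    # First 8 bytes: hex-encoded length, as sent by the C client.
    header = await recv_exactly(loop, conn, 8)
    msg_length = int(header.decode("UTF-8"), 16)
    return await recv_exactly(loop, conn, msg_length)
```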
The __del__ destructor in the Response classes that contain SQL calls a commit method that commits the just-executed SQL in the DB.
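Relying on __del__ for the COMMIT ties transaction completion to garbage collection, which is non-deterministic. For illustration, a context-manager-based alternative that commits deterministically could look like this (a sketch only; the class and attribute names are mine, and sqlite3 stands in for the real Postgres connection):

```python
import sqlite3  # stand-in for the real psycopg2/Postgres connection object

class Response:
    """Illustrative response wrapper that commits deterministically."""
    def __init__(self, db_conn):
        self.db_conn = db_conn

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Commit on success, roll back on error -- no reliance on __del__.
        if exc_type is None:
            self.db_conn.commit()
        else:
            self.db_conn.rollback()
        return False

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (val TEXT)")
with Response(db):
    db.execute("INSERT INTO results VALUES ('ok')")
# The INSERT is committed as soon as the with-block exits.
```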
For N = 10, 20 and 100,000 I see 4, 6 and 6 downloaded files, respectively, and correspondingly 4, 6 and 6 COMMITs. In all 3 cases I see the expected 60 DB connections being made, from the initial pool of 60 Java threads.
If I add either a conn.shutdown() or a conn.close() to the final destructor, I get either a "Connection reset" error or a "Bad file descriptor" error thrown, and many hundreds more DB connections are made.
The total size of the list of tasks to be processed on the java side can vary from 30K to 3 million or so.
So I would expect the Java tasks to be responded to by the Python server in chunks of 60: the first 60 get processed, then another 60, and so on.
Why does the processing of requests stop after such a small number of requests has been processed?
More requests are being sent from the Java driver code, but tailing the log file of the Python server script shows that after the initial 60 DB connections are made, and 4/6/6 requests processed, no more requests seem to hit the Python server. The log file just stops.
The backend DB is configured to handle a max of 1024 concurrent connections, so we are well below that limit.
Is there a limit on epoll, or somewhere else in asyncio, that I'm hitting?
Any tips or suggestions would be greatly appreciated.