Asyncio stops responding to requests

Hi all,

I have a server script that has an asyncio event loop, with two nested tasks defined, called using run_forever.

The driver script is a Java code that generates a pool of threads, one per java task. Each task in java is composed of 3 steps:

  1. Execute a call to a C code binary.
  2. Do an SCP to download a binary file.
  3. Execute another call to a different C code binary (in the same C codebase).

The C code calls establish a socket to send JSON-encoded messages on to a server Python script.
The Python script receives the JSON message, decodes it, does some small SQL against a Postgres DB, then sends back a JSON-encoded response. One key point is that the C code, at the end of step #1 then does a print to STDOUT of the results returned from Python. The data printed to STDOUT is then parsed by the Java code, at which point the SCP is initiated.

The key snippet of the Python code is:

import asyncio
import socket

addr = (‘MY_HOST’, ‘PORT_NO’) –define the localhost and port no.

loop = asyncio.get_event_loop() –define asyncio loop

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) –define socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False)
sock.bind(addr) –bind to addr
sock.listen(N) –enable listening at addr for C code request, start refusing connections at N

async def sserver():
while True:
conn, addr = await loop.sock_accept(sock) *–accept connections, reply socket is *

  •                                                                                          tuple (conn, addr)*
            sr = SRequests(pool, loop, log)
            loop.create_task(sr.ahandle(conn))                       *--all the work is done in ahandle*
    

loop.create_task(sserver())

try:
loop.run_forever()
except KeyboardInterrupt:
sserver.close()
loop.run_until_complete(sserver.wait_closed())
loop.close()
sock.shutdown(socket.SHUT_RDWR)
sock.close()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
sock.shutdown(socket.SHUT_RDWR)
sock.close()

The key method in the custom class SRequest is ahandle, a snippet of which is:

async def ahandle(self, conn):
self.conn = conn
while True:
while 1:
msg_len = await self.loop.sock_recv(conn,8)
if len(lsg_len) == 8:
break:

          mlen_decoded = msg_len.decode('UTF-8')
         msgLength = int(mlen_decoded, 16)

        while 1:
            self.msg = await self.loop.sock_recv(conn, msgLength)
            if len(self.msg) == msgLength:
                 break

       ... decode JSON message and generate a response....

       ...handle 2 more messages from the C code

The reason we have calls to socket at all in the Python server script is because the C code always sends the length of the message first, then the contents of the message second. We cannot change the order, or design of the C code, so we need to receive and decode the “length” message first, in order to get how much to read for the second message in order to decode it correctly.

The destructor del in the Response classes that contain SQL call a commit method that commits the SQL just executed in the DB.

For N = 10, 20, 100,000

I see 4, 6 and 6 downloaded files. In all 3 cases I see the expected 60 DB connections being made, from the initial pool of 60 java threads. I see 4, 6 & 6 COMMIT, respectively, for the 3 cases of N tested.

If I add either a conn.shutdown or conn.close to the final destructor, I get either a Connection reset error, or a Bad File descriptor error thrown, and many hundreds more DB connections made.

The total size of the list of tasks to be processed on the java side can vary from 30K to 3 million or so.
So I would expect the java tasks to be responed to by the Python server in chunks of 60. The first 60 get processed, then another 60, and so on.

Question:

Why does the processing of requests stop after such a small no. of request have been processed ?
More requests are being sent from the Java driver code. Tailing the log file of the Python server script shows that after the initial 60 DB connections are made, and 4/6/6 processed, no more requests seem to be hitting the Python server. The log file just stops.

The backend DB is configured to handle a max of 1024 concurrent connections, so we are well below that limit.

Is there a limit on epoll, or somewhere else in asyncio that I’m hitting ?

Any tips or suggestions would be greatly appreciated.

Thanks!

–Ed

I found the problem. I needed to add two items:

  1. Add callbacks to handle errors thrown during execution of a Task
  2. Add return statements at the successful end of a Task

Without either item, the error handler is only called when the final reference to the Task has gone out of scope. See https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/ for more details.

In case anyone else has a similar issue, the relevant code snippet is:

import asyncio
import socket
...
    sums_loop = asyncio.get_event_loop()
    sums_loop.set_debug(True)

    sums_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    sums_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
    sums_sock.setblocking(False)
    sums_sock.bind(addr)
    sums_sock.listen(1024)

    def _handle_ahandle_throws(task: asyncio.Task):
        try:
            task.result()
        except asyncio.CancelledError:
            pass
        except Exception:
            logging.exception('Exception raised in ahandle... by task = %r', task)

    async def sum_server():
        while True:
            conn, client_addr = await sums_loop.sock_accept(sums_sock)
            sr = SUMS_Requests(sums_pool, sums_loop, log)
            sums_task = sums_loop.create_task(sr.ahandle(conn))
            sums_task.add_done_callback(_handle_ahandle_throws)

    sums_loop.create_task(sum_server())

    try:
        sums_loop.run_forever()
    except KeyboardInterrupt:
        log.writeInfo(['Received SIGINT from stop script.... shutting down now    pid = ' + str(pid)])
        sum_server.close()
        sums_loop.run_until_complete(sum_server.wait_closed())
        sums_loop.close()
        sums_sock.shutdown(socket.SHUT_RDWR)
        sums_sock.close()
    finally:
        sums_loop.run_until_complete(sums_loop.shutdown_asyncgens())
        sums_loop.close()
        sums_sock.shutdown(socket.SHUT_RDWR)
        sums_sock.close()

....

class SUMS_Requests(asyncio.Protocol, SUMSRequest):
    def __init__(self, pool, loop, log):
        self.pool = pool
        self.loop = loop
        self.log = log
        self.log.writeInfo(['Inside SUMS_Requests constructor...'])
        self.setup()
        return None

    def _handle_db_task_throws(task: asyncio.Task):
        try:
            task.result()
        except asyncio.CancelledError:
            pass
        except Exception:
            logging.exception('Exception raised by db_task = %r', task)

    async def getDBConn(self):
....

        dbConn = DBConnection(arguments.getArg('dbhost'), arguments.getArg('dbport'), arguments.getArg('database'), arguments.getArg('dbuser'), self.log, self.loop)
....
        return dbConn

Note use of add_done_callback to define the callback handler for the Task, and return statements at the end of the coroutine defined for the Task when create_task is called.

–Ed