How to handle Ctrl-C on asyncio server and client programs

Hi community,

  1. When I Ctrl-C on terminal to kill the server, I get random error as following.

I can still get different error, If I replace:

  asyncio.run(main())

with:

  loop = asyncio.get_event_loop()
  loop.run_until_complete(main())
  1. When I Ctrl-C on server terminal, it does not terminate the program, it just disconnects one client instead.

  2. When I Ctrl-C on client terminal, it does not terminate the program, sometimes. I need Ctrl-C again to terminate it.

Exception ignored in: <coroutine object handle_conn at 0x100f35040>
Traceback (most recent call last):
  File "/Users/ljh/Documents/asyncio/src/server/server.py", line 33, in handle_conn
    writer.close()
  File "/usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/streams.py", line 344, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/selector_events.py", line 839, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 761, in call_soon
    self._check_closed()
  File "/usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 519, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task cancelling name='Task-4102' coro=<handle_conn() done, defined at /Users/ljh/Documents/asyncio/src/server/server.py:10> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[gather.<locals>._done_callback() at /usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py:754]>
Task was destroyed but it is pending!
task: <Task pending name='Task-22526' coro=<StreamReader.read() done, defined at /usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/streams.py:648> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_release_waiter(<Future pendi...ask_wakeup()]>)() at /usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py:421]>
Task was destroyed but it is pending!
task: <Task pending name='Task-22527' coro=<StreamReader.read() done, defined at /usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/streams.py:648> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_release_waiter(<Future pendi...ask_wakeup()]>)() at /usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py:421]>
$ 
# python3 server.py <port>

import asyncio
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__))))
from logging2.logging2 import *

async def handle_conn(reader, writer):
    seq = 0
    addr = writer.get_extra_info('peername')
    INFO(f'accepting connection from {addr}')

    while True:
        seq += 1

        try:
            writer.write(f'hello client {seq}\n'.encode())
            await writer.drain()

            data = await asyncio.wait_for(reader.read(100), timeout=0.1)
            if data:
                print(f'{data.decode()}', end='')

        except asyncio.TimeoutError as e:
            INFO(f'{e}')
            continue
        except BaseException as e:
            INFO(f'{e}')
            break

    writer.close()

async def main():
    port = sys.argv[1]
    server = await asyncio.start_server(handle_conn, '', port)

    addrs = ', '.join(str(sock.getsockname())
                      for sock in server.sockets)
    INFO(f'serving on {addrs}')

    async with server:
        await server.serve_forever()

try:
    logging2_init()
    asyncio.run(main())
except BaseException as e:
    INFO(f'{e}')


# python3 client.py <ip> <port> <client_tag>

import asyncio
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__))))
from logging2.logging2 import *

async def handle_conn(reader, writer, message):
    seq = 0

    while True:
        seq += 1

        try:
            writer.write(f'hello from {message} {seq}\n'.encode())
            await writer.drain()

            data = await asyncio.wait_for(reader.read(100), timeout=0.1)
            if data:
                print(f'{data.decode()}', end='')

        except asyncio.TimeoutError as e:
            INFO(f'{e}')
            continue
        except BaseException as e:
            INFO(f'{e}')
            break

    writer.close()

async def main():
    ip = sys.argv[1]
    port = sys.argv[2]
    tag = sys.argv[3]
    INFO(f'connecting to {ip}:{port}')

    reader, writer = await asyncio.open_connection(ip, port)
    await handle_conn(reader, writer, tag)

try:
    logging2_init()
    asyncio.run(main())
except BaseException as e:
    INFO(f'{e}')

thanks to chatGPT, I now get this

# python3 server.py <port>

import asyncio
import signal
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__))))
from logging2.logging2 import *

async def handle_conn(reader, writer):
    seq = 0
    addr = writer.get_extra_info('peername')
    INFO(f'accepting connection from {addr}')

    while True:
        seq += 1

        try:
            writer.write(f'hello client {seq}\n'.encode())
            await writer.drain()

            data = await asyncio.wait_for(reader.read(100), timeout=0.1)
            if data:
                print(f'{data.decode()}', end='')

        except asyncio.TimeoutError as e:
            # INFO(f'{e}')
            continue
        except BaseException as e:
            INFO(f'{e}')
            break

    writer.close()

async def main():
    port = sys.argv[1]
    server = await asyncio.start_server(handle_conn, '', port)

    addrs = ', '.join(str(sock.getsockname())
                      for sock in server.sockets)
    INFO(f'serving on {addrs}')

    async with server:
        await server.serve_forever()

try:
    logging2_init()
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(main())
    shielded_task = asyncio.shield(task)
    loop.add_signal_handler(signal.SIGINT, lambda: shielded_task.cancel())
    loop.add_signal_handler(signal.SIGTERM, lambda: shielded_task.cancel())
    loop.run_until_complete(shielded_task)
except BaseException as e:
    INFO(f'{e}')


# python3 client.py <ip> <port> <client_tag>

import asyncio
import signal
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__))))
from logging2.logging2 import *

async def handle_conn(reader, writer, tag):
    seq = 0

    while True:
        seq += 1

        try:
            writer.write(f'hello from {tag} {seq}\n'.encode())
            await writer.drain()

            data = await asyncio.wait_for(reader.read(100), timeout=0.1)
            if data:
                print(f'{data.decode()}', end='')

        except asyncio.TimeoutError as e:
            # INFO(f'{e}')
            continue
        except BaseException as e:
            INFO(f'{e}')
            break

    writer.close()

async def main():
    ip = sys.argv[1]
    port = sys.argv[2]
    tag = sys.argv[3]
    INFO(f'connecting to {ip}:{port}')

    reader, writer = await asyncio.open_connection(ip, port)
    await handle_conn(reader, writer, tag)

try:
    logging2_init()
    # asyncio.run(main())
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(main())
    shielded_task = asyncio.shield(task)
    loop.add_signal_handler(signal.SIGINT, lambda: shielded_task.cancel())
    loop.add_signal_handler(signal.SIGTERM, lambda: shielded_task.cancel())
    loop.run_until_complete(shielded_task)
except BaseException as e:
    INFO(f'{e}')