Asyncio ssl error

Hi all,

I was hoping the community could help me with an error I am getting (I am a newbie, so any help would be much appreciated).

I have apscheduler running function “frequent_jobs” every 2 minutes, to do a few sync methods and use the async method below to send messages through telegram bot. Every now and then, I face an error (also below). Any idea why this might be happening?


async def send_one_message(semaphore: asyncio.Semaphore, message_id, session):
    if (message_id is not None):
        async with semaphore:
            try:
                message = (await session.exec(select(NotificationMessage).where(NotificationMessage.id == message_id))).one()
                await telegram_bot.send_job_reminder_message(message.telegram_chat_id , message.text)
                await session.delete(message)
                await asyncio.sleep(5) #throttle a bit to try and avoid telegram api issues
            except Exception as err:
                logger.error(f"Error: {err.__class__.__name__}, Message ID {message_id}")
                logger.error(err)

async def send_notifications():
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.orm.session import sessionmaker
    from sqlmodel.ext.asyncio.session import AsyncSession
    from sqlalchemy.pool import NullPool

    async_engine = create_async_engine(get_settings().db_url_async, poolclass=NullPool)
    async_session = sessionmaker(async_engine, expire_on_commit=True, class_=AsyncSession)
    semaphore = asyncio.Semaphore(20)
    logger.debug(f"start send_notifications")
    message_sending = []
    try:
        async with async_session() as session:
            for message in (await session.exec(select(NotificationMessage))).all():
                message_sending.append(asyncio.create_task(send_one_message(semaphore, message.id, session)))
            if len(message_sending) > 0:
                await asyncio.wait(message_sending)
                await session.commit()

    finally:
        # try:
        #     await telegram_bot.bot.close_session()  #close session after each batch. this causes lots of exceptions NoneType within telebot
        # finally:
        #     pass
        await async_engine.dispose()
    logger.debug(f"end send_notifications")


def frequent_jobs(test_mode: bool = False):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(send_notifications())
    finally:
        loop.close()
    
2023-03-26 20:59:31.867 | ERROR    | asyncio.base_events:default_exception_handler:1744 - Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
transport: <_SelectorSocketTransport closing fd=9>
Traceback (most recent call last):

  File "/usr/lib/python3.10/asyncio/selector_events.py", line 916, in write
    n = self._sock.send(data)
        │    │     │    └ b'\x17\x03\x03\x00\x13q\x99\x0f\xeb\x94}#\x9ct\xae\x07 (\xee6\xcc\xf0\xbb\xbf'
        │    │     └ <method 'send' of '_socket.socket' objects>
        │    └ <socket.socket fd=-1, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6>
        └ <_SelectorSocketTransport closing fd=9>

OSError: [Errno 9] Bad file descriptor


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlmodel/orm/session.py", line 60, in exec
    results = super().execute(
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1712, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             │    │           │          │             └ immutabledict({'prebuffer_rows': True, '_result_disable_adapt_to_context': True, 'future_result': True})
             │    │           │          └ None
             │    │           └ <sqlmodel.sql.expression.SelectOfScalar object at 0x7fe1e74478b0>
             │    └ <function Connection._execute_20 at 0x7fe1fcdd1a20>
             └ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           │    │     │             │               └ immutabledict({'prebuffer_rows': True, '_result_disable_adapt_to_context': True, 'future_result': True})
           │    │     │             └ immutabledict({})
           │    │     └ ({},)
           │    └ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
           └ <bound method ClauseElement._execute_on_connection of <sqlmodel.sql.expression.SelectOfScalar object at 0x7fe1e74478b0>>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
    return connection._execute_clauseelement(
           │          └ <function Connection._execute_clauseelement at 0x7fe1fcdd1870>
           └ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
          │    └ <function Connection._execute_context at 0x7fe1fcdd1b40>
          └ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1916, in _execute_context
    result = context._setup_result_proxy()
             │       └ <function DefaultExecutionContext._setup_result_proxy at 0x7fe1fc967e20>
             └ <sqlalchemy.dialects.postgresql.asyncpg.PGExecutionContext_asyncpg object at 0x7fe1e746ee00>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1477, in _setup_result_proxy
    result = _cursor.CursorResult(
             │       └ <class 'sqlalchemy.engine.cursor.CursorResult'>
             └ <module 'sqlalchemy.engine.cursor' from '/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/s...
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 1263, in __init__
    metadata = self._init_metadata(context, cursor_description)
               │    │              │        └ [('id', 23, None, None, None, None, None), ('created_at', 1114, None, None, None, None, None), ('cost_centre_id', 23, None, N...
               │    │              └ <sqlalchemy.dialects.postgresql.asyncpg.PGExecutionContext_asyncpg object at 0x7fe1e746ee00>
               │    └ <function BaseCursorResult._init_metadata at 0x7fe1fc93db40>
               └ <sqlalchemy.engine.cursor.CursorResult object at 0x7fe1e746ffa0>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 1292, in _init_metadata
    metadata = self._cursor_metadata(self, cursor_description)
               │    │                │     └ [('id', 23, None, None, None, None, None), ('created_at', 1114, None, None, None, None, None), ('cost_centre_id', 23, None, N...
               │    │                └ <sqlalchemy.engine.cursor.CursorResult object at 0x7fe1e746ffa0>
               │    └ <class 'sqlalchemy.engine.cursor.CursorResultMetaData'>
               └ <sqlalchemy.engine.cursor.CursorResult object at 0x7fe1e746ffa0>
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 261, in __init__
    [
  File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 262, in <listcomp>
    (obj_elem, metadata_entry)
     │         └ (18, 18, (Column('has_md_shift', Boolean(), table=<anon_1>, nullable=False), 'has_md_shift', 'has_md_shift', '%(1406082278072...
     └ 'has_md_shift'
  File "/usr/lib/python3.10/asyncio/sslproto.py", line 321, in __del__
    self.close()
    │    └ <function _SSLProtocolTransport.close at 0x7fe1ff0fb250>
    └ <asyncio.sslproto._SSLProtocolTransport object at 0x7fe1e740b640>
  File "/usr/lib/python3.10/asyncio/sslproto.py", line 316, in close
    self._ssl_protocol._start_shutdown()
    │    │             └ <function SSLProtocol._start_shutdown at 0x7fe1ff0fbeb0>
    │    └ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
    └ <asyncio.sslproto._SSLProtocolTransport object at 0x7fe1e740b640>
  File "/usr/lib/python3.10/asyncio/sslproto.py", line 599, in _start_shutdown
    self._write_appdata(b'')
    │    └ <function SSLProtocol._write_appdata at 0x7fe1ff0fbf40>
    └ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
  File "/usr/lib/python3.10/asyncio/sslproto.py", line 604, in _write_appdata
    self._process_write_backlog()
    │    └ <function SSLProtocol._process_write_backlog at 0x7fe1fe9101f0>
    └ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
> File "/usr/lib/python3.10/asyncio/sslproto.py", line 690, in _process_write_backlog
    self._transport.write(chunk)
    │    │          │     └ b'\x17\x03\x03\x00\x13q\x99\x0f\xeb\x94}#\x9ct\xae\x07 (\xee6\xcc\xf0\xbb\xbf'
    │    │          └ <function _SelectorSocketTransport.write at 0x7fe1fe9cc430>
    │    └ <_SelectorSocketTransport closing fd=9>
    └ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 922, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
    │    └ <function _SelectorTransport._fatal_error at 0x7fe1fe9a7be0>
    └ <_SelectorSocketTransport closing fd=9>
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 717, in _fatal_error
    self._force_close(exc)
    │    │            └ OSError(9, 'Bad file descriptor')
    │    └ <function _SelectorTransport._force_close at 0x7fe1fe9a7c70>
    └ <_SelectorSocketTransport closing fd=9>
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 729, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
    │    │               │    │                      └ OSError(9, 'Bad file descriptor')
    │    │               │    └ <function _SelectorSocketTransport._call_connection_lost at 0x7fe1fe9cc670>
    │    │               └ <_SelectorSocketTransport closing fd=9>
    │    └ <member '_loop' of '_FlowControlMixin' objects>
    └ <_SelectorSocketTransport closing fd=9>
  File "/usr/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
    self._check_closed()
    │    └ <function BaseEventLoop._check_closed at 0x7fe1fe976320>
    └ <_UnixSelectorEventLoop running=False closed=True debug=False>
  File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')

RuntimeError: Event loop is closed