Hi all,
I was hoping the community could help me with an error I am getting (I am a newbie, so any help would be much appreciated).
I have apscheduler running function “frequent_jobs” every 2 minutes, to do a few sync methods and use the async method below to send messages through telegram bot. Every now and then, I face an error (also below). Any idea why this might be happening?
async def send_one_message(semaphore: asyncio.Semaphore, message_id, session):
if (message_id is not None):
async with semaphore:
try:
message = (await session.exec(select(NotificationMessage).where(NotificationMessage.id == message_id))).one()
await telegram_bot.send_job_reminder_message(message.telegram_chat_id , message.text)
await session.delete(message)
await asyncio.sleep(5) #throttle a bit to try and avoid telegram api issues
except Exception as err:
logger.error(f"Error: {err.__class__.__name__}, Message ID {message_id}")
logger.error(err)
async def send_notifications():
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm.session import sessionmaker
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.pool import NullPool
async_engine = create_async_engine(get_settings().db_url_async, poolclass=NullPool)
async_session = sessionmaker(async_engine, expire_on_commit=True, class_=AsyncSession)
semaphore = asyncio.Semaphore(20)
logger.debug(f"start send_notifications")
message_sending = []
try:
async with async_session() as session:
for message in (await session.exec(select(NotificationMessage))).all():
message_sending.append(asyncio.create_task(send_one_message(semaphore, message.id, session)))
if len(message_sending) > 0:
await asyncio.wait(message_sending)
await session.commit()
finally:
# try:
# await telegram_bot.bot.close_session() #close session after each batch. this causes lots of exceptions NoneType within telebot
# finally:
# pass
await async_engine.dispose()
logger.debug(f"end send_notifications")
def frequent_jobs(test_mode: bool = False):
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(send_notifications())
finally:
loop.close()
2023-03-26 20:59:31.867 | ERROR | asyncio.base_events:default_exception_handler:1744 - Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
transport: <_SelectorSocketTransport closing fd=9>
Traceback (most recent call last):
File "/usr/lib/python3.10/asyncio/selector_events.py", line 916, in write
n = self._sock.send(data)
│ │ │ └ b'\x17\x03\x03\x00\x13q\x99\x0f\xeb\x94}#\x9ct\xae\x07 (\xee6\xcc\xf0\xbb\xbf'
│ │ └ <method 'send' of '_socket.socket' objects>
│ └ <socket.socket fd=-1, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6>
└ <_SelectorSocketTransport closing fd=9>
OSError: [Errno 9] Bad file descriptor
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlmodel/orm/session.py", line 60, in exec
results = super().execute(
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1712, in execute
result = conn._execute_20(statement, params or {}, execution_options)
│ │ │ │ └ immutabledict({'prebuffer_rows': True, '_result_disable_adapt_to_context': True, 'future_result': True})
│ │ │ └ None
│ │ └ <sqlmodel.sql.expression.SelectOfScalar object at 0x7fe1e74478b0>
│ └ <function Connection._execute_20 at 0x7fe1fcdd1a20>
└ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
│ │ │ │ └ immutabledict({'prebuffer_rows': True, '_result_disable_adapt_to_context': True, 'future_result': True})
│ │ │ └ immutabledict({})
│ │ └ ({},)
│ └ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
└ <bound method ClauseElement._execute_on_connection of <sqlmodel.sql.expression.SelectOfScalar object at 0x7fe1e74478b0>>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
return connection._execute_clauseelement(
│ └ <function Connection._execute_clauseelement at 0x7fe1fcdd1870>
└ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
│ └ <function Connection._execute_context at 0x7fe1fcdd1b40>
└ <sqlalchemy.future.engine.Connection object at 0x7fe1e7447b50>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1916, in _execute_context
result = context._setup_result_proxy()
│ └ <function DefaultExecutionContext._setup_result_proxy at 0x7fe1fc967e20>
└ <sqlalchemy.dialects.postgresql.asyncpg.PGExecutionContext_asyncpg object at 0x7fe1e746ee00>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 1477, in _setup_result_proxy
result = _cursor.CursorResult(
│ └ <class 'sqlalchemy.engine.cursor.CursorResult'>
└ <module 'sqlalchemy.engine.cursor' from '/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/s...
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 1263, in __init__
metadata = self._init_metadata(context, cursor_description)
│ │ │ └ [('id', 23, None, None, None, None, None), ('created_at', 1114, None, None, None, None, None), ('cost_centre_id', 23, None, N...
│ │ └ <sqlalchemy.dialects.postgresql.asyncpg.PGExecutionContext_asyncpg object at 0x7fe1e746ee00>
│ └ <function BaseCursorResult._init_metadata at 0x7fe1fc93db40>
└ <sqlalchemy.engine.cursor.CursorResult object at 0x7fe1e746ffa0>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 1292, in _init_metadata
metadata = self._cursor_metadata(self, cursor_description)
│ │ │ └ [('id', 23, None, None, None, None, None), ('created_at', 1114, None, None, None, None, None), ('cost_centre_id', 23, None, N...
│ │ └ <sqlalchemy.engine.cursor.CursorResult object at 0x7fe1e746ffa0>
│ └ <class 'sqlalchemy.engine.cursor.CursorResultMetaData'>
└ <sqlalchemy.engine.cursor.CursorResult object at 0x7fe1e746ffa0>
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 261, in __init__
[
File "/home/deploy/.cache/pypoetry/virtualenvs/jobtracker-NdGBZN7D-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 262, in <listcomp>
(obj_elem, metadata_entry)
│ └ (18, 18, (Column('has_md_shift', Boolean(), table=<anon_1>, nullable=False), 'has_md_shift', 'has_md_shift', '%(1406082278072...
└ 'has_md_shift'
File "/usr/lib/python3.10/asyncio/sslproto.py", line 321, in __del__
self.close()
│ └ <function _SSLProtocolTransport.close at 0x7fe1ff0fb250>
└ <asyncio.sslproto._SSLProtocolTransport object at 0x7fe1e740b640>
File "/usr/lib/python3.10/asyncio/sslproto.py", line 316, in close
self._ssl_protocol._start_shutdown()
│ │ └ <function SSLProtocol._start_shutdown at 0x7fe1ff0fbeb0>
│ └ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
└ <asyncio.sslproto._SSLProtocolTransport object at 0x7fe1e740b640>
File "/usr/lib/python3.10/asyncio/sslproto.py", line 599, in _start_shutdown
self._write_appdata(b'')
│ └ <function SSLProtocol._write_appdata at 0x7fe1ff0fbf40>
└ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
File "/usr/lib/python3.10/asyncio/sslproto.py", line 604, in _write_appdata
self._process_write_backlog()
│ └ <function SSLProtocol._process_write_backlog at 0x7fe1fe9101f0>
└ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
> File "/usr/lib/python3.10/asyncio/sslproto.py", line 690, in _process_write_backlog
self._transport.write(chunk)
│ │ │ └ b'\x17\x03\x03\x00\x13q\x99\x0f\xeb\x94}#\x9ct\xae\x07 (\xee6\xcc\xf0\xbb\xbf'
│ │ └ <function _SelectorSocketTransport.write at 0x7fe1fe9cc430>
│ └ <_SelectorSocketTransport closing fd=9>
└ <asyncio.sslproto.SSLProtocol object at 0x7fe1e73b7eb0>
File "/usr/lib/python3.10/asyncio/selector_events.py", line 922, in write
self._fatal_error(exc, 'Fatal write error on socket transport')
│ └ <function _SelectorTransport._fatal_error at 0x7fe1fe9a7be0>
└ <_SelectorSocketTransport closing fd=9>
File "/usr/lib/python3.10/asyncio/selector_events.py", line 717, in _fatal_error
self._force_close(exc)
│ │ └ OSError(9, 'Bad file descriptor')
│ └ <function _SelectorTransport._force_close at 0x7fe1fe9a7c70>
└ <_SelectorSocketTransport closing fd=9>
File "/usr/lib/python3.10/asyncio/selector_events.py", line 729, in _force_close
self._loop.call_soon(self._call_connection_lost, exc)
│ │ │ │ └ OSError(9, 'Bad file descriptor')
│ │ │ └ <function _SelectorSocketTransport._call_connection_lost at 0x7fe1fe9cc670>
│ │ └ <_SelectorSocketTransport closing fd=9>
│ └ <member '_loop' of '_FlowControlMixin' objects>
└ <_SelectorSocketTransport closing fd=9>
File "/usr/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
self._check_closed()
│ └ <function BaseEventLoop._check_closed at 0x7fe1fe976320>
└ <_UnixSelectorEventLoop running=False closed=True debug=False>
File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed