This is my first time working with asyncio and I can’t understand why this task is blocking. Everything is awaited that should be awaited. Please help? Using Python 3.10.6 on Ubuntu 22.04.2 LTS.
#!/usr/bin/env python
import ssl
import asyncio
from aiomysql import create_pool, DictCursor
from config import conf
user = conf['MySQL SecureCoop user']
password = conf['MySQL SecureCoop password']
host = conf['MySQL host']
# TODO Move into config
db = 'securecoop'
cafile = '/etc/nagios/ssl/cacert.pem'
async def connect():
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx.check_hostname = False
ssl_ctx.load_verify_locations(cafile=cafile)
# TODO Move into config
init_command = """\
SET autocommit = 1;
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
"""
# TODO Move into config
autocommit = True
loop = asyncio.get_event_loop()
# TODO Move into config
maxsize = 100
pool = await create_pool(user=user,
db=db,
host=host,
password=password,
loop=loop,
ssl=ssl_ctx,
init_command=init_command,
maxsize=maxsize,
autocommit=autocommit)
return pool
async def query(sql, params: tuple = tuple()):
pool = await connect()
async with pool.acquire() as conn:
cur = await conn.cursor(DictCursor)
await cur.execute(sql, params)
return cur
async def outgoing_worker():
while True:
sql = 'SELECT * FROM `websocket_outgoing_queue`'
results = await query(sql)
print(repr(results))
for row in await results.fetchall():
serial = row.get('serial', None)
sql = '''DELETE FROM `websocket_outgoing_queue`
WHERE `serial` = %s'''
params = (serial,)
await query(sql, params)
values = row.get('values', None)
print(f'ws.send(json.dumps({repr(values)}))')
print('About to await asyncio.sleep(1)')
await asyncio.sleep(1)
async def asyncio_create_queue_tasks():
outgoing_worker_task = asyncio.create_task(outgoing_worker())
await outgoing_worker_task
print('Should reach here inside asyncio_create_queue_tasks()')
loop = asyncio.get_event_loop()
print('About to asyncio_create_queue_tasks()')
asyncio.run(asyncio_create_queue_tasks())
print('Should reach here inside the main routine')
loop.run_forever()
Output:
(SecureCoop) root@api02:/SecureCoop# ./test_asyncio_db_async.py
/SecureCoop/./test_asyncio_db_async.py:81: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
About to asyncio_create_queue_tasks()
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88e50>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88f10>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd8a500>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd8b220>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88fa0>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd89f30>
About to await asyncio.sleep(1)
...