Running multiple asyncio coroutines together

Can I get a review on whether I am getting things done correctly? Or there’s a newer way to achieve the same thing, but I somehow missed?

I don’t see any code. Where is the code?

It appears to be behind the link. Perfectly understandable IMO if you’d prefer it to be reproduced here, though.

1 Like

The final iteration is this

from aiohttp import web
import discord
from telegram.ext import ApplicationBuilder

client = discord.Client(..)
async def discord_run(exit_event: asyncio.Event):
    global client
 
    logger.info("DISCORD: Starting")
    asyncio.create_task(client.start(os.environ["DISCORD_TOKEN"]))
 
    await exit_event.wait()
 
    logger.info("DISCORD: Stopping")
    await client.close()
 
async def telegram_run(exit_event: asyncio.Event) -> None:
    application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()
 
    # -- snipped handlers registration --
 
    asyncio.create_task(
        application.bot.set_webhook(
            f'{os.environ["WEBHOOK_URL"]}/telegram',
            allowed_updates=Update.ALL_TYPES,
            secret_token=settings.SECRET_TOKEN,
        )
    )
 
    async with application:
        logger.info("TELEGRAM: Starting")
        await application.start()
 
        await exit_event.wait()
 
        logger.info("TELEGRAM: Stopping")
        await application.stop()
 
async def web_run(exit_event: asyncio.Event) -> None:
    application = web.Application()
 
    logger.info("WEBHOOK: Starting", url=os.environ["WEBHOOK_URL"])
    web_runner = web.AppRunner(application)
    await web_runner.setup()
 
    web_site = web.TCPSite(web_runner, port=8080)
    await web_site.start()
 
    await exit_event.wait()
 
    logger.info("WEBHOOK: Stopping")
    await web_site.stop()
    await web_runner.cleanup()
 
def exception_handler(
    loop: asyncio.AbstractEventLoop, context: dict[str, Any], exit_event: asyncio.Event
) -> None:
    message = context.get("exception", context["message"])
    logger.error("Caught exception", message=message)
 
    logger.error("MAIN: Shutting down")
    asyncio.create_task(shutdown_handler(loop, exit_event))
 
 
async def shutdown_handler(
    loop: asyncio.AbstractEventLoop, exit_event: asyncio.Event
) -> None:
    logger.info("MAIN: Sending exit event")
    exit_event.set()
 
    await asyncio.sleep(5)
 
    tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]
 
    for task in tasks:
        task.cancel()
 
    await asyncio.gather(*tasks, return_exceptions=True)
 
    loop.stop()
 
def main() -> None:
    loop, exit_event = asyncio.get_event_loop(), asyncio.Event()
 
    loop.set_exception_handler(partial(exception_handler, exit_event=exit_event))
 
    for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            s,
            lambda: asyncio.create_task(shutdown_handler(loop, exit_event)),
        )
 
    try:
        loop.create_task(telegram_run(exit_event))
        loop.create_task(discord_run(exit_event))
        loop.create_task(web_run(exit_event, web_init(telegram.application)))
        loop.run_forever()
    finally:
        loop.close()
 
 
if __name__ == "__main__":
    main()

Just curious if this is the right way to run multiple long-running coroutines together. The blog post details how I reached this.

I would run each client in separate threads.
Here is a similar example to: discord.py · PyPI - Quickstart

import asyncio
import threading
import time

class DummyClient:
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.user = 'DummyUser'
    
    async def on_ready(self):
        print(f'We have logged in as {self.user}')
    
    async def on_message(self, message):
        if message.author == self.user:
            return
        
        if message.content.startswith('$hello'):
            await self.send_message('Hello!')
    
    async def send_message(self, content):
        print(f'Message sent: {content}')
    
    async def simulate_message(self, message):
        await self.on_message(message)
    
    async def print_periodically(self):
        while True:
            print('Print from DummyClient')
            await asyncio.sleep(2.0)  # Print every 2 seconds
    
    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.on_ready())
        
        # Simulate receiving messages
        self.loop.create_task(self.simulate_message(DummyMessage('User1', '$hello')))
        self.loop.create_task(self.simulate_message(DummyMessage('DummyUser', '$hello')))
        
        # Start printing periodically
        self.loop.create_task(self.print_periodically())
        
        # Keep the loop running
        self.loop.run_forever()

class DummyMessage:
    def __init__(self, author, content):
        self.author = author
        self.content = content

def run_dummy_client():
    client = DummyClient()
    client.run()

# Create a thread to run the dummy client
client_thread = threading.Thread(target=run_dummy_client)

# Start the thread
client_thread.start()

# Now you can continue with other tasks in the main thread
print("Dummy async client is running in a separate thread.")

# This loop runs in the main thread
while True:
    time.sleep(1.0)
    print('Hi!')

Result:

Dummy async client is running in a separate thread.
We have logged in as DummyUser
Message sent: Hello!
Print from DummyClient
Hi!
Hi!
Print from DummyClient
Hi!
Hi!
...
1 Like

so instead of keeping everything in the same thread in the same event-loop, better idea is to spread them into 3 different threads in production projects (where each thread has their own event loop)?

That would be straightforward and maintainable.