Can I get a review on whether I am getting things done correctly? Or there’s a newer way to achieve the same thing, but I somehow missed?
I don’t see any code. Where is the code?
It appears to be behind the link. Perfectly understandable IMO if you’d prefer it to be reproduced here, though.
1 Like
The final iteration is this
from aiohttp import web
import discord
from telegram.ext import ApplicationBuilder
client = discord.Client(..)
async def discord_run(exit_event: asyncio.Event):
global client
logger.info("DISCORD: Starting")
asyncio.create_task(client.start(os.environ["DISCORD_TOKEN"]))
await exit_event.wait()
logger.info("DISCORD: Stopping")
await client.close()
async def telegram_run(exit_event: asyncio.Event) -> None:
application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()
# -- snipped handlers registration --
asyncio.create_task(
application.bot.set_webhook(
f'{os.environ["WEBHOOK_URL"]}/telegram',
allowed_updates=Update.ALL_TYPES,
secret_token=settings.SECRET_TOKEN,
)
)
async with application:
logger.info("TELEGRAM: Starting")
await application.start()
await exit_event.wait()
logger.info("TELEGRAM: Stopping")
await application.stop()
async def web_run(exit_event: asyncio.Event) -> None:
application = web.Application()
logger.info("WEBHOOK: Starting", url=os.environ["WEBHOOK_URL"])
web_runner = web.AppRunner(application)
await web_runner.setup()
web_site = web.TCPSite(web_runner, port=8080)
await web_site.start()
await exit_event.wait()
logger.info("WEBHOOK: Stopping")
await web_site.stop()
await web_runner.cleanup()
def exception_handler(
loop: asyncio.AbstractEventLoop, context: dict[str, Any], exit_event: asyncio.Event
) -> None:
message = context.get("exception", context["message"])
logger.error("Caught exception", message=message)
logger.error("MAIN: Shutting down")
asyncio.create_task(shutdown_handler(loop, exit_event))
async def shutdown_handler(
loop: asyncio.AbstractEventLoop, exit_event: asyncio.Event
) -> None:
logger.info("MAIN: Sending exit event")
exit_event.set()
await asyncio.sleep(5)
tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def main() -> None:
loop, exit_event = asyncio.get_event_loop(), asyncio.Event()
loop.set_exception_handler(partial(exception_handler, exit_event=exit_event))
for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
s,
lambda: asyncio.create_task(shutdown_handler(loop, exit_event)),
)
try:
loop.create_task(telegram_run(exit_event))
loop.create_task(discord_run(exit_event))
loop.create_task(web_run(exit_event, web_init(telegram.application)))
loop.run_forever()
finally:
loop.close()
if __name__ == "__main__":
main()
Just curious if this is the right way to run multiple long-running coroutines together. The blog post details how I reached this.
I would run each client in separate threads.
Here is a similar example to: discord.py · PyPI - Quickstart
import asyncio
import threading
import time
class DummyClient:
def __init__(self):
self.loop = asyncio.new_event_loop()
self.user = 'DummyUser'
async def on_ready(self):
print(f'We have logged in as {self.user}')
async def on_message(self, message):
if message.author == self.user:
return
if message.content.startswith('$hello'):
await self.send_message('Hello!')
async def send_message(self, content):
print(f'Message sent: {content}')
async def simulate_message(self, message):
await self.on_message(message)
async def print_periodically(self):
while True:
print('Print from DummyClient')
await asyncio.sleep(2.0) # Print every 2 seconds
def run(self):
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.on_ready())
# Simulate receiving messages
self.loop.create_task(self.simulate_message(DummyMessage('User1', '$hello')))
self.loop.create_task(self.simulate_message(DummyMessage('DummyUser', '$hello')))
# Start printing periodically
self.loop.create_task(self.print_periodically())
# Keep the loop running
self.loop.run_forever()
class DummyMessage:
def __init__(self, author, content):
self.author = author
self.content = content
def run_dummy_client():
client = DummyClient()
client.run()
# Create a thread to run the dummy client
client_thread = threading.Thread(target=run_dummy_client)
# Start the thread
client_thread.start()
# Now you can continue with other tasks in the main thread
print("Dummy async client is running in a separate thread.")
# This loop runs in the main thread
while True:
time.sleep(1.0)
print('Hi!')
Result:
Dummy async client is running in a separate thread.
We have logged in as DummyUser
Message sent: Hello!
Print from DummyClient
Hi!
Hi!
Print from DummyClient
Hi!
Hi!
...
1 Like
so instead of keeping everything in the same thread in the same event-loop, better idea is to spread them into 3 different threads in production projects (where each thread has their own event loop)?
That would be straightforward and maintainable.