Asynchronous Pika (RabbitMQ) issues

I’ve been having quite some difficulty in getting asynchronous communications working using Python and Pika.

I believe that I’ve narrowed it down to…

self.channel.confirm_delivery()

… is blocking for some reason. Here’s my entire code, with some notes underneath. See the on_channel_open function for the self.channel.confirm_delivery() line of code.

import json
import logging
import pika
import time
from datetime import datetime
from src.io.io_handler import IOHandler
import threading

class RabbitMQIOHandler(IOHandler):

    def __init__(self, rabbitmq_host="localhost", inbox_name=None, message_callback=None):
        self.message_callback = message_callback
        self.inbox_name = inbox_name if inbox_name else "default_inbox"
        self.channel = None
        self.connection = None
        self.rabbitmq_host = rabbitmq_host

        # Message callback must not be None
        if not self.message_callback:
            raise ValueError(f"Message callback must not be None from inbox_name: {self.inbox_name}")
        else:
            print(f"RabbitMQIOHandler::message_callback OK for inbox_name: {self.inbox_name}", flush=True)

        # Start the I/O loop in a separate thread
        self.io_loop_thread = threading.Thread(target=self.start_io_loop, daemon=True)
        self.io_loop_thread.start()

        self.channel_ready = threading.Event()

    def start_io_loop(self):

        print(f"RabbitMQIOHandler::start_io_loop::pika.SelectConnection", flush=True)

        # Initialize the RabbitMQ connection
        self.connection = pika.SelectConnection(
            pika.ConnectionParameters(self.rabbitmq_host),
            on_open_callback=self.on_connection_open
        )

        print(f"RabbitMQIOHandler::start_io_loop::Starting I/O loop for inbox_name: {self.inbox_name}", flush=True)

        self.connection.ioloop.start()

    def on_connection_open(self, unused_connection):
        print(f"RabbitMQIOHandler::on_connection_open::Connection called for inbox_name: {self.inbox_name}", flush=True)
        
        # This creetes a channel and calls on_channel_open with the channel as an argument
        self.connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        # Called when the RabbitMQ channel is opened
        print(f"RabbitMQIOHandler::on_channel_open called for inbox_name: {self.inbox_name}", flush=True)
        self.channel = channel
        self.channel.queue_declare(queue=self.inbox_name, callback=self.on_queue_declared)
        print(f"got here", flush=True)
        self.channel.confirm_delivery()
        print(f"got here 2", flush=True)
        self.channel_ready.set()
        print(f"got here 3", flush=True)
        print(f"RabbitMQIOHandler::on_channel_open completed for inbox_name: {self.inbox_name}", flush=True)
        

    def on_queue_declared(self, method_frame):

        print(f"rabbit_mq_io_handler::on_queue_declared::Queue declared: {self.inbox_name}", flush=True)

        # Called when the RabbitMQ queue is declared
        self.channel.basic_consume(
            queue=self.inbox_name,
            on_message_callback=self.on_message,
            auto_ack=True
        )

    def on_message(self, ch, method, properties, body):
        
        print(f"rabbit_mq_io_handler::on_message::Received message: {body}")

        # Called when a message is received
        if self.message_callback:
            message = json.loads(body)
            self.message_callback(message)
        else:
            logging.warning("No message callback provided.")

    def send_output(self, content: str, sender_name: str, return_address: str, recipient_inbox: str, message_type: str = "message") -> None:
        message_data = {
            "content": content,
            "sender": sender_name,
            "return_address": return_address,
            "timestamp": time.time(),
            "type": message_type
        }

        print(f"Sending message to '{recipient_inbox}': {json.dumps(message_data)}")

        # Wait until the channel is ready
        self.channel_ready.wait()

        print(f"rabbit_mq_io_handler::send_output::Channel ready for inbox_name: {self.inbox_name}")

        # Publish the message to the recipient's inbox
        try:
            self.channel.basic_publish(exchange='',
                                    routing_key=recipient_inbox,
                                    body=json.dumps(message_data),
                                    mandatory=True)
        except pika.exceptions.UnroutableError:
            print("Message could not be confirmed")


    def graceful_shutdown(self):

        # logging.info("Waiting for all messages to be published before closing...")

        # Ensure all messages are published before closing
        # This might need additional logic to ensure the completion of message publishing
        # self.channel.wait_for_confirms()

        logging.info("Shutting down RabbitMQIOHandler")
        self.connection.close()

    def purge_inbox(self):
        print(f"Purging messages in '{self.inbox_name}'...")
        self.channel.queue_purge(queue=self.inbox_name)

:information_source: When I try to send a message, “got here” prints out, but “got here 2” does not.

        print(f"got here", flush=True)
        self.channel.confirm_delivery()
        print(f"got here 2", flush=True)  # this code is never reached!

:page_facing_up: Here’s the script that sends the message:

import time
from src.io.rabbit_mq_io_handler import RabbitMQIOHandler

def message_callback(message):
    # This function will process received messages
    # For now, it just prints the message
    print(f"Received message: {message}")

# Create an instance of RabbitMQIOHandler
rabbitmq_io_handler = RabbitMQIOHandler(
    rabbitmq_host="localhost",
    inbox_name="guestbook_service",  # Assuming this is the queue you want to use
    message_callback=message_callback
)

# Just in case, wait a moment for the connection to be established
time.sleep(3) 

# Send a message using the send_output method of RabbitMQIOHandler
rabbitmq_io_handler.send_output(
    content="Hello, world 1!",
    sender_name="sender1",
    return_address="reply_queue",
    recipient_inbox="guestbook_service"
)

# Wait for a bit to ensure the message is sent
time.sleep(3)

# Gracefully shutdown
rabbitmq_io_handler.graceful_shutdown()

And here’s the output:

C:\Code>python send2.py
RabbitMQIOHandler::message_callback OK for inbox_name: guestbook_service
RabbitMQIOHandler::start_io_loop::pika.SelectConnection
RabbitMQIOHandler::start_io_loop::Starting I/O loop for inbox_name: guestbook_service
RabbitMQIOHandler::on_connection_open::Connection called for inbox_name: guestbook_service
RabbitMQIOHandler::on_channel_open called for inbox_name: guestbook_service
got here
Sending message to ‘guestbook_service’: {“content”: “Hello, world 1!”, “sender”: “sender1”, “return_address”: “reply_queue”, “timestamp”: 1710462683.881744, “type”: “message”}

Although it says "Sending message to ‘guestbook_service’…

This line never unblocks, and the message is never sent.

self.channel_ready.wait()

(Yes, I have two different parts of the code that are blocking, and that’s possible because there’s threading involved.)

I’ve been struggling with this for 2 days and would love some help!

fyi …

you have the following import but it is never used.

from datetime import datetime
1 Like

Hi,

please reference the following code snippet from your code:

        self.channel.queue_declare(queue = self.inbox_name, callback = self.on_queue_declared)
        
        print(f"got here", flush = True)
        self.channel.confirm_delivery()
        print(f"got here 2", flush = True)
        
        self.channel_ready.set()
        print(f"got here 3", flush=True)
        print(f"RabbitMQIOHandler::on_channel_open completed for inbox_name: {self.inbox_name}", flush=True)
        
    def on_queue_declared(self, method_frame):

        print(f"rabbit_mq_io_handler::on_queue_declared::Queue declared: {self.inbox_name}", flush=True)

        # Called when the RabbitMQ queue is declared
        self.channel.basic_consume(
            queue = self.inbox_name,
            on_message_callback = self.on_message,
            auto_ack = True
        )

Note that the callback method is:

on_queue_declared(self, method_frame):

However, in the body of this method, the parameter method_frame is never used. Was this an oversight?

Hi !

Just checked Pika’s documentation, it seems that Channel.confirm_delivery() is supposed to take arguments:
https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.confirm_delivery

Maybe that’s the reason why the call is blocking ?

Oh, very possible! Thanks you! I already made significant updates to my code since I posted just a day or so ago, so it will be difficult for me to test. But I bet you’re correct!