Asynchronous Pika (RabbitMQ) issues

I’ve been having a lot of difficulty getting asynchronous communication working with Python and Pika.

I believe that I’ve narrowed it down to…

… is blocking for some reason. Here’s my entire code, with some notes underneath. See the on_channel_open function for the line of code.

import json
import logging
import pika
import time
from datetime import datetime
from import IOHandler
import threading

class RabbitMQIOHandler(IOHandler):

    def __init__(self, rabbitmq_host="localhost", inbox_name=None, message_callback=None):
        self.message_callback = message_callback
        self.inbox_name = inbox_name if inbox_name else "default_inbox"
        self.channel = None
        self.connection = None
        self.rabbitmq_host = rabbitmq_host

        # Message callback must not be None
        if not self.message_callback:
            raise ValueError(f"Message callback must not be None from inbox_name: {self.inbox_name}")
        else:
            print(f"RabbitMQIOHandler::message_callback OK for inbox_name: {self.inbox_name}", flush=True)

        # Start the I/O loop in a separate thread
        self.io_loop_thread = threading.Thread(target=self.start_io_loop, daemon=True)

        self.channel_ready = threading.Event()
        self.io_loop_thread.start()

    def start_io_loop(self):

        print(f"RabbitMQIOHandler::start_io_loop::pika.SelectConnection", flush=True)

        # Initialize the RabbitMQ connection
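        self.connection = pika.SelectConnection(
            pika.ConnectionParameters(host=self.rabbitmq_host),
            on_open_callback=self.on_connection_open,
        )

        print(f"RabbitMQIOHandler::start_io_loop::Starting I/O loop for inbox_name: {self.inbox_name}", flush=True)
        self.connection.ioloop.start()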


    def on_connection_open(self, unused_connection):
        print(f"RabbitMQIOHandler::on_connection_open::Connection called for inbox_name: {self.inbox_name}", flush=True)
        # This creates a channel and calls on_channel_open with the channel as an argument

    def on_channel_open(self, channel):
        # Called when the RabbitMQ channel is opened
        print(f"RabbitMQIOHandler::on_channel_open called for inbox_name: {self.inbox_name}", flush=True)
        self.channel = channel
        self.channel.queue_declare(queue=self.inbox_name, callback=self.on_queue_declared)
        print(f"got here", flush=True)
        print(f"got here 2", flush=True)
        print(f"got here 3", flush=True)
        print(f"RabbitMQIOHandler::on_channel_open completed for inbox_name: {self.inbox_name}", flush=True)

    def on_queue_declared(self, method_frame):

        print(f"rabbit_mq_io_handler::on_queue_declared::Queue declared: {self.inbox_name}", flush=True)

        # Called when the RabbitMQ queue is declared

    def on_message(self, ch, method, properties, body):
        print(f"rabbit_mq_io_handler::on_message::Received message: {body}")

        # Called when a message is received
        if self.message_callback:
            message = json.loads(body)
            self.message_callback(message)
        else:
            logging.warning("No message callback provided.")

    def send_output(self, content: str, sender_name: str, return_address: str, recipient_inbox: str, message_type: str = "message") -> None:
        message_data = {
            "content": content,
            "sender": sender_name,
            "return_address": return_address,
            "timestamp": time.time(),
            "type": message_type
        }

        print(f"Sending message to '{recipient_inbox}': {json.dumps(message_data)}")

        # Wait until the channel is ready

        print(f"rabbit_mq_io_handler::send_output::Channel ready for inbox_name: {self.inbox_name}")

        # Publish the message to the recipient's inbox
        except pika.exceptions.UnroutableError:
            print("Message could not be confirmed")

    def graceful_shutdown(self):

        #"Waiting for all messages to be published before closing...")

        # Ensure all messages are published before closing
        # This might need additional logic to ensure the completion of message publishing
        #"Shutting down RabbitMQIOHandler")

    def purge_inbox(self):
        print(f"Purging messages in '{self.inbox_name}'...")

When I try to send a message, “got here” prints out, but “got here 2” does not.

        print(f"got here", flush=True)
        print(f"got here 2", flush=True)  # this code is never reached!

Here’s the script that sends the message:

import time
from import RabbitMQIOHandler

def message_callback(message):
    # This function will process received messages
    # For now, it just prints the message
    print(f"Received message: {message}")

# Create an instance of RabbitMQIOHandler
rabbitmq_io_handler = RabbitMQIOHandler(
    inbox_name="guestbook_service",  # Assuming this is the queue you want to use
    message_callback=message_callback,
)

# Just in case, wait a moment for the connection to be established

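# Send a message using the send_output method of RabbitMQIOHandler
rabbitmq_io_handler.send_output(
    content="Hello, world 1!",
    sender_name="sender1",
    return_address="reply_queue",
    recipient_inbox="guestbook_service",
)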

# Wait for a bit to ensure the message is sent

# Gracefully shutdown

And here’s the output:

RabbitMQIOHandler::message_callback OK for inbox_name: guestbook_service
RabbitMQIOHandler::start_io_loop::Starting I/O loop for inbox_name: guestbook_service
RabbitMQIOHandler::on_connection_open::Connection called for inbox_name: guestbook_service
RabbitMQIOHandler::on_channel_open called for inbox_name: guestbook_service
got here
Sending message to 'guestbook_service': {"content": "Hello, world 1!", "sender": "sender1", "return_address": "reply_queue", "timestamp": 1710462683.881744, "type": "message"}

Although it says "Sending message to ‘guestbook_service’…

This line never unblocks, and the message is never sent.
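
One possible explanation, offered as a hypothesis rather than a diagnosis: if a callback running on the I/O thread raises, that thread ends, and any threading.Event it was supposed to set stays unset, so a bare wait() on another thread never returns. The sketch below uses only the standard library. The names io_loop, on_channel_open and errors are stand-ins, and the TypeError is simulated rather than taken from a real run.

```python
import threading

channel_ready = threading.Event()
errors = []  # exceptions captured from the worker thread


def on_channel_open():
    """Stand-in callback; the failure here is simulated."""
    raise TypeError("simulated: missing required argument")


def io_loop():
    """Stand-in for the thread that runs the I/O loop."""
    try:
        on_channel_open()
        channel_ready.set()  # skipped when the callback raises
    except Exception as exc:
        errors.append(exc)  # record the error instead of losing it


worker = threading.Thread(target=io_loop, daemon=True)
worker.start()
worker.join()

# A bounded wait returns False instead of blocking forever.
ready = channel_ready.wait(timeout=0.5)
if not ready:
    print(f"channel not ready; worker errors: {errors!r}")
```

Waiting with a timeout and surfacing the captured exception turns a silent hang into a visible error, which makes this kind of problem much easier to track down.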


(Yes, I have two different parts of the code that are blocking, and that’s possible because there’s threading involved.)
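
Because two threads are involved, it may also matter that Pika connections are not thread-safe. With SelectConnection, work requested from another thread is normally handed to the I/O loop through connection.ioloop.add_callback_threadsafe. A minimal sketch, assuming connection is an open pika.SelectConnection and channel is an open channel created on it; publish_from_any_thread is a hypothetical helper, not code from the post above.

```python
import functools
import json


def publish_from_any_thread(connection, channel, routing_key, message):
    """Schedule a publish on the connection's I/O loop thread.

    Assumes `connection` is a pika.SelectConnection and `channel` is an
    open channel created on it. Hypothetical helper for illustration.
    """
    body = json.dumps(message)
    publish = functools.partial(
        channel.basic_publish,
        exchange="",              # default exchange routes by queue name
        routing_key=routing_key,
        body=body,
    )
    # Hand the call to the I/O loop instead of touching the channel
    # directly from the calling thread.
    connection.ioloop.add_callback_threadsafe(publish)
```

The publish itself then runs on the same thread as every other Pika callback, so the channel is only ever used from one thread.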

I’ve been struggling with this for 2 days and would love some help!

FYI, you have the following import, but it is never used:

from datetime import datetime


Please refer to the following snippet from your code:

        self.channel.queue_declare(queue = self.inbox_name, callback = self.on_queue_declared)
        print(f"got here", flush = True)
        print(f"got here 2", flush = True)
        print(f"got here 3", flush=True)
        print(f"RabbitMQIOHandler::on_channel_open completed for inbox_name: {self.inbox_name}", flush=True)
    def on_queue_declared(self, method_frame):

        print(f"rabbit_mq_io_handler::on_queue_declared::Queue declared: {self.inbox_name}", flush=True)

        # Called when the RabbitMQ queue is declared
        self.channel.basic_consume(
            queue = self.inbox_name,
            on_message_callback = self.on_message,
            auto_ack = True
        )

Note that the callback method is:

on_queue_declared(self, method_frame):

However, in the body of this method, the parameter method_frame is never used. Was this an oversight?
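
For what it's worth, the frame passed to that callback does carry useful data. As far as I know, for queue_declare it wraps a Queue.DeclareOk method with queue, message_count and consumer_count fields. A small sketch; describe_declared_queue is a hypothetical helper, and the SimpleNamespace objects merely stand in for the frame Pika would pass.

```python
from types import SimpleNamespace


def describe_declared_queue(method_frame):
    """Summarize the Queue.DeclareOk reply carried by method_frame."""
    ok = method_frame.method
    return (
        f"queue={ok.queue} "
        f"messages={ok.message_count} "
        f"consumers={ok.consumer_count}"
    )


# Stand-in for the frame Pika passes to the queue_declare callback.
fake_frame = SimpleNamespace(
    method=SimpleNamespace(
        queue="guestbook_service", message_count=0, consumer_count=0
    )
)
print(describe_declared_queue(fake_frame))
```

Logging these fields in on_queue_declared would confirm which queue was actually declared and whether messages are already waiting in it.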

Hi!

I just checked Pika’s documentation, and it seems that Channel.confirm_delivery() is supposed to take arguments:

Maybe that’s the reason why the call is blocking?
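
To illustrate: on the asynchronous adapters such as SelectConnection, Channel.confirm_delivery expects an ack_nack_callback as its first argument, whereas BlockingChannel.confirm_delivery() takes none. Called with no arguments on an asynchronous channel, it would most likely raise a TypeError inside the callback rather than block, which could still look like a hang from the other thread. A hedged sketch of the asynchronous form; on_delivery_confirmation and enable_confirms are hypothetical names.

```python
def on_delivery_confirmation(method_frame):
    """Called by Pika for each Basic.Ack / Basic.Nack from the broker."""
    # method.NAME is e.g. "Basic.Ack"; keep only "ack" / "nack".
    kind = method_frame.method.NAME.split(".")[1].lower()
    tag = method_frame.method.delivery_tag
    print(f"broker sent {kind} for delivery tag {tag}")
    return kind, tag


def enable_confirms(channel):
    """Turn on publisher confirms for an async Pika channel (assumed open)."""
    channel.confirm_delivery(ack_nack_callback=on_delivery_confirmation)
```

In the handler from the question, enable_confirms(channel) would be called from on_channel_open in place of the bare confirm_delivery() call.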

Oh, very possible! Thank you! I’ve already made significant updates to my code since I posted just a day or so ago, so it will be difficult for me to test. But I bet you’re correct!