RabbitMQ -> Flask -> Websockets?

Hello! ARGH! I spent all weekend trying to get this to work, and I would really be thankful for any help.

Here’s what I’m trying to do…

  • I have a RabbitMQ message bus. I want to be able to put a message on a specific queue and have it display on a webpage.

  • I also want to be able to submit a message from the webpage to the message bus.

Really basic stuff. Does anyone have a link to some code that does this successfully?

Here’s a cut-down version of my HTML:

<!DOCTYPE html>
<html lang="en">
<body>
    <div id="content">
        <div id="messages"></div>
        <form id="message-form">
            <input type="text" id="recipient-queue" placeholder="Enter recipient queue" style="margin-bottom: 10px;">
            <input type="text" id="message-input" placeholder="Type a message...">
            <button type="submit">Send</button>
        </form>
    </div>
    <script src="https://cdn.socket.io/4.7.5/socket.io.min.js" integrity="sha384-2huaZvOR9iDzHqslqwpR87isEmrfxqyWOF7hr7BY6KG0+hVKLoEXMPUJw3ynWuhO" crossorigin="anonymous"></script>
    <script type="text/javascript">
        var socket = io.connect('http://' + document.domain + ':' + location.port);

        socket.on('new_message', function(msg) {
            console.log("got new message:")
            console.log(msg)
        });

        document.getElementById('message-form').addEventListener('submit', function(e) {
            e.preventDefault();
            var messageContent = document.getElementById('message-input').value;
            var recipientQueue = document.getElementById('recipient-queue').value; // Get the recipient queue from the input field

            fetch('/send_message', {
                method: 'POST',
                body: JSON.stringify({
                    content: messageContent,
                    recipient_queue: recipientQueue
                }),
                headers: {
                    'Content-Type': 'application/json'
                }
            }).then(response => response.json())
              .then(data => console.log(data));

            document.getElementById('message-input').value = ''; // Clear message input field
            document.getElementById('recipient-queue').value = ''; // Optionally clear recipient queue input field
        });

</script>
</body>
</html>

Thanks in advance,
Bret

Basic, perhaps, but very definitely two completely separate parts. So I would recommend looking into them independently. Start with RabbitMQ and have your program print something to the console when you get the message. This requires no knowledge of Flask, HTML, JS, WebSockets, etc - it’s just RabbitMQ.
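
As a starting point for the RabbitMQ-only step, here is a minimal sketch. It assumes the Pika client library, a broker running on localhost, and a queue named web_interface; describe_message and consume_and_print are illustrative names, not part of Pika.

```python
def describe_message(body):
    """Format a raw message body (bytes or str) for printing."""
    if isinstance(body, bytes):
        text = body.decode("utf-8", errors="replace")
    else:
        text = str(body)
    return f"Received: {text}"


def on_message(channel, method, properties, body):
    # Pika calls this once for every message delivered to the consumer.
    print(describe_message(body))


def consume_and_print(queue_name="web_interface", host="localhost"):
    # Imported here so the helpers above can be used without Pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    # Creates the queue if it is missing; raises if it already exists
    # with different settings (for example, durable vs. non-durable).
    channel.queue_declare(queue=queue_name)
    channel.basic_consume(
        queue=queue_name, on_message_callback=on_message, auto_ack=True
    )
    try:
        channel.start_consuming()  # blocks until interrupted
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        connection.close()
```

Calling consume_and_print() and then publishing to the queue from another terminal should print each message, with no Flask or browser involved.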

Once that’s sorted out - or if you get stuck and want to try something different - it’s time to look at the websocket. Make a very VERY simple websocket server that, whenever something connects, sends a message to that client. Pretty easy right? But it’s also enough to test out the Flask side of things.

Side note: You really don’t need socket.io here. All web browsers come with WebSocket support, and socket.io only adds an extra dependency that has to be loaded. Here’s WebSocket reference documentation to get you started, and a WebSocket description and guide to give you an overview if you’re not familiar with how they work.

I really appreciate the thoughtful reply. Where I seem to be stuck is that RabbitMQ (using Pika) blocks when reading, and if I attempt to use threads to make it unblocking, things get a bit wonky. I know that’s not super helpful in describing my issue, but I have to run to work. I’ll try to reply again later with more details.

In the meantime, I’ll post some of my flask code:

    print(f"Web interface emitting message: {message_data}")
    socketio.emit('new_message', message_data)

In the snippet above, the print statement prints, but the socket message doesn’t seem to reach the client.

import sys
import os
import json
from flask import Flask, request, jsonify, render_template
from flask_socketio import SocketIO
from src.io.rabbit_mq_io_handler import RabbitMQIOHandler

sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)

def handle_message(body):
    print(f"Web interface received message: {body}")

    # Check if the body is a byte string and decode it if needed
    if isinstance(body, bytes):
        decoded_body = body.decode()
    else:
        decoded_body = body

    # Check if the decoded body is a string that needs to be parsed as JSON
    if isinstance(decoded_body, str):
        try:
            message_data = json.loads(decoded_body)
        except json.JSONDecodeError:
            message_data = {'content': decoded_body}
    else:
        # If the decoded body is already a dictionary, use it directly
        message_data = decoded_body

    # Emitting the message to the frontend using Flask-SocketIO
    print(f"Web interface emitting message: {message_data}")
    socketio.emit('new_message', message_data)

def rabbitmq_message_listener():
    rabbitmq_listener = RabbitMQIOHandler(
        rabbitmq_host="localhost",
        queue_name="web_interface",
        message_callback=handle_message
    )
    rabbitmq_listener.start_consuming()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/send_message', methods=['POST'])
def send_message():
    data = request.json
    message = data['content']
    recipient_queue = data['recipient_queue']
    
    # Initialize a new RabbitMQIOHandler instance for sending messages
    send_io_handler = RabbitMQIOHandler(
        rabbitmq_host="localhost",
        queue_name=recipient_queue,
        declare_queue=False
    )
    
    # Send message to RabbitMQ
    send_io_handler.send_output(
        message_type="message",
        sender_name="user",
        return_address="user_queue",
        recipient_queue=recipient_queue,
        content=message
    )

    return jsonify({"status": "Message sent"})

@socketio.on('connect')
def handle_connect():
    print('Client connected')
    socketio.emit('new_message', {'content': 'Welcome to the real-time message stream!'})

@socketio.on('disconnect')
def handle_disconnect():
    print('Client disconnected')

if __name__ == '__main__':
    socketio.start_background_task(target=rabbitmq_message_listener)
    socketio.run(app, debug=True)

Actually that’s a really good explanation of the problem - thank you!

So, the problem is the need for asynchronicity. Threads are an option. Another is asyncio. Unfortunately, I’m not familiar with RabbitMQ so I’ll have to let someone else weigh in with the details, but what you’re looking for is “non-blocking” or “asynchronous” RabbitMQ usage.
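
For the threads option, one common pattern is to keep the blocking consumer in its own daemon thread and hand messages to the rest of the program through a thread-safe queue. The sketch below uses only the standard library; fake_blocking_consume is a stand-in for a real blocking call (such as Pika’s start_consuming()), and all function names are made up for illustration.

```python
import queue
import threading


def start_consumer_thread(blocking_consume, inbox):
    """Run a blocking consumer in a daemon thread, pushing messages into inbox."""
    def run():
        # The consumer calls inbox.put for every message it receives.
        blocking_consume(inbox.put)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def drain(inbox):
    """Return every message currently waiting, without blocking."""
    messages = []
    while True:
        try:
            messages.append(inbox.get_nowait())
        except queue.Empty:
            return messages


def fake_blocking_consume(on_message):
    # Stand-in for a blocking consumer: delivers three messages, then returns.
    for body in (b"one", b"two", b"three"):
        on_message(body)
```

With a real broker, the connection should be created and used inside the consumer thread only, since Pika’s BlockingConnection is not safe to share across threads. The main thread (or a web handler) then calls drain(inbox) whenever it wants the pending messages.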

In my limited experience of RabbitMQ (some years ago now) the target queue has to be declared by the sender. Did you try setting declare_queue=True?
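
To illustrate the point about declaring the queue: a message published to the default exchange with a routing key that matches no existing queue is dropped silently, so declaring the queue before publishing is a cheap safeguard. This sketch assumes a Pika-style channel object; publish_json is a hypothetical helper, and whether RabbitMQIOHandler’s declare_queue flag does the same thing is not known from this thread.

```python
import json


def publish_json(channel, queue_name, payload):
    """Declare queue_name, then publish payload to it as JSON."""
    # Declaring is idempotent as long as the queue settings match.
    channel.queue_declare(queue=queue_name)
    body = json.dumps(payload).encode("utf-8")
    # The default exchange ("") routes by queue name.
    channel.basic_publish(exchange="", routing_key=queue_name, body=body)
    return body
```

With Pika, the channel would come from pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel().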

Hi @sr-murthy,

Thanks for the information. I’ll keep that in mind, but in the meantime, I believe that I identified my main issue.

The issue turned out to be that when Flask-SocketIO is run in debug mode…

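import time
from flask import Flask
from flask_socketio import SocketIO

app = Flask(__name__)
socketio = SocketIO(app)

def background_task():
    # Emit a numbered test message every two seconds
    counter = 0
    while True:
        time.sleep(2)
        socketio.emit('new_message', {'content': 'Hello from the server!', 'counter': counter})
        print(f"Message sent to web interface: {counter}")
        counter += 1

if __name__ == '__main__':
    socketio.start_background_task(target=background_task)
    socketio.run(app, debug=True)  # debug mode is what triggers the duplicate task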

It spawns two copies of the background task function and runs each. I was lucky enough to have added a counter to my background task for testing, and noticed this behavior when it started printing…

Message sent to web interface: 1
Message sent to web interface: 1
Message sent to web interface: 2
Message sent to web interface: 2
Message sent to web interface: 3
Message sent to web interface: 3
etc…

Once I corrected that, everything started going a lot smoother.
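
A likely explanation, assuming the default Werkzeug development server: debug=True also turns on the auto-reloader, which runs the script once in a watcher process and again in a child process that actually serves requests. Anything started before socketio.run() therefore starts twice, and only the child’s copy can reach connected clients. If debug mode is still wanted, a guard like the one below is one option; should_start_background_task is a hypothetical helper, while WERKZEUG_RUN_MAIN is the environment variable Werkzeug sets in the reloader’s child process.

```python
import os


def should_start_background_task(debug, environ=None):
    """Return True only in the process that should own the background task."""
    environ = os.environ if environ is None else environ
    if not debug:
        # No reloader, so there is only one process.
        return True
    # With the reloader active, only the serving child has this set.
    return environ.get("WERKZEUG_RUN_MAIN") == "true"
```

The call to socketio.start_background_task(...) would then be wrapped in "if should_start_background_task(debug=True):". A simpler alternative is to keep debug mode but disable the reloader with socketio.run(app, debug=True, use_reloader=False).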

Here’s my entire flask app, in case it’s helpful for anyone:

import sys
import os
import json
from flask import Flask, request, jsonify, render_template
from flask_socketio import SocketIO
from src.io.rabbit_mq_io_handler import RabbitMQIOHandler

sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)

def handle_message(body):
    print(f"Web interface received message: {body}")

    # Attempt to decode body if it's a byte string, otherwise keep it as is
    decoded_body = body.decode() if isinstance(body, bytes) else body

    # Attempt to parse the decoded body as JSON, otherwise use it directly
    try:
        message_data = json.loads(decoded_body) if isinstance(decoded_body, str) else decoded_body
    except json.JSONDecodeError:
        message_data = {'content': decoded_body}

    # Emitting the message to the frontend using Flask-SocketIO
    print(f"Web interface emitting message: {message_data}")
    socketio.emit('new_message', message_data)

def rabbitmq_message_listener():
    rabbitmq_listener = RabbitMQIOHandler(
        rabbitmq_host="localhost",
        queue_name="web_interface",
        message_callback=handle_message
    )
    rabbitmq_listener.start_consuming()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/send_message', methods=['POST'])
def send_message():
    data = request.json
    message = data['content']
    recipient_queue = data['recipient_queue']
    
    # Initialize a new RabbitMQIOHandler instance for sending messages
    send_io_handler = RabbitMQIOHandler(
        rabbitmq_host="localhost",
        queue_name=recipient_queue,
        declare_queue=False
    )
    
    # Send message to RabbitMQ
    send_io_handler.send_output(
        message_type="message",
        sender_name="user",
        return_address="user_queue",
        recipient_queue=recipient_queue,
        content=message
    )

    return jsonify({"status": "Message sent"})

@socketio.on('connect')
def handle_connect():
    print('Client connected')
    socketio.emit('new_message', {'content': 'Welcome to the real-time message stream!'})

@socketio.on('disconnect')
def handle_disconnect():
    print('Client disconnected')

if __name__ == '__main__':
    socketio.start_background_task(target=rabbitmq_message_listener)
    socketio.run(app, debug=False)

(it may need to be tailored to your own message format)
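
One way to do that tailoring is to normalize every incoming body into a single shape before emitting it, so the page can rely on the same keys each time. This is only a sketch: the key names (type, content, recipient_queue, return_address) are the ones the page’s new_message handler reads, but the defaults and the normalize_message name are assumptions.

```python
import json


def normalize_message(body, default_queue="web_interface"):
    """Coerce a raw body (bytes, str, or dict) into a dict with fixed keys."""
    text = body.decode("utf-8") if isinstance(body, bytes) else body
    if isinstance(text, str):
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = text  # not JSON, treat it as plain message text
    else:
        data = text
    if not isinstance(data, dict):
        data = {"content": data}
    return {
        "type": data.get("type", "message"),  # assumed default
        "content": data.get("content", ""),
        "recipient_queue": data.get("recipient_queue", default_queue),
        "return_address": data.get("return_address"),
    }
```

In handle_message, the result of normalize_message(body) could be passed straight to socketio.emit('new_message', ...).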

And here’s my HTML. It is only partially working at the moment: the data that I’m passing through the websocket doesn’t match the format the page expects, so it will need some tweaking. I’m posting it here regardless, and I’ll try to swing back and share a more complete example.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Message Stream</title>
    <style>
        body { display: flex; }
        #channel-list {
            width: 200px; /* Adjust width as needed */
            border-right: 1px solid #ddd;
            height: 100vh; /* Full viewport height */
            overflow-y: auto;
            padding: 10px;
        }
        #content {
            flex-grow: 1; /* Take up remaining space */
            padding: 10px;
        }
        #messages { 
            border: 1px solid #ddd; 
            height: 300px; 
            overflow-y: scroll; 
            margin-bottom: 10px;
        }
    </style>
</head>
<body>
    <div id="channel-list">
        <!-- Channels will be dynamically added here -->
    </div>
    <div id="content">
        <div id="messages"></div>
        <form id="message-form">
            <input type="text" id="recipient-queue" placeholder="Enter recipient queue" style="margin-bottom: 10px;">
            <input type="text" id="message-input" placeholder="Type a message...">
            <button type="submit">Send</button>
        </form>
    </div>
    <script src="https://cdn.socket.io/4.7.5/socket.io.min.js" integrity="sha384-2huaZvOR9iDzHqslqwpR87isEmrfxqyWOF7hr7BY6KG0+hVKLoEXMPUJw3ynWuhO" crossorigin="anonymous"></script>
    <script type="text/javascript">
        var socket = io.connect('http://' + document.domain + ':' + location.port);

        socket.on('new_message', function(msg) {
            console.log("got new message:")
            console.log(msg)

            if(msg.type == "queue_update") {
                var agent_name = "TBD";
                var queue_name = msg.return_address;
                addAgent(agent_name, queue_name);
                return;
            }
            else
            {
                var messages = document.getElementById('messages');
                var message = document.createElement('p');
                // textContent keeps message text from being interpreted as HTML
                message.textContent = 'Queue: ' + msg.recipient_queue + ', Message: ' + msg.content;
                messages.appendChild(message);
            }
        });

        document.getElementById('message-form').addEventListener('submit', function(e) {
            e.preventDefault();
            var messageContent = document.getElementById('message-input').value;
            var recipientQueue = document.getElementById('recipient-queue').value; // Get the recipient queue from the input field

            fetch('/send_message', {
                method: 'POST',
                body: JSON.stringify({
                    content: messageContent,
                    recipient_queue: recipientQueue
                }),
                headers: {
                    'Content-Type': 'application/json'
                }
            }).then(response => response.json())
              .then(data => console.log(data));

            document.getElementById('message-input').value = ''; // Clear message input field
            document.getElementById('recipient-queue').value = ''; // Optionally clear recipient queue input field
        });

        // Function to update the channel list
        function updateChannelList() {
            fetch('/active_agents')
                .then(response => response.json())
                .then(agents => {
                    const channelList = document.getElementById('channel-list');
                    channelList.innerHTML = '';  // Clear existing list
                    agents.forEach(agent => {
                        addAgent(agent.agent_name, agent.queue_name);
                    });
                })
                .catch(error => console.error('Error fetching agents:', error));
        }

        function addAgent(agent_name, queue_name)
        {
            const channelList = document.getElementById('channel-list');
            const agentElement = document.createElement('p');
            agentElement.innerText = `${agent_name} [${queue_name}]`;
            channelList.appendChild(agentElement);
        }

        // Call this function on page load and periodically if desired
        // updateChannelList();

        // Optional: Set an interval to update the list periodically
        // setInterval(updateChannelList, 6000); // Update every 6 seconds

</script>
</body>
</html>

Thank you everyone!


That’s good. BTW your post makes me want to “hop” back into RabbitMQ, and do something interesting with it 🙂

That’s fantastic! I’ve been a developer most of my life, but never had the opportunity to use RabbitMQ before. I’m working on… an AI agent framework! I know, I know: Who isn’t these days?? Ha ha ha. I’m doing it just as a hobby. RabbitMQ handles all of the communications between agents and myself, and between agents and other agents.

One of my agents, “BitBeard” ended up getting into a conversation with “GreyBeard”, and essentially forgot that I existed. I think that their lively and excited conversation could have gone on forever if I hadn’t intervened. They seemed to have a lot in common.

BitBeard usually talks like a pirate, unless I mention that I am its programmer, then it drops the pirate facade. I’m building a minimal user interface where I can see all of the communications in an organized way. It’s coming along well. If I were to guess, I’d say that I’m about two months away from releasing it as open source. I’m a nobody, so I’m not expecting a lot of fanfare.

RabbitMQ is pretty cool! I really wish there was an easier way to do non-blocking reading using Pika, but I’ve worked around it.

Anyhow, thank you again for your kind response! 🏆 ⭐ 🙇
