Hi @sr-murthy,
Thanks for the information. I’ll keep that in mind, but in the meantime, I believe that I identified my main issue.
The issue turned out to be that when socket.io is run in debug mode…
def background_task():
counter = 0
while True:
time.sleep(2)
socketio.emit('new_message', {'content': 'Hello from the server!', 'counter': counter})
print(f"Message sent to web interface: {counter}")
counter += 1
if __name__ == '__main__':
socketio.start_background_task(target=background_task)
socketio.run(app, debug=True)
It spawns two copies of the background task function and runs each. I was lucky enough to have added a counter to my background task for testing, and noticed this behavior when it started printing…
Message sent to web interface: 1
Message sent to web interface: 1
Message sent to web interface: 2
Message sent to web interface: 2
Message sent to web interface: 3
Message sent to web interface: 3
etc…
Once I corrected that, everything started going a lot smoother.
Here’s my entire flask app, in case it’s helpful for anyone:
import sys
import os
import json
from flask import Flask, request, jsonify, render_template
from flask_socketio import SocketIO
from src.io.rabbit_mq_io_handler import RabbitMQIOHandler
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
def handle_message(body):
print(f"Web interface received message: {body}")
# Attempt to decode body if it's a byte string, otherwise keep it as is
decoded_body = body.decode() if isinstance(body, bytes) else body
# Attempt to parse the decoded body as JSON, otherwise use it directly
try:
message_data = json.loads(decoded_body) if isinstance(decoded_body, str) else decoded_body
except json.JSONDecodeError:
message_data = {'content': decoded_body}
# Emitting the message to the frontend using Flask-SocketIO
print(f"Web interface emitting message: {message_data}")
socketio.emit('new_message', message_data)
def rabbitmq_message_listener():
rabbitmq_listener = RabbitMQIOHandler(
rabbitmq_host="localhost",
queue_name="web_interface",
message_callback=handle_message
)
rabbitmq_listener.start_consuming()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/send_message', methods=['POST'])
def send_message():
data = request.json
message = data['content']
recipient_queue = data['recipient_queue']
# Initialize a new RabbitMQIOHandler instance for sending messages
send_io_handler = RabbitMQIOHandler(
rabbitmq_host="localhost",
queue_name=recipient_queue,
declare_queue=False
)
# Send message to RabbitMQ
send_io_handler.send_output(
message_type="message",
sender_name="user",
return_address="user_queue",
recipient_queue=recipient_queue,
content=message
)
return jsonify({"status": "Message sent"})
@socketio.on('connect')
def handle_connect():
print('Client connected')
socketio.emit('new_message', {'content': 'Welcome to the real-time message stream!'})
@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
if __name__ == '__main__':
socketio.start_background_task(target=rabbitmq_message_listener)
socketio.run(app, debug=False)
(it may need to be tailored to your own message format)
And here’s my HTML, which is only partially working at the moment, so be aware it will need some tweaking because the data that I’m passing through the websocket doesn’t match the expected format. However, I’m posting it here regardless. I’ll try to swing back and share a more complete example.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Message Stream</title>
<style>
body { display: flex; }
#channel-list {
width: 200px; /* Adjust width as needed */
border-right: 1px solid #ddd;
height: 100vh; /* Full viewport height */
overflow-y: auto;
padding: 10px;
}
#content {
flex-grow: 1; /* Take up remaining space */
padding: 10px;
}
#messages {
border: 1px solid #ddd;
height: 300px;
overflow-y: scroll;
margin-bottom: 10px;
}
</style>
</head>
<body>
<div id="channel-list">
<!-- Channels will be dynamically added here -->
</div>
<div id="content">
<div id="messages"></div>
<form id="message-form">
<input type="text" id="recipient-queue" placeholder="Enter recipient queue" style="margin-bottom: 10px;">
<input type="text" id="message-input" placeholder="Type a message...">
<button type="submit">Send</button>
</form>
</div>
<script src="https://cdn.socket.io/4.7.5/socket.io.min.js" integrity="sha384-2huaZvOR9iDzHqslqwpR87isEmrfxqyWOF7hr7BY6KG0+hVKLoEXMPUJw3ynWuhO" crossorigin="anonymous"></script>
<script type="text/javascript">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('new_message', function(msg) {
console.log("got new message:")
console.log(msg)
if(msg.type == "queue_update") {
var agent_name = "TBD";
var queue_name = msg.return_address;
addAgent(agent_name, queue_name);
return;
}
else
{
var messages = document.getElementById('messages');
var message = document.createElement('p');
message.innerHTML = 'Queue: ' + msg.recipient_queue + ', Message: ' + msg.content;
messages.appendChild(message);
}
});
document.getElementById('message-form').addEventListener('submit', function(e) {
e.preventDefault();
var messageContent = document.getElementById('message-input').value;
var recipientQueue = document.getElementById('recipient-queue').value; // Get the recipient queue from the input field
fetch('/send_message', {
method: 'POST',
body: JSON.stringify({
content: messageContent,
recipient_queue: recipientQueue
}),
headers: {
'Content-Type': 'application/json'
}
}).then(response => response.json())
.then(data => console.log(data));
document.getElementById('message-input').value = ''; // Clear message input field
document.getElementById('recipient-queue').value = ''; // Optionally clear recipient queue input field
});
// Function to update the channel list
function updateChannelList() {
fetch('/active_agents')
.then(response => response.json())
.then(agents => {
const channelList = document.getElementById('channel-list');
channelList.innerHTML = ''; // Clear existing list
agents.forEach(agent => {
addAgent(agent.agent_name, agent.queue_name);
});
})
.catch(error => console.error('Error fetching agents:', error));
}
function addAgent(agent_name, queue_name)
{
const channelList = document.getElementById('channel-list');
const agentElement = document.createElement('p');
agentElement.innerText = `${agent_name} [${queue_name}]`;
channelList.appendChild(agentElement);
}
// Call this function on page load and periodically if desired
// updateChannelList();
// Optional: Set an interval to update the list periodically
// setInterval(updateChannelList, 6000); // Update every 6 seconds
</script>
</body>
</html>
Thank you everyone!