My mqtt-in node cannot received data from broker

Hi. I have created my custom mqtt broker using python. Now I have issue that my node-red mqtt-in node cannot receive data from my custom broker. My scenario is I have one esp32 which connected with dht 22 sensor coding by arduino. the sensor sends data to my custom broker and my custom broker can received the data. but when my node-red mqtt in subscribe to my custom broker and get data, it cannot get any data

import socket
import json
import threading
import traceback
import logging

# Initialize the logging module
logger = logging.getLogger(__name__)

# Dictionary to store subscribed topics for each client
client_subscriptions = {}
# Dictionary to store last message for each topic
last_messages = {}
# Lock for thread safety
lock = threading.Lock()

def decode_string(data, start):
    length = (data[start] << 8) + data[start + 1]
    topic = data[start + 2:start + 2 + length].decode('utf-8')
    return topic, start + 2 + length

def decode_length(data, start):
    multiplier = 1
    value = 0
    while True:
        digit = data[start]
        value += (digit & 127) * multiplier
        multiplier *= 128
        start += 1
        if (digit & 128) == 0:
    return value, start

def send_connack(client_socket):
    connack = b'\x20\x02\x00\x00'  # CONNACK packet

def decode_string_subscribe(data):
        topic_length = (data[2] << 8) + data[3]
        topic_bytes = data[4:4 + topic_length]
        print(f"Raw bytes of extracted topic: {topic_bytes}")
        decoded_topic = topic_bytes.decode('utf-8', errors='replace')
        return decoded_topic, 4 + topic_length

    except UnicodeDecodeError as ude:
        print(f"[!] UnicodeDecodeError in decode_string_subscribe: {ude}")
        return "", 0
    except Exception as ex:
        print(f"[!] Exception in decode_string_subscribe: {ex}")
        return "", 0

def handle_subscribe(client_socket, raw_packet):
        topic, index = decode_string_subscribe(raw_packet)
        # Extract the QoS level from the SUBSCRIBE packet
        qos = raw_packet[index]

        with lock:
            if client_socket not in client_subscriptions:
                client_subscriptions[client_socket] = set()

        print(f"[*] Received SUBSCRIBE packet. Decoded topic: {topic}")
        print(f"[*] Subscriber QoS level: {qos}")
        print(f"[*] Client subscribed to topic: {client_subscriptions}")

    except Exception as ex:
        print(f"[!] Exception in handle_subscribe: {ex}")
        print(f"[*] Raw packet in case of exception: {raw_packet}")

def handle_publish(client_socket, topic, payload):
        with lock:
            last_messages[topic] = payload

        with lock:
            for client, subscribed_topics in client_subscriptions.items():
                if topic in subscribed_topics:
                    # Forward the message to subscribed clients
                    logger.debug(f"[*] Forwarding message to client {client}: {payload}")

        # Additional print statements for verification"[*] Published message to topic {topic} with payload: {payload}")
        # Print QoS level
        qos_level = (payload[0] & 0b00000110) >> 1"[*] QoS level: {qos_level}")

    except Exception as ex:
        # Log the exception without exposing payload content
        logger.error(f"[!] Exception in handle_publish: {ex}")

        traceback.print_exc()  # Print the traceback for detailed information

        # Alternatively, you can log the exception to a file or use a logging library
        # logging.error(f"Exception in handle_publish: {ex}")
        # logging.exception("Detailed exception information:")"[*] Topic: {topic}, Payload: [Error occurred, check logs for details]")

def handle_client(client_socket):
        data = client_socket.recv(1024)

        while data:
            packet_type = (data[0] & 0xF0) >> 4

            if packet_type == 1:  # CONNECT packet
                mqtt_version = data[5] if (data[0] & 0x80) == 0x80 else data[6]

                print(f"[*] Raw CONNECT packet: {data}")
                print(f"[*] Client connected with MQTT version: {mqtt_version}")


            elif packet_type == 8:  # SUBSCRIBE packet
                topic, index = decode_string(data, 2)
                print(f"[*] Received SUBSCRIBE packet. Raw packet: {data}")
                print(f"[*] Received SUBSCRIBE packet. Topic (decoded): {topic}")
                handle_subscribe(client_socket, data[2:])

            elif packet_type == 3:  # PUBLISH packet
                topic, index = decode_string(data, 2)
                payload_length, index = decode_length(data, index)
                payload = data[index:index + payload_length]

                payload_str = payload.decode('utf-8')

                if payload_str[0] != '{':
                    payload_str = '{' + payload_str

                print(f"[*] Received raw payload: {payload_str}")

                    payload_dict = json.loads(payload_str)
                    temperature = payload_dict.get("temperature")
                    print(f"Temperature: {temperature}")

                    handle_publish(client_socket, topic, payload_str.encode('utf-8'))

                except json.JSONDecodeError as e:
                    print(f"[!] JSON Decode Error: {e}")
                except Exception as ex:
                    print(f"[!] Exception: {ex}")

            data = client_socket.recv(1024)

    except Exception as e:
        print(f"[!] Error in handle_client: {e}")

        with lock:
            if client_socket in client_subscriptions:
                del client_subscriptions[client_socket]

if __name__ == "__main__":
    broker_address = ""
    broker_port =1883
    broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    broker_socket.bind((broker_address, broker_port))
    print(f"[*] Broker socket bound to {broker_socket.getsockname()}")
    print(f"[*] MQTT Broker listening on {broker_address}:{broker_port}")

        while True:
            client_socket, client_address = broker_socket.accept()
            print(f"[*] Accepted connection from {client_address}")

            # Assuming you allow any client to connect without authentication
            client_handler = threading.Thread(target=handle_client, args=(client_socket,))

    except KeyboardInterrupt:
        print("[*] Broker shutting down.")

Broker output

Node-red Flow

Instead of a screenshot, please copy and paste the broker output from the shell window, and format it as a multi-line code block (you can put lang-none after the opening backticks, to make it disable syntax highlighting). It will be a lot easier to read that way, and it will also allow others to copy and paste parts of it for further discussion.