Create a system that receives N data packets, extracts the voltage from each one, and compares it with the other incoming voltages

Hello partners

I am building a system that receives data from several antennas (XBee) so that I can detect the failure of one or more devices.

What I am building is a sensor network. I am using XBee modules to create the network and Python to view the data on the computer. Each sensor will send its voltage reading together with the time and date, and an XBee on the PC will receive the data frames in API mode. The good thing is that there is already an XBee library for Python.

I have managed to extract the voltage from the data. Now I want to compare this voltage with the other incoming voltages, compute an average from them, and determine which device has the lowest voltage.

This is the code:


from digi.xbee.devices import XBeeDevice

# TODO: Replace with the serial port where your local module is connected to.
PORT = "COM5"

# TODO: Replace with the baud rate of your local module.
BAUD_RATE = 9600
DATA_TO_SEND = "Hola XBee!"


def main():
    print(" +-------------------------------------------------+")
    print(" | Sistema de Monitoreo de Paneles Solares del SESLab|")
    print(" +-------------------------------------------------+\n")

    # Running sum of the received voltages; kept local so it can be updated here.
    resultado = 0

    device = XBeeDevice(PORT, BAUD_RATE)

    try:
        device.open()
        device.flush_queues()

        print("Inicializando Xbee´s : %s..." % DATA_TO_SEND)
        device.send_data_broadcast(DATA_TO_SEND)
        print("Completado")

        print("Esperando Datos de Sensores...\n")

        while True:
            xbee_message = device.read_data()
            if xbee_message is not None:
                print("Dirección de Xbee %s >> %s" % (
                    xbee_message.remote_device.get_64bit_addr(),
                    xbee_message.data.decode()))

                # The voltage is the first comma-separated field of the frame.
                data = xbee_message.data.decode()
                voltage = data.split(",")[0]
                volt = float(voltage)
                resultado = resultado + volt

    finally:
        if device is not None and device.is_open():
            device.close()


if __name__ == '__main__':
    main()

Hi!
Could you please tell me exactly what format you are receiving the data in? Also, what format would you like to get your reports in?

I would suggest you try to split your program into smaller pieces. Now that you are getting data, you could have a function that adds the incoming data to a dataframe. Then you could have separate functions for getting the average and minimum values from the dataframe. The pandas package might be nice to use, as it has ready-made functions like mean(), min() and describe(), with which you can make nice reports for yourself.
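Without pandas, the same split into small functions can be sketched with the standard library. This is only a minimal illustration: the `readings` list, the function names and the `sensor_a`/`sensor_b`/`sensor_c` labels are made up for the example, and `statistics.mean` and the built-in `min` stand in for the pandas calls.

```python
import statistics

# Hypothetical in-memory store: one (sensor, voltage) row per received frame.
readings = []


def add_reading(sensor, voltage):
    """Append one measurement to the store, converting the voltage to float."""
    readings.append((sensor, float(voltage)))


def average_voltage(rows):
    """Return the mean of all stored voltages."""
    return statistics.mean([voltage for _, voltage in rows])


def lowest_reading(rows):
    """Return the (sensor, voltage) row with the smallest voltage."""
    return min(rows, key=lambda row: row[1])


# Example values, not real measurements.
add_reading("sensor_a", "30.00")
add_reading("sensor_b", "28.50")
add_reading("sensor_c", "29.10")

print(average_voltage(readings))
print(lowest_reading(readings))
```

Each function returns its result instead of storing it, so the same functions can feed a console report or a user interface.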

Hi Petteri Suckman

Basically, this graphical interface is designed to show the voltages of the sensors in the field and, with them, to detect which sensor is reporting a voltage lower than the average of all the voltages that reach my computer.

At the moment the interface shows the date, the time, and the voltages of the first, second and third sensors. It also shows the average of these voltages and, finally, the lowest voltage.

In the future, when I add one or more sensors to the network, I want to be able to modify the interface so that it shows more voltage values.

The words in this interface are in Spanish. So I’m going to translate them for you.

Sistema de monitoreo de Paneles del Seslab = SESLab panel monitoring system
Fecha de medicion = Measurement date
Hora de medicion = Measurement time
Voltaje de Panel 1 = Panel 1 voltage
Voltaje de Panel 2 = Panel 2 voltage
Voltaje de Panel 3 = Panel 3 voltage
Promedio = Average
Panel con menor voltaje = Panel with the lowest voltage

The other thing I wanted to ask is how the program can tell whether a frame arriving at the PC comes from the panel I have assigned as Panel 1, 2 or 3. Right now it is easy because I can only test one antenna at a time, but when I put this system to work I am going to receive 3 data frames from 3 different sensors.

As I mentioned, Digi provides a Python library for XBee, and as you have seen, several of its functions are used here to send, receive and extract the data from the XBee.

There is a function called xbee_message.remote_device.get_64bit_addr()

This function gives me the address of the sensor, so depending on how I place the sensors in the field I will know which sensor each address belongs to.

Below is the console output of the code that prints the sensor address, the data frame and the separated fields.

This is the data frame that every XBee will send to the PC: 30.00,09:18:36,30/05/2021

Xbee address 0013A20040D7B01E >> 30.00,09:18:36,30/05/2021
voltage 30.00
hour 09:18:36
date 30/05/2021
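The frame layout shown above (voltage, then time, then date) can be parsed in one place. The following is a minimal sketch, assuming every frame has exactly these three comma-separated fields; `Reading` and `parse_frame` are hypothetical names introduced for the example.

```python
from datetime import datetime
from typing import NamedTuple


class Reading(NamedTuple):
    """One parsed frame (hypothetical helper type)."""
    voltage: float
    timestamp: datetime


def parse_frame(payload: str) -> Reading:
    """Split 'voltage,HH:MM:SS,DD/MM/YYYY' into a voltage and a timestamp."""
    # Field 0 is the voltage, field 1 the time, field 2 the date.
    voltage_text, time_text, date_text = payload.strip().split(",")
    timestamp = datetime.strptime(
        f"{date_text} {time_text}", "%d/%m/%Y %H:%M:%S")
    return Reading(float(voltage_text), timestamp)


reading = parse_frame("30.00,09:18:36,30/05/2021")
print(reading.voltage, reading.timestamp.isoformat())
```

Unpacking the three fields by name, rather than indexing `split(",")` several times, makes it harder to mix up which position holds the time and which holds the date.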

Regarding the addresses of the devices: as I have told you, I will run the test with 3 XBee modules. Each XBee has its address printed on the back.
For example, sensor 1 would be
0013A20040D7B01E
sensor 2 would be
0013A200414EF124
sensor 3 would be
0013A20040D7AEA4

My idea is that if I add more devices to the network in the future, I will only need to modify the code and add the new sensor addresses before installing them.
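One way to keep that change to a single line per new sensor is a lookup table from address to panel number. This is a sketch under assumptions: `PANEL_BY_ADDRESS`, `panel_for`, `record` and `lowest_panel` are hypothetical names, and the address is assumed to arrive as the same hexadecimal text that the console output prints (for example via `str(xbee_message.remote_device.get_64bit_addr())`).

```python
# Addresses printed on the back of each XBee, mapped to a panel number.
# Adding a sensor later means adding one line to this table.
PANEL_BY_ADDRESS = {
    "0013A20040D7B01E": 1,
    "0013A200414EF124": 2,
    "0013A20040D7AEA4": 3,
}

# Most recent voltage seen for each panel number.
latest_voltage = {}


def panel_for(address):
    """Return the panel number for an address, or None if it is unknown."""
    return PANEL_BY_ADDRESS.get(address.upper())


def record(address, voltage):
    """Store the voltage under its panel; return False for unknown senders."""
    panel = panel_for(address)
    if panel is None:
        return False
    latest_voltage[panel] = float(voltage)
    return True


def lowest_panel():
    """Return the panel number whose latest voltage is the smallest."""
    return min(latest_voltage, key=latest_voltage.get)


# Example values, not real measurements.
record("0013A20040D7B01E", 30.00)
record("0013A200414EF124", 28.50)
record("0013A20040D7AEA4", 29.10)
print(lowest_panel())
```

Frames from an address that is not in the table are ignored rather than shown under the wrong panel.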

In the graphical interface it does not matter if the time and date are taken only from sensor 1; from all the other sensors only the voltage is needed, which is taken from data.split(",")[0]

What I am missing is a way to know that the voltage of panel 1 comes from sensor 1, the voltage of panel 2 comes from sensor 2, and so on. Ideally the code would be written so that, if I want to add one or more sensors in the future, I only have to open the code and enter the new sensor address.

I just need that and I don’t know how to do it. Also, when I run it, it puts the voltage in the field where the date goes and the date in the field where the voltage goes. I will share a photo for you to see.

This is the code (I don’t know why this post doesn’t offer a good way to include code):

import dataclasses
import functools
import statistics
import tkinter as tk
from concurrent import futures

from digi.xbee.devices import XBeeDevice

PORT = "COM5"
BAUD_RATE = 9600
device = XBeeDevice(PORT, BAUD_RATE)

NUMBER_OF_VOLTAGE_PANELS = 3
DATE = 'Fecha de medición'
TIME = 'Hora de medición'
VOLTAGE = 'Voltage de Panel'
AVERAGE = 'Promedio'
LOWEST = 'Panel con Menor Voltaje'


@dataclasses.dataclass
class FloatData:
    label: str
    value: float


@dataclasses.dataclass
class StringData:
    label: str
    value: str


@dataclasses.dataclass
class Data:
    # default_factory gives each Data instance its own label/value objects
    # (a dataclass instance as a plain default is rejected by Python 3.11+).
    date: StringData = dataclasses.field(
        default_factory=lambda: StringData(DATE, ''), init=False)
    time: StringData = dataclasses.field(
        default_factory=lambda: StringData(TIME, ''), init=False)
    voltages: list[FloatData] = dataclasses.field(
        default_factory=list, init=False)
    average: FloatData = dataclasses.field(
        default_factory=lambda: FloatData(AVERAGE, 0), init=False)
    lowest: FloatData = dataclasses.field(
        default_factory=lambda: FloatData(LOWEST, 0), init=False)

    def __post_init__(self):
        end_range = max(2, NUMBER_OF_VOLTAGE_PANELS+1)
        for number in range(1, end_range):
            self.voltages.append(FloatData(f'{VOLTAGE} {number}', 0))

    def set_date(self, date: str):
        self.date.value = date

    def set_time(self, time: str):
        self.time.value = time

    def set_voltages(self, voltages: tuple[float]):
        for float_data, voltage in zip(self.voltages, voltages):
            float_data.value = voltage
        self.calculations()

    def calculations(self):
        voltages = [float_data.value for float_data in self.voltages]
        self.average.value = round(statistics.mean(voltages), 2)
        self.lowest.value = min(voltages)


thread_pool_executor = futures.ThreadPoolExecutor(max_workers=1)


def tk_after(target):

    @functools.wraps(target)
    def wrapper(self, *args, **kwargs):
        args = (self,) + args
        self.after(0, target, *args, **kwargs)

    return wrapper


def submit_to_pool_executor(executor):
    '''Decorates a method to be submitted to the passed in executor'''
    def decorator(target):

        @functools.wraps(target)
        def wrapper(*args, **kwargs):
            result = executor.submit(target, *args, **kwargs)
            result.add_done_callback(executor_done_call_back)
            return result

        return wrapper

    return decorator


def executor_done_call_back(future):
    exception = future.exception()
    if exception:
        raise exception


class App(tk.Tk):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.title("Sistema de Monitoreo de Paneles Solares del SESLab")
        self.config(bg="#919F89")
        self.data = Data()

        width_of_window = 600
        height_of_window = 280
        screen_width = self.winfo_screenwidth()
        screen_height = self.winfo_screenheight()
        x_coordinate = (screen_width/2)-(width_of_window/2)
        y_coordinate = (screen_height/2)-(height_of_window/2)
        self.geometry((f'{width_of_window}x{height_of_window}+'
                       f'{x_coordinate:.0f}+{y_coordinate:.0f}'))

        self.form_frame = FormFrame(self.data, self)
        self.form_frame.pack()
        self.btn = tk.Button(
            self, bg="#E2BBAC", text="Iniciar ", command=self.on_btn)
        self.btn.pack()

    @tk_after
    def btn_enable(self, enable: bool = True):
        self.btn.config(state='normal' if enable else 'disabled')

    def on_btn(self):
        self.btn_enable(False)
        self.update()

    @submit_to_pool_executor(thread_pool_executor)
    def update(self):
        device.open()
        DATA_TO_SEND = "Hola XBee!"
        device.send_data_broadcast(DATA_TO_SEND)

        try:
            device.flush_queues()
            while True:
                xbee_message = device.read_data()
                if xbee_message is not None:
                    data = xbee_message.data.decode()
                    self.data.set_date(data.split(",")[1])
                    self.data.set_time(data.split(",")[2])
                    self.data.set_voltages(float(data.split(",")[0]))
                    self.form_frame.update()

        finally:
            if device is not None and device.is_open():
                device.close()
            self.btn_enable()


class FormFrame(tk.Frame):
    def __init__(self, data: Data, *args, **kwargs):
        self.data = data
        kwargs['width'] = '1200'
        kwargs['height'] = '600'
        super().__init__(*args, **kwargs)
        self.config(bg="#98A7AC")
        self.str_variables = {}

        label1 = tk.Label(
            self, text="Sistema de monitoreo de Paneles del SESLab", font=18)
        label1.grid(row=0, column=0, padx=5, pady=5, columnspan=2)

        self.data_objs = [self.data.date, self.data.time]
        self.data_objs += [obj for obj in self.data.voltages]
        self.data_objs += [self.data.average, self.data.lowest]

        for row_index, data_obj in enumerate(self.data_objs, 1):
            label = tk.Label(self, text=f'{data_obj.label} : ')
            label.grid(row=row_index, column=0, padx=2, pady=5)

            str_variable = tk.StringVar()
            entry = tk.Entry(self, textvariable=str_variable)
            entry.grid(row=row_index, column=1, padx=1, pady=5)
            entry.config(fg="black", justify="center", state='readonly')
            self.str_variables[data_obj.label] = str_variable

    @tk_after
    def update(self):
        for data_obj in self.data_objs:
            self.str_variables[data_obj.label].set(data_obj.value)


if __name__ == '__main__':
    app = App()
    app.mainloop()

Good progress! I did not actually realize you were going for a full-blown user interface. :) I was also thinking in functions where you were thinking in objects. But Python is flexible; there are many ways to achieve the same objective.

I was not actually able to run the program because I could not fix the indentation correctly. But I read the code.

You were saying that what is missing is a way to know which sensor and panel (are they the same, or does one panel have many sensors?) a data point is connected to. You have a method called “update”. Is that where the device tells your program what new data there is? If so, I would also give the address of the panel/sensor to the object, so it can be picked up from there for the UI if needed. So, you could have a data structure that holds data objects, each of which represents all the metadata of a single measurement.
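As a rough illustration of that idea, each received frame could become one small object that carries the sender's address along with the measured values. The `Measurement` class and `measurement_from_frame` function below are hypothetical, and the payload is assumed to be the comma-separated voltage, time and date frame described earlier in the thread.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Measurement:
    """Everything known about one received frame (hypothetical structure)."""
    address: str   # 64-bit XBee address of the sender, as text
    voltage: float
    time: str      # kept as text, e.g. "09:18:36"
    date: str      # kept as text, e.g. "30/05/2021"


def measurement_from_frame(address, payload):
    """Build a Measurement from an address and a 'voltage,time,date' payload."""
    voltage_text, time_text, date_text = payload.split(",")
    return Measurement(address, float(voltage_text), time_text, date_text)


# The collection grows by one entry per received frame.
measurements = []
measurements.append(
    measurement_from_frame("0013A20040D7B01E", "30.00,09:18:36,30/05/2021"))

print(measurements[0])
```

The UI can then read the address, voltage, time and date of any measurement from the collection without the receiving code having to know how they are displayed.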

Do you need to compare measurements from the same time period, or are you comparing voltages from different times? I think normally you would separate the calculation logic so that you do not have to store calculation results in each object, but do the calculations when needed, e.g. when someone presses a button in the UI.

The “calculations” function should probably be refactored. Is that part of class Data, or outside? This function changes the state of two variables but does not have a return value. I would probably do the calculations outside, as they do not concern a single measurement point’s data but the whole set. And then I would make a separate function for each calculation so that each function gives its result as the return value.

There are a couple of places where naming choices make the code hard to read. You might want to refactor e.g. “str_variable” and tk.StringVar(). Also I think the name “update” is not unique.

I noticed you have “dataclasses” StringData and FloatData. Why do you need those? If they are not needed, then the program might be more readable without clutter.

(Disclaimer, I am fairly new to Python myself. So my notes are more to make the code cleaner. I am not familiar with all constructs you have used. E.g. I have not used functools.wraps() yet.)

Too bad you can’t run the code = (

Yes, panel and sensor are the same, haha. The sensor measures the panel’s voltage.

Thank you very much for the comments, it has actually been a joint work with a person who helped me organize my ideas and who clearly had more knowledge when using Tkinter.

My intention is to compare the voltages in real time within the same time period. That means that if I receive N data frames with N voltages at 10:30, the average is computed from those N voltages for that date and time.
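A minimal sketch of that per-timestamp comparison, assuming all sensors stamp their frames with exactly the same time and date text (frames that differ by a second would land in different groups and would need some rounding first). The function name `summarize_by_timestamp` and the example voltages are made up for illustration.

```python
import statistics
from collections import defaultdict


def summarize_by_timestamp(frames):
    """Group (address, payload) pairs by date and time.

    Returns {(date, time): (average voltage, address with lowest voltage)}.
    """
    groups = defaultdict(dict)
    for address, payload in frames:
        voltage_text, time_text, date_text = payload.split(",")
        groups[(date_text, time_text)][address] = float(voltage_text)

    summary = {}
    for key, voltages in groups.items():
        average = round(statistics.mean(list(voltages.values())), 2)
        lowest_address = min(voltages, key=voltages.get)
        summary[key] = (average, lowest_address)
    return summary


# Example frames, not real measurements.
frames = [
    ("0013A20040D7B01E", "30.00,10:30:00,30/05/2021"),
    ("0013A200414EF124", "28.50,10:30:00,30/05/2021"),
    ("0013A20040D7AEA4", "29.10,10:30:00,30/05/2021"),
]
summary = summarize_by_timestamp(frames)
print(summary[("30/05/2021", "10:30:00")])
```

With N sensors, each group holds up to N voltages, so the same function keeps working when more sensors are added to the network.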