Code slows down over time but can't find a memory leak

I have a simple project that reads a bunch of vehicle sensors and outputs values to a node red dashboard. I’m very much a novice but have the code working, however it slows down over time to the point where the pi zero hangs. It consists of four functions which are running as daemon threads so they can run independently - essentially the four sensors. I’ve tried to use tracemalloc to identify a memory leak with no success. I wonder if I have something wrong structually?

from sys import platform
from io import BufferedReader
from threading import Thread, Lock
from time import sleep
import tracemalloc
import linecache
import os
import serial
import pynmea2
import io
import time
import board
import busio
import digitalio
import adafruit_max31855
import paho.mqtt.client as mqtt
import collections
import obd
import adafruit_ads1x15.ads1015 as ADS
from adafruit_ads1x15.analog_in import AnalogIn

# import logging
# import psutil
tracemalloc.start()
# logging.basicConfig(filename="/home/pi/hdj.log", format='%(asctime)s %(message)s', filemode='a')
# logger = logging.getLogger()
# logger.setLevel(logging.DEBUG)

spi = board.SPI()
cs = digitalio.DigitalInOut(board.D5)
max31855 = adafruit_max31855.MAX31855(spi, cs)
mqttBroker = "127.0.0.1"
client = mqtt.Client("HDJ")
client.connect(mqttBroker)
obd.logger.setLevel(obd.logging.CRITICAL)
i2c = busio.I2C(board.SCL, board.SDA)
ads = ADS.ADS1015(i2c)

# initialise global variables
reading = False
ref_set = True
zbv = 0.55
aBoost = 0


def display_top(snapshot, key_type='lineno', limit=10):
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    ))
    top_stats = snapshot.statistics(key_type)

    print("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        # replace "/path/to/module/file.py" with "module/file.py"
        filename = os.sep.join(frame.filename.split(os.sep)[-2:])
        print("#%s: %s:%s: %.1f KiB"
              % (index, filename, frame.lineno, stat.size / 1024))
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            print('    %s' % line)

    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        print("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    print("Total allocated size: %.1f KiB" % (total / 1024))


def gpsStuff():
    ser = serial.Serial('/dev/ttyACM0', 9600, timeout=0.1)
    sio = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
    while 1:
        try:
            line = sio.readline()
            prefix = line[0:6]
            if prefix == "$GPVTG":
                msg = pynmea2.parse(line)
                # client.publish("GPS_SPD", round(msg.spd_over_grnd_kmph))
                client.publish("GPS_SPD", msg.spd_over_grnd_kmph)

            if prefix == "$GPGGA":
                msg = pynmea2.parse(line)
                client.publish("GPS_NumSV", msg.num_sats)
                client.publish("GPS_DOP", round(float(msg.horizontal_dil) * 2.5))
                client.publish("GPS_ALT", msg.altitude)

        except serial.SerialException as e:
            print('Device error: {}'.format(e))
            break

        except pynmea2.ParseError as e:
            print('Parse error: {}'.format(e))
            continue


def boostyboy():
    global zbv
    global ref_set
    global aBoost
    while not ref_set:
        print("Zero boost vRef not set")
        time.sleep(2)
    print("vRef set:", zbv)
    while True:
        try:
            chan = AnalogIn(ads, ADS.P0)
            aBoost = round((chan.voltage - zbv) / 0.04, 1)
            # print("Pressure:", aBoost, "PSI")
            # if p < 1:
            #    p = 0
            client.publish("boost", aBoost)
            # logger.info("aBoost: %s",p)
            time.sleep(.1)
        except:
            pass


def egtStuff():
    while True:
        try:
            samples = collections.deque(maxlen=40)
            for i in range(5):
                tempC = max31855.temperature
                # print(tempC)
                samples.append(tempC)
                time.sleep(.05)

            N = len(samples)
            total = sum(samples)
            movingAv = total / N
            # print("Moving Av Temperature:", int(round(movingAv,0)))
            client.publish("EGT", int(round(movingAv, 0)))
            time.sleep(0.50)
            # print(psutil.virtual_memory())

        except:
            pass


def obdStuff():
    global ref_set
    global zbv
    global aBoost

    tic = time.time()
    c = obd.OBD("/dev/rfcomm0")
    tic = time.time()

    while True:
        try:
            # print("allo")
            toc = time.time()
            if toc - tic > 20:
                r = c.query(obd.commands.ELM_VOLTAGE)
                client.publish("Voltage", r.value.magnitude)
                tic = time.time()

            r = c.query(obd.commands.RPM)
            if not r.is_null():
                client.publish("RPM", round(r.value.magnitude, 0))

            r = c.query(obd.commands.COOLANT_TEMP)
            if not r.is_null():
                client.publish("COOLANT_TEMP", r.value.magnitude)

            r = c.query(obd.commands.INTAKE_PRESSURE)
            if not r.is_null():
                b = round(r.value.magnitude * 0.145 - 14.7, 1)
                if not ref_set:
                    if b < 0.5:
                        chan = AnalogIn(ads, ADS.P0)
                        zbv = chan.voltage
                        ref_set = True
                        print("MAP at set time:", b)
                client.publish("INTAKE_PRESSURE", b)
                logger.info("MAP: %s ; aBoost: %s",b, aBoost)
                print("MAP:", b)

            r = c.query(obd.commands.INTAKE_TEMP)
            if not r.is_null():
                client.publish("INTAKE_TEMP", r.value.magnitude)

        except:
            pass


if __name__ == "__main__":
    thread1 = Thread(target=gpsStuff)
    thread1.setDaemon(True)
    thread1.start()

    thread2 = Thread(target=obdStuff)
    thread2.setDaemon(True)
    thread2.start()

    thread3 = Thread(target=egtStuff)
    thread3.setDaemon(True)
    thread3.start()

    thread4 = Thread(target=boostyboy)
    thread4.setDaemon(True)
    thread4.start()

    # while True:
    #    snapshot = tracemalloc.take_snapshot()
    #    display_top(snapshot)

    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()

I use the GC module to figure what is leaking as python objects.

gc.collect()
all_objects = gc.get_objects()

I then summerize the types of objects in all_objects and watch how the type counts change over time.

This should lead you to the type object that is leaking and after you know the object type it’s possible to fix the problem.

Barry

Thanks Barry - i’ll give it a go.