Python thread only running at launch thread interval

i have a thread that manages sending messages to MQTT every 5 secs. when my SNF (store’n’forward) queue has messages, i launch a daemon thread from within this ‘message’ thread to send the stored data pretty much as fast as it can. but instead of running quickly, it runs at the same interval as the message thread. why is that and how do i fix it? i know that threads are NOT hierarchical, so this should not be happening as far as i understand…

here’s the relevant code:

import threading, time
# (sqlite3 connection, paho-mqtt client setup, and logging all trimmed for clarity)

currentlySendingSNF = False  # nothing in flight at startup

def sendSNFData(delay):
  global currentlySendingSNF
  currentlySendingSNF = True  #<-- to prevent duplicate threads
  # blah blah get data from db
  snfResults = snfCursor.execute(query)  #<-- unlike MySQL, returns a 'cursor'
  snfData = snfResults.fetchall()  #<-- cursor must then be interacted with for data
  for data in snfData:
    # blah blah do some data ballet and build a payload
    client.publish(snfPayload)
    time.sleep(delay)  # this should cycle every 0.1 sec.  it doesn't.
  # no more messages to send
  currentlySendingSNF = False
  # and thread dies here... RIP

def sendMQTT():
  if snfHasData() and not currentlySendingSNF:
    # create a thread to handle stored messages and then die
    threading.Thread(target=sendSNFData, args=(0.1,),
                     name='sendSNFData_th', daemon=True).start()
  # send live messages to MQTT
  client.publish(livePayload)

def sendMQTT_th(delay):
  while True:
    time.sleep(delay)
    sendMQTT()

threading.Thread(target=sendMQTT_th, name='mqtt_th', args=(5.0,)).start()

so what happens is: the sendMQTT thread works fine, and the sendSNFData thread even technically works fine, it just doesn’t execute at the speed i need. it’s somehow tied to the loop time of the sendMQTT thread: 5 secs.

i am, as they say in France: le confused. :confused: i’m probably missing something simple (ain’t it always?) so i’d appreciate a pointer or two so i can understand why this isn’t just cycling every 0.1secs as intended.

I would add print statements to see when things happen in the code.
It would be interesting to know how many items are in the result list to be sent. If there’s only one item per query, that would explain it.

currently, there are 122 items in that SNF queue. :stuck_out_tongue: i just trimmed all the logging code for clarity.

what should i be looking for? i have a hunch it might be the currentlySendingSNF flag… i’ll start there.

Your code isn’t entirely syntactically valid, so I’m not 100% sure what it’s doing, but it looks to me like you’re spawning a LOT of threads. Maybe you’re just saturating things? Every 0.1s you start another thread which loops every 0.1s until it is done sending the results. Maybe that isn’t a problem, but it would be something to consider.

If you really do need this level of parallelism, I would strongly recommend running an event loop rather than individual threads. Maybe the entire program can run through a single event loop, or maybe you’d have one top-level thread plus a pool of (or even a single) sender thread(s) that do the publishing.
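As a taste of the event-loop approach (a very rough sketch, assuming asyncio, with made-up payload names and a list standing in for the actual MQTT publish):

```python
import asyncio

sent = []  # stand-in for client.publish(...) so the sketch is self-contained

async def sender(q):
    # one long-lived task drains the queue as fast as items arrive
    while True:
        payload = await q.get()
        if payload is None:        # sentinel: orderly shutdown
            break
        sent.append(payload)       # real code would publish to MQTT here

async def main():
    q = asyncio.Queue()
    task = asyncio.create_task(sender(q))
    for p in ("live-1", "snf-1", "snf-2"):
        await q.put(p)             # producers just drop work on the queue
    await q.put(None)              # tell the sender to stop
    await task

asyncio.run(main())
```

The point is that there’s only ever one sender, and its speed is governed by how fast work arrives, not by some other loop’s sleep interval.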

okay kids. here’s today’s lesson: SQLite3 does not behave like MySQL and so when you’re tired, don’t forget that…

turns out i was trying to iterate over the SQLite cursor. and so… yeah. soon as i fixed that oversight, it behaved exactly as expected.
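for any other late-night coders, i think the key detail is that the cursor is a one-shot stream: read it once and it’s drained. a rough demo (in-memory db, made-up table name):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snf (payload TEXT)")
conn.executemany("INSERT INTO snf VALUES (?)", [("a",), ("b",), ("c",)])

cur = conn.execute("SELECT payload FROM snf")  # execute() hands back the cursor
first_pass = list(cur)        # iterating consumes the cursor: 3 rows
second_pass = cur.fetchall()  # cursor already drained: empty list

print(len(first_pass), len(second_pass))  # 3 0
```

so if you touch the cursor once and then try to loop over it again, the second loop sees nothing. fetchall() once, up front, and keep the list.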

what a dope. probably not worth keeping this thread, but i edited it just in case some other poor sap, overworked and underpaid as we all are, makes the same booboo and erroneously blames it on Python.

if you scroll back up, you’ll see that the management thread only runs every 5 secs. and the SNF thread only gets created if two conditions are met:

  1. SNF database has data
  2. last SNF thread isn’t running

so it’s just the one daemon thread that gets created. with the SQLite fix, it runs smooth as butter.

Ah, gotcha. Yep, I see the sentinel now. Though I would be inclined to use more reliable detection than resetting a global (e.g. store the thread object and check whether it’s None or no longer running); but in any case, it’s not the resource saturation I was thinking it might be.

yeah. being fairly new to threading, i’m sure there are many ways to do it better; no question. coming from JavaScript and PHP, using a simple flag seemed the best solution at the time. i did read somewhere about storing the thread, but due to time constraints i didn’t run that down completely so i understood it well enough to deploy. my understanding was that by making it a daemon, i could just fire-and-forget, which is what i wanted anyway. it’s something that only runs when the SIMs fall offline, which is rare (1-2x every month) and usually only for a short duration (2-6hrs).

having said that, i’d like to know if there are better ways to do that and would appreciate a link or an example, for sure! :slight_smile:

That’s fair! And if it works, hey, nothing wrong with it. Alternatives exist because different programmers attack problems differently!


@Rosuav if you are willing, and when you have time, i’d appreciate learning the method you mentioned.

as sad as i am that the world does not revolve around me, i understand if you have other things that are more demanding of your time. :joy: i appreciate your willingness to help me on this one. :+1:

You mean the thread pool? I’ll give you two very high level overviews, one of how you would do it at a more concrete level, one with the concurrent module. These will be partly pseudo-code, but should be a starting point for your own research and exploration. Threading is a HUGE topic and one that you will probably want to sit down with a hot chocolate and a good hour or two (maybe a weekend if you’re really into it!) to delve deep; it’s a great way to level up your programming game, but it will be a bit of a hump to get over. So, that said, let’s start the dive!

# Concrete design. Everything I do here, I could do in basically any
# programming language with threading support.
import threading, queue  # (and anything else I forgot)

THREAD_COUNT = 4  # tunable value
jobs = queue.Queue()

def worker_thread():
    while True:
        job = jobs.get()
        if job == "shutdown": break
        # process the job - in your case, iterate over the data and publish it

threads = [threading.Thread(target=worker_thread) for _ in range(THREAD_COUNT)]
for t in threads: t.start()

# Main thread
try:
    while True:
        ...
        ...
        ...
        # not sure what goes here, your original isn't quite clear
        jobs.put("some command here")
except KeyboardInterrupt:
    pass  # Hit Ctrl-C to shut down in an orderly fashion
for _ in threads: jobs.put("shutdown")
for t in threads: t.join()
# All done!

This uses a thread-safe queue as a job list, and keeps the same number of threads for the entire duration of the program. If you wanted to, you could drop one “shutdown” message onto the queue, which would reduce the thread count, or spawn another thread in response to increased workload. So far, so good. Now, here’s how you’d do it with the Python concurrent module.

import concurrent.futures

def send_data(data):
    ...  # iterate and publish as before, but JUST handle one job

# Main thread
with concurrent.futures.ThreadPoolExecutor() as pool:
    # Whenever a job comes in:
    pool.submit(send_data, data)
    # yeah that's it. You can get the results from them by retaining
    # the future that submit() returns, but otherwise, that's all folks!

this is fabulous! thank you so very much. :100: this is much easier to comprehend than most articles on the subject. thank you for taking the time. :smiley:


Awesome, happy to help out!