Struggling with threading structure

i am working on a project that has the following requirements:

  • critical function runs at exactly a 10ms interval and burps out anything it finds
  • run other functions every 1 second (+/- 100ms)
  • send report every 5 seconds (+/- 100ms)

the astute among you may rightly assume MQTT is in the mix, but i didn’t want to muddy the waters. as it stands, all relevant functions work on their own. my challenge is threading these such that the critical function runs exactly as required while outputting values for the report function. ditto for the other functions, but they can fire/run/dump and then be relaunched. the critical one must always be running.

i have no idea how to thread these correctly. i think threading is the best way to go since i am I/O bound and the I/O is pretty fast. the kink is that i’ve come across a resolution-perfect clock function that i’d like to incorporate, but apparently it is only good with asyncio. while i am not a stranger to Python, i am a stranger to threading in Python. so please bear with me.

i have been advised to use three threads: one for the mission-critical function, one for the other functions, and a third for the reporting. all well and good… but i am stuck on some basic understanding of how to fit this into a while True event loop. :stuck_out_tongue:

i appreciate this is a bit of a barn-door-ask, but i am stumped and not sure where to turn to. i am open to whatever refinements to this post are required to move this puck forward.

here’s what i’m tinkering with using that timing clock:

import time
import asyncio

goState = True
tickCount = 0


def do_every(periodInSeconds, fx, *args):
  global goState

  def g_tick():
    t = time.time()
    while True:
      t += periodInSeconds
      yield max(t - time.time(), 0)
  g = g_tick()
  while goState:
    time.sleep(next(g))
    fx(*args)


def checkIO(s, loop):
  global tickCount
  tt = str(time.time())[6:15]  #<-- trim values to the ms
  print(f'hello {s} {tickCount} {goState} ({tt})')
  tickCount += 1
  time.sleep(.009)  #<-- simulation of I/O sampling process
  if not goState:
    loop.stop()


loop = asyncio.get_event_loop()
loop.call_soon(do_every, 0.01, checkIO, 'fleep', loop)  #<-- 'fleep' = input to func

#
#   i know this ↓ gets called right away, but i'm not sure how to loop this
#     properly.  you can see what it's SUPPOSED to do...  and i'm not sure
#     where to add the other necessary threads for reporting and other
#     sampling functions (the other two threads).
#
print(f'current tickCount: {tickCount}')
if tickCount >= 200:
  print(f'turning off the machine')
  goState = False   #<-- should break the do_every() loop...?

loop.run_forever()
loop.close()

i have a bunch of other files that i’ve been bashing away at, but this is the one that feels the closest and incorporates the accurate clock.

EDIT: one further kink i meant to mention, the threads need to be cancellable. which i presume is not a problem since we can name threads and subsequently call thread_x.cancel() to do that. if i’m wrong, please let me know. i was also looking at the second coroutine implementation found in this post.

EDIT2: i mistyped the interval for the critical loop, so i corrected it (100ms → 10ms)

How precise does this 100ms need to be?

i mistyped: the cycle is 10ms. the pulse width we look for is about 20ms, so i guess we can err 5ms either side? and the frustration is that we may only get a maximum of 3 pulses per second. so, in truth, this loop should run as fast as it can to avoid missing anything. i set it to 10ms to try and ‘rest’ the CPU a bit, but in the end, that isn’t a factor.

and that’s the reason i’m trying to tie it to that clock function because i can get the function locked to that resolution.

This is going to be very limiting. It’s extremely hard to reliably sleep for 10ms repeatedly. That said, though: If this high-performance thread isn’t actually sleeping but is instead requesting from a queue, this will likely be fine. The other functions are going to get their look-ins when they need them, and the queue-pumping thread will do its work in between.

Thus the question on how precise these sleeps need to be. On a nice quiet computer, you should be able to sleep for 10ms +/- 5ms fairly reliably; but when you throw multithreading into the mix, it becomes far less certain. Here’s a test script that has a five-second pulse of work; if it’s completely destroying your computer, lower the work factor (40 in this example), but you can probably find a threshold where it doesn’t take all that long to do the work, but it’s still enough to delay the other thread by more than the 5ms cutoff.

import time
import threading

def work(factor):
	print("Working!")
	fib = ["", "*"]
	for _ in range(factor):
		fib.append("*" * (len(fib[-2]) + len(fib[-1])))
	print("Done working.")

def thread(delay, factor):
	while True:
		time.sleep(delay)
		work(factor)

threading.Thread(target=thread, args=(5.0, 40)).start()

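# Main thread: sleep 10ms at a time and report any wakeup more than 5ms off.
prev = time.time()
while True:
	time.sleep(0.010)
	now = time.time()  # named "now" so the builtin next() is not shadowed
	if now - prev > 0.015: print("LATE:", now - prev)
	if now - prev < 0.005: print("EARLY:", now - prev)
	prev = now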

(The astute among you may notice that this work() function will be followed by some extra garbage collection work, so on a memory-constrained system, the delayed messages might come after the “Done working” message rather than before it. But either way, this work can impact the main thread.)

The “resolution-perfect clock function” you provided is solving one important problem (which I dodged in the above example), namely the clock drift from the time spent in the process itself. However, it does nothing to protect you from other threads in the process, other processes in the system, or any other source of delay.
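To make the drift point concrete, here is a minimal sketch of a drift-compensating loop that runs in a plain thread, with no asyncio involved. The names run_periodic and stop_event are made up for illustration, and time.monotonic is swapped in for time.time so that wall-clock adjustments don't disturb the schedule. It fixes drift only; it does nothing about the scheduling delays described above.

```python
import threading
import time


def run_periodic(period, fn, stop_event, clock=time.monotonic):
    """Call fn() every `period` seconds until stop_event is set.

    Deadlines are advanced by a fixed step, so the time spent inside fn()
    does not accumulate as drift.
    """
    next_deadline = clock() + period
    while not stop_event.is_set():
        # Event.wait() acts as a sleep that a stop request can interrupt.
        if stop_event.wait(max(next_deadline - clock(), 0)):
            break
        fn()
        next_deadline += period
        # If we fell behind, skip the missed ticks instead of firing a burst.
        now = clock()
        if next_deadline < now:
            missed = int((now - next_deadline) // period) + 1
            next_deadline += missed * period


if __name__ == "__main__":
    stop = threading.Event()
    stamps = []
    worker = threading.Thread(
        target=run_periodic,
        args=(0.01, lambda: stamps.append(time.monotonic()), stop),
    )
    worker.start()
    time.sleep(0.5)
    stop.set()
    worker.join()
    print(len(stamps), "ticks in roughly 0.5 s")
```

Whether skipping missed ticks or catching up is the right policy depends on the application; for a sampling loop, skipping seemed the safer default for this sketch.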

If you really really truly need PERFECT timing pulses, you may need to farm that off to a separate, ultra-specialized program, or maybe even a kernel service (what OS are you running on?). That service can then ping you via a file descriptor or something (a queue, pipe, or equivalent), and you can simply respond to those. Otherwise, high precision timing is fundamentally difficult in any language. Few OSes today guarantee realtime performance.

True, but so long as the CPU load is <50% and you have a reasonable number of cores, you should be able to get the timing to within +/- 1ms (at least on Linux).

You could also set the scheduling priority to real-time for the thread that is critical.
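For reference, on Linux a real-time policy is a separate mechanism from the nice value, and Python exposes it through os.sched_setscheduler. The sketch below is only an illustration under assumptions: the helper name try_realtime and the priority of 10 are invented here, the call normally needs root or CAP_SYS_NICE, and it is unavailable on some platforms, so the helper reports failure instead of raising.

```python
import os


def try_realtime(priority=10):
    """Try to switch the caller to the SCHED_FIFO policy.

    Returns True on success and False if the platform lacks the call or
    the process is not allowed to use a real-time policy.
    """
    if not hasattr(os, "sched_setscheduler"):
        return False  # not a platform that exposes the scheduler API
    try:
        # A pid of 0 means "the caller".
        os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(priority))
    except OSError:  # PermissionError is a subclass of OSError
        return False
    return True


if __name__ == "__main__":
    print("real-time scheduling enabled:", try_realtime())
```

One caution for a single-core box: a SCHED_FIFO thread that never sleeps or blocks can starve everything else, so the critical loop still has to yield between samples.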

Yeah, that’ll be true of a reasonably quiet Linux system with a single Python thread. But the above code (where it wastes a bunch of memory on a secondary thread) has delays far in excess of 1ms.

No, it is true of a busy system with many threads.
What matters when latency matters is that the system has spare resources to respond within the crisis time.

As a rule of thumb, using less than 50% of the CPU gives you such a system.
Of course, the code has to be designed to work well when the resources are available.

Sorry, what I meant is: It’s true of a system where the Python process in question has only a single thread. What you’re describing is the OS juggling things just fine, and that I absolutely believe. (In my own testing, I was able to get stable results even with eight busy-waiting threads on a four-core-eight-thread CPU.) But when there are multiple Python threads doing work, it’s very easy to disrupt that.

this is a single core, single CPU SoC. it is a dedicated unit. there is literally nothing else running on this machine but this application. so even if i hit 75%, i’m not going to worry. resting state is < 2% of CPU. there’s also 9.5GB RAM available.

so. having said that. i just got in after braving the 30cm of snow that dumped last night and will get into your post shortly. i just wanted to toss this up there and a massive THANK YOU for helping me out. i come from a webDev/analysis background, so this is new to me; particularly in Python. working in JS, i never had to worry about threading or the event loop, per se. it was pretty straightforward. plus, never had such tight constraints either (system or speed). so thanks for getting me going in the right direction. :slight_smile:

more later! :+1:

EDIT: running a Buildroot Linux on an ARM v7l chip, btw. and, yeah, i agree: precision timing is a PITA. i am fully willing to accept this might be nuts to even attempt, but that’s what i get paid to do: this kind of fun stuff. :smiley:

okay. had some time to dig in and review. this was extremely helpful. thank you.

my question now is, to add two more threads, i would simply duplicate thread with calls to the other two functions (with attendant delays) and i’ll be away to the races, yes?

next, i’m assuming factor is just representing a load-factor on the function to expand/contract its processing time/load, yes?

can i get information into, and out of, these threads? for example, the critical function needs to update a global counter. the threaded write function needs to then send that data when it runs. this is where my head collapsed trying to see if this could be done. it only makes sense that it can be, but no tutorial i found (that didn’t cost a ton) even brushed by this notion.
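On getting data into and out of threads: all threads in a process share the same memory, so the question is mostly about making each update safe. One common pattern is to wrap the shared value in a small object guarded by a lock. The sketch below is a minimal illustration; TickCounter, increment, take and hammer are hypothetical names, not anything from the code in this thread.

```python
import threading


class TickCounter:
    """Counter shared between a sampling thread and a reporting thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def increment(self, n=1):
        # Called by the sampling thread whenever it sees a pulse.
        with self._lock:
            self._count += n

    def take(self):
        """Return the accumulated count and reset it to zero in one step."""
        with self._lock:
            count, self._count = self._count, 0
        return count


def hammer(counter, times):
    # Stand-in for the sampling loop: bump the counter many times.
    for _ in range(times):
        counter.increment()


if __name__ == "__main__":
    ticks = TickCounter()
    workers = [threading.Thread(target=hammer, args=(ticks, 1000)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("report:", ticks.take())       # report: 4000
    print("after reset:", ticks.take())  # after reset: 0
```

The reporting thread would call take() every five seconds; because the read and the reset happen under the same lock, no tick counted in between is lost.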

finally, can i cancel a running thread, or let it run out, in the event it is no longer needed? some of our use-cases require the toggling of the high-frequency sampling capability, like when the sensor is disconnected, changed, or removed from the platform. my presumption is ‘yes’ since i have read that you can name your threads but there’s so much crap out there, i wanted to confirm.

i’m going to play with this code this morning and upload what i’ve been tinkering with in a few hours so you know i’m not just couch surfing here.

okay. here’s what i’m playing with and it seems to be delivering what i’m aiming for. any recommendations or critiques are welcome.

import time
import threading

tickCount = 0
payload = []

def readMulti(factor):
	global payload
	print("Working!")
	for i in range(factor):
		temp = {'ping': ("·" * (i+1))}
		payload.append(temp)
		print(f'payload: {payload}')
	print("readMulti working.")

def readMulti_thread(delay, factor):
	while True:
		time.sleep(delay)
		readMulti(factor)

def writeMQTT(factor):
	global tickCount
	global payload
	print("Working!")
	print(f'----------------------------> final tickCount: {tickCount}')
	tickCount = 0
	print(f'----------------------------> final payload: {payload}')
	payload = []
	print("writeMQTT Done working.")

def writeMQTT_thread(delay, factor):
	while True:
		time.sleep(delay)
		writeMQTT(factor)

def readDIN(factor):
	global tickCount
	print("Working!")
	for _ in range(factor):
		tickCount += 1
		print(f'tickCount: {tickCount}')
	print("readDIN Done working.")

def readDIN_thread(delay, factor):
	while True:
		time.sleep(delay)
		readDIN(factor)

threading.Thread(target=readMulti_thread, args=(1.0, 5)).start()
threading.Thread(target=writeMQTT_thread, args=(5.0, 8)).start()
threading.Thread(target=readDIN_thread, args=(0.010, 2)).start()

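# Main thread: sleep 10ms at a time and report any wakeup more than 5ms off.
prev = time.time()
while True:
	time.sleep(0.010)
	now = time.time()  # named "now" so the builtin next() is not shadowed
	if now - prev > 0.015: print("LATE:", now - prev)
	if now - prev < 0.005: print("EARLY:", now - prev)
	prev = now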

instead of making the threads sleep, i simulated similar work instead. the readMulti is quite a bit more involved, but the end result is it spitting out a list of dicts. same with the writeMQTT. there’s logic and response handling in there as well. finally, i’m making the assumption this will run well alongside the MQTT client.loop_start() functionality required for the MQTT aspect to function.

i should note that this test, running on the platform itself, is not triggering any LATE or EARLY messages, which, so far, i take as a good sign.

so now i just have to figure out how to terminate the readDIN thread when the MQTT message arrives to kill it… and restart it when another MQTT message arrives with the green light.

Threads (Python thread objects) cannot be cancelled in Python. There has been some talk about this (for instance “Making it simpler to gracefully exit threads”). Timers can be cancelled, and so can concurrent.futures.Future objects (though that’s not guaranteed to succeed), but Thread objects basically only have start/run/join.

But your code and the code in the link are not using any threading, but asyncio. Asyncio is useful for IO-bound processes, but gives you even less control over timing than threading (since it really depends on which futures are awaitable at runtime and where the awaits and io are called - even in the simplest scenarios that quickly becomes intractable for human programmers). However, you are also not really using asyncio, you only use the event loop… without any async functions.

I would suggest not using asyncio at all, but using a threadpool with three threads. For each of them I would profile the code first separately, to ensure that in the worst case scenario it’s all working.

The “resolution-perfect clock function” from Stack Overflow is imo kind of nonsense for your use case (as @Rosuav also pointed out). And asyncio is not necessary to get “good-enough” timings (which is the only sensible thing to aim for). You might just as well use threading + time.

But why use any timers at all? If the critical function and the other functions/reporting each run in their own thread (or in their own process), and if you would always just process everything as fast as possible as soon as data becomes available, would this cause any problems? What problem are you trying to solve by adding a timer or scheduler for the critical function (or for that matter for the reporting - which I assume is just logging or DB access?)

data usage vs. accuracy for billing. the device will be remotely located and we don’t need it chewing through the data cap. so while it is only reporting every 5 seconds, it is reporting the timestamped message data for tracking, and (most importantly) billing (the tickCount). while it may not seem like much, when you have 30 of these devices, the data scales pretty quickly if you don’t manage the reporting interval. in other words, we need to sample as fast as we can, but only report the accumulated data at specified intervals.

the reason for the tight sample loop is the need to capture a 20ms wide pulse that comes 0 to 3 times a second. because of that short pulse width, the timing has to be tight so it doesn’t get missed when it comes.

i am completely open to more efficient ways of doing this. i am new to threading in Python, so have no favourite method save one that works and is comprehensible and expandable. what was throwing me was how to handle this in an event loop as i understood it. turns out i didn’t understand it well enough. :stuck_out_tongue: but tinkering with this working code has helped enormously. so, again, if there is a better way, i’m all for it. i’ll start reading up on futures and threadpools. if you have a short example, i’d be most grateful.

Somehow my earlier post and your post crossed - so I only saw your last post after submitting mine… :slight_smile:

Don’t use global objects with threading - that’s going to create lots of trouble. Use semaphores.
Instead of killing the readDIN thread - why not make it handle some kind of “poison” or “terminate” message on which it can join and exit normally?

heh. yeah. as i hit REPLY, i saw you replying as well. race condition: GO! :smiley:

i also noted i didn’t answer one of your last questions. because this is a single-core-single-cpu, i want to thread as much as possible because everything basically orbits that tickCount accuracy, which directly relates to billing. and since it has to basically run constantly, albeit I/O bound, i wanted everything else to basically dance between those gaps. since cash is on the line, that trumps everything else. does that help?

poison/terminate code: i was actually toying with this very idea. instead of a while True, maybe a while goState that gets flipped to False at some point by another function, terminating that base loop?
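A flag like goState works in principle, and threading.Event is the standard-library version of the same idea: the loop checks it on every pass, and its wait() doubles as the sleep. Since a finished Thread object cannot be started again, restarting means creating a new one. Here is a rough sketch of a stoppable, restartable sampler; the Sampler class and its method names are made up for illustration.

```python
import threading
import time


class Sampler:
    """Runs fn() every `period` seconds; can be stopped and started again."""

    def __init__(self, period, fn):
        self._period = period
        self._fn = fn
        self._stop = threading.Event()
        self._thread = None

    def running(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        if self.running():
            return  # already sampling
        self._stop.clear()
        # A Thread can only be started once, so build a fresh one each time.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()  # wait for the loop to finish its last pass

    def _run(self):
        # wait() returns False on timeout (keep going) and True once stopped.
        while not self._stop.wait(self._period):
            self._fn()


if __name__ == "__main__":
    hits = []
    sampler = Sampler(0.01, lambda: hits.append(time.monotonic()))
    sampler.start()
    time.sleep(0.1)
    sampler.stop()
    print("samples before stop:", len(hits))
    sampler.start()
    time.sleep(0.1)
    sampler.stop()
    print("samples after restart:", len(hits))
```

In this arrangement the MQTT message handler would just call sampler.stop() or sampler.start(); the thread exits on its own at the next loop pass rather than being killed.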

so what i’m wanting to research is concurrent.futures and that ball of wax, correct? forgive my newbishness… it appears there are three ways to handle this stuff and there’s a lot of crosstalk (to my mind) as to what goes where. since i didn’t import or use asyncio i wasn’t aware that’s what i was even doing!

What is that? How is that presented to your program? Are you basically polling for the presence of that pulse? If so, isn’t there a way to not poll? Polling in a (pretty) tight loop never seems a good idea… (On the one hand you’re wasting resources, and on the other hand - what happens when two events happen inside the same timer interval?)

so what i’m wanting to research is concurrent.futures and that ball of wax, correct?

Indeed. This should give you maximal control and will make it easy to either choose a ThreadPoolExecutor or a ProcessPoolExecutor. There is some sample code in the Python docs for multiprocessing.
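As a rough sketch of what the thread-pool version could look like: three long-running loops submitted to a ThreadPoolExecutor, all watching one shared stop event. The loop names and the shortened periods are placeholders chosen so the demo finishes quickly. Note that Future.cancel() only works on tasks that have not started yet, so a running loop still has to be stopped cooperatively.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def loop(name, period, stop):
    """Stand-in for a periodic job: count passes until asked to stop."""
    passes = 0
    while not stop.wait(period):
        passes += 1  # the real sampling or reporting work would go here
    return name, passes


def run_for(seconds):
    """Run the three loops for `seconds`, then stop them and collect counts."""
    stop = threading.Event()
    jobs = (("din", 0.01), ("multi", 0.1), ("report", 0.25))
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(loop, name, period, stop) for name, period in jobs]
        time.sleep(seconds)
        stop.set()  # cooperative shutdown; cancel() cannot stop a running task
        return dict(f.result() for f in futures)


if __name__ == "__main__":
    print(run_for(0.6))
```

Compared with bare Thread objects, the main gain is that each loop's return value or exception comes back through its Future instead of vanishing with the thread.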

can’t do multi-processing. remember: single-core-single-cpu. so ThreadPool it must be.

on the device there is a digital input monitoring a voltage. when voltage goes high, that’s a pulse. and it only lasts 20ms; the pulse width. that pulse represents a measurement. the only way to capture it is basically to look at the thing often enough to catch that spike. fun, right? it’s like plopping a kid on a stool and having him shout ‘ON’ when the light flips… basically how PLCs handle sensors. in this use-case, we are emulating that behaviour on an ARMv7l chip so it has to be done in a single thread because there’s literally no other way to do it.
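One detail that follows from these numbers: with a 10ms sample period and a 20ms pulse, the same pulse will usually be seen high on two consecutive samples, so counting "high" readings would overcount. Counting low-to-high transitions avoids that. A minimal sketch, with EdgeCounter as a hypothetical helper and a plain list standing in for real reads of the digital input:

```python
class EdgeCounter:
    """Counts low-to-high transitions in a stream of sampled levels."""

    def __init__(self):
        self._last = False  # assume the line starts low
        self.count = 0

    def sample(self, level):
        """Feed one reading; return True if it is a rising edge."""
        level = bool(level)
        rising = level and not self._last
        if rising:
            self.count += 1
        self._last = level
        return rising


if __name__ == "__main__":
    # Three pulses, each seen on one to three consecutive samples.
    readings = [0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0]
    counter = EdgeCounter()
    for value in readings:
        counter.sample(value)
    print("pulses:", counter.count)  # pulses: 3
```

This still assumes no pulse falls entirely between two samples, which is why the sampling period has to stay comfortably below the pulse width.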

the need to terminate threads stems from use-cases where the sensor is either being replaced, is not deployed, or is added at a later date. no sense hammering away when there’s nothing there. and that functionality needs to be toggle-able remotely via MQTT. it’s a curious spec with some challenging limitations and requirements. :slight_smile:

peeked at semaphores… sweeeeet. :star_struck:

can’t tell you how much this has helped me understand how all this is working. you guys connected a lot of amorphous dots into a cohesive, if wobbly, understanding. thank you so much.

You can still run multiple processes, and it might still help with performance if those processes are IO-bound. Though a thread pool would probably be just as performant in this case.

on the device there is a digital input monitoring a voltage. when voltage goes high, that’s a pulse. and it only lasts 20ms; the pulse width. that pulse represents a measurement. the only way to capture it is basically to look at the thing often enough to catch that spike. fun, right?

There are other ways - better ways (imo) that do not require polling and thus might also not require using a separate thread just to monitor the device (!). It does depend on exactly how you are able to interface with that device, and might require some (not too difficult) extra C-level code - basically to implement something like a ‘select’ call (based on GPIO registers for instance). There may be Python libraries available for this as well.
In terms of high-level design - polling kind of works (with caveats and implicit assumptions about the data stream), but is definitely not as good as an event-driven design in this case. The general code modularity and robustness may also be improved by making your program really event-driven.

300% agree. i just don’t have the time to develop the chops to do it. :stuck_out_tongue: but i do have it slotted for down-the-road because i am always trying to be better.

i’ve done some fairly straightforward testing, and when i remove the global and try to just use semaphores, it tanks. hard. so i’m not doing it correctly. here’s a snippet:

tickCount = 0
tickSemaphore = threading.Semaphore(1)

def readDIN(factor):
	# global tickCount
	print("Working!")
	for _ in range(factor):
		with tickSemaphore:
			tickCount += 1
			print(f'tickCount: {tickCount}')
	print("readDIN Done working.")

def readDIN_thread(delay, factor):
	while True:
		time.sleep(delay)
		readDIN(factor)

threading.Thread(target=readDIN_thread, args=(0.010, 2)).start()
#  blah blah blah

i’ll continue looking into threadpools and i absolutely have to find a good ‘poison’ for the two sampling threads. personally given the use-case here, i don’t see the problem with using a global variable. it gets passed around through all the MQTT functions anyway and they don’t really interlock. but there is a point where they might and so it’s worth trying to figure the semaphore concept out. if you can point me in the right direction where i went wrong, that’d be great.
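A likely reason the snippet above falls over has nothing to do with the semaphore itself. With the global line commented out, the `tickCount += 1` assignment makes tickCount a local variable of readDIN, and reading it before it has a value raises UnboundLocalError. The lock or semaphore protects the update; it does not replace the global declaration. A small sketch of both the failure and one possible fix, using made-up snake_case names and a Lock (which behaves like Semaphore(1) for this purpose):

```python
import threading

tick_count = 0
tick_lock = threading.Lock()


def broken_increment():
    # No `global` statement: the augmented assignment makes tick_count a
    # local name, so reading it here raises UnboundLocalError.
    tick_count += 1


def increment():
    global tick_count  # still needed; the lock only guards the update
    with tick_lock:
        tick_count += 1


def take_ticks():
    """Return the current count and reset it, as the report step would."""
    global tick_count
    with tick_lock:
        taken, tick_count = tick_count, 0
    return taken


if __name__ == "__main__":
    try:
        broken_increment()
    except UnboundLocalError as exc:
        print("broken version failed:", exc)
    increment()
    increment()
    print("ticks:", take_ticks())  # ticks: 2
```

Keeping the counter and its lock together in one small object, rather than as two module-level names, is another way to avoid the global statement altogether.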

and thanks ever more. i know how busy everyone is and so i truly appreciate the time taken to help.

Can you configure the sense line to be edge-triggered?
If so, it is easier on your timings.