Hello,
I’m a basic to intermediate Python programmer but very experienced in Unix/Linux. This is my first attempt at trying to implement a producer-consumer model of queue processing that is triggered by watchdog observer events.
I have written the following code and it works without any issues, but this is my first time doing something like this, so I’d like to ask the experts here: do you see any glaring mistakes in the event handling, the loading of the queue, or the thread locking?
Thanks in advance!
Best,
-PE
class MyEventHandler(FileSystemEventHandler):
    """Event handler that forwards the path of each created file to a queue."""

    def __init__(self, queue):
        """Remember the queue that created-file paths are pushed onto."""
        super().__init__()
        self.queue = queue

    def on_created(self, event):
        """Enqueue ``event.src_path`` unless the event is for a directory."""
        # Guard clause: directory creations are ignored entirely.
        if event.is_directory:
            return
        logger.info(f"Detected new file: {event.src_path}. Adding to queue for processing.")
        self.queue.put(event.src_path)
def consumer(myqueue, processing_lock):
    """Process file paths taken from ``myqueue`` until a ``None`` sentinel arrives.

    Every item fetched from the queue -- including the sentinel and items whose
    processing fails -- is acknowledged with ``task_done()``, so a caller may
    safely use ``myqueue.join()``.

    Args:
        myqueue: Queue of file paths; ``None`` tells the consumer to stop.
        processing_lock: Lock held while a single file is being processed.

    Returns:
        None. A file that fails verification is logged and skipped; it no
        longer terminates the consumer loop.
    """
    while True:
        # get() without a timeout blocks until an item is available, so
        # queue.Empty is never raised here and needs no handler.
        qfile = myqueue.get()
        try:
            if qfile is None:  # Sentinel to stop the consumer
                break
            with processing_lock:
                logger.info(f"In consumer(), processing file: {qfile}.")
                try:
                    if verify_something(qfile):
                        # if cert was already created
                        if config.file.is_file():
                            do_time_consuming_task()
                        else:  # it's a new endpoint: do EST and get cert
                            do_very_time_consuming_task()
                        logger.info(f"Finished processing file: {qfile}....")
                    else:
                        # Skip this file but keep the consumer alive; returning
                        # here would silently stop all further processing.
                        logger.warning(f"Verification failed for {qfile}, skipping.")
                except Exception as e:
                    # Boundary handler: one bad file must not kill the thread.
                    logger.error(f"Error processing {qfile}: {e}")
        finally:
            # Runs on every path (success, failure, sentinel/break) so the
            # queue's unfinished-task counter always returns to zero.
            myqueue.task_done()
def process_queue():
    """Watch ``config.watchdir`` and feed created files to a consumer thread.

    Starts a watchdog observer and one consumer thread, queues the files
    already present at startup, then idles until interrupted. On exit the
    observer is stopped first, then the consumer is told to stop via a
    ``None`` sentinel and joined.
    """
    event_queue = queue.Queue()
    processing_lock = threading.Lock()
    event_handler = MyEventHandler(event_queue)

    # The observer is started before the directory scan below.
    # NOTE(review): a file created during the scan may therefore be queued
    # twice (once by the observer, once by the scan) -- confirm that
    # processing the same file twice is harmless.
    observer = Observer()
    observer.schedule(event_handler, config.watchdir, recursive=True)  # Set recursive=True for subdirectories
    observer.start()

    # Start a consumer thread to process files from the queue
    consumer_thread = threading.Thread(target=consumer, args=(event_queue, processing_lock), daemon=True)
    consumer_thread.start()

    logger.info("Started processing files at startup.")
    # Reset endpoint array
    config.files.clear()
    # Process existing files
    # presumably lsdir() fills config.files with matching paths -- verify against lsdir
    lsdir(config.watchdir, ".txt")
    for newfile in config.files:
        logger.info(f"Processing file is: {newfile}")
        # Use a str path so queued items have the same type as the
        # src_path values delivered by real observer events.
        event = FileCreatedEvent(str(newfile))
        event_handler.on_created(event)  # Manually call the handler
        # At this point the file has only been queued, not yet processed.
        logger.info(f"Finished processing: {newfile}")
    # Reset endpoint array
    config.files.clear()
    logger.info("Completed processing files at startup.")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Caught keyboard interrupt, observer and consumer stopped.")
    finally:
        # Stop and join the observer first so no event can be queued behind
        # the sentinel and be silently dropped.
        observer.stop()
        observer.join()
        event_queue.put(None)  # Signal the consumer to stop
        # The queue is FIFO, so once the consumer thread has exited, every
        # item queued before the sentinel has been handled. No
        # event_queue.join() is needed, and it could block forever if any
        # item was never acknowledged with task_done().
        consumer_thread.join()
    logger.info("Observer and consumer stopped.")
</>