Rookie Help Needed for Queue Processing

Hello,

I’m a basic-to-intermediate Python programmer but very experienced in Unix/Linux. This is my first attempt at implementing a producer-consumer model of queue processing triggered by watchdog observer events.

I have written the following code and it is working without any issues so far, but this is my first time doing something like this. I’m asking the experts here: do you see any glaring mistakes in the event handling, the queue loading, or the thread locking?

Thanks in advance!

Best,

-PE

# Imports needed by this snippet; logger, config, lsdir(), verify_something(),
# do_time_consuming_task() and do_very_time_consuming_task() are defined elsewhere.
import queue
import threading
import time
from pathlib import Path

from watchdog.events import FileSystemEventHandler, FileCreatedEvent
from watchdog.observers import Observer


class MyEventHandler(FileSystemEventHandler):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def on_created(self, event):
        if not event.is_directory:
            logger.info(f"Detected new file: {event.src_path}. Adding to queue for processing.")
            self.queue.put(event.src_path)

def consumer(myqueue, processing_lock):
    while True:
        try:
            qfile = myqueue.get()
            if qfile is None:  # Sentinel to stop the consumer
                break

            with processing_lock:
                logger.info(f"In consumer(), processing file: {qfile}.")
                try:
                    if verify_something(qfile):

                        # if the cert was already created
                        if config.file.is_file():

                            do_time_consuming_task()

                        else:  # it's a new endpoint: do EST and get the cert

                            do_very_time_consuming_task()
                    else:
                        return False
                except Exception as e:
                    logger.error(f"Error processing {qfile}: {e}")
                myqueue.task_done()

        except queue.Empty:
            logger.info(f"Finished processing file: {qfile}....")
            logger.info("No items in queue, continue waiting....")


def process_queue():
    event_queue = queue.Queue()
    processing_lock = threading.Lock()

    event_handler = MyEventHandler(event_queue)
    observer = Observer()
    observer.schedule(event_handler, config.watchdir, recursive=True) # Set recursive=True for subdirectories

    observer.start()

    # Start a consumer thread to process files from the queue
    consumer_thread = threading.Thread(target=consumer, args=(event_queue, processing_lock), daemon=True)
    consumer_thread.start()

    logger.info("Started processing files at startup.")

    # Reset endpoint array
    config.files.clear()

    # Process existing files
    lsdir(config.watchdir, ".txt")

    for newfile in config.files:
        logger.info(f"Processing file is: {newfile}")
        filepath = Path(newfile)
        event = FileCreatedEvent(filepath)
        event_handler.on_created(event)  # Manually call the handler
        logger.info(f"Finished processing: {newfile}")

    # Reset endpoint array
    config.files.clear()

    logger.info("Completed processing files at startup.")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Caught keyboard interrupt, observer and consumer stopped.")
        observer.stop()
        event_queue.put(None)  # Signal the consumer to stop

    observer.join()
    consumer_thread.join()
    event_queue.join()

    logger.info("Observer and consumer stopped.")
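For context, this is roughly how I invoke it from my main script; the logging and config setup happen earlier and are omitted here.

# Illustrative entry point (not part of the code above); logging/config setup omitted.
if __name__ == "__main__":
    process_queue()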

Hello,

I am running into an issue: whenever the event of a file being added to the directory happens, the on_created() method is not firing.

The logs say the files are added to the queue for processing, but nothing happens afterwards. I’m not sure whether on_created() is the correct method I should be using for this.
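In case it helps with debugging, this is the kind of quick check I could add right after starting the consumer thread, to confirm whether it is still alive and how deep the queue is (just a rough sketch; the helper name and the 5-second interval are made up for illustration):

# Rough diagnostic sketch (not in the code above): log the consumer thread's
# state and the queue depth every few seconds, from inside process_queue()
# right after consumer_thread.start(). The interval is arbitrary.
def watch_consumer(consumer_thread, event_queue):
    while True:
        logger.info(f"consumer alive: {consumer_thread.is_alive()}, queue size: {event_queue.qsize()}")
        time.sleep(5)

threading.Thread(target=watch_consumer, args=(consumer_thread, event_queue), daemon=True).start()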

Any help is greatly appreciated.

Best,

-PE

To be clear, it works if 1 or 2 files get created in the directory simultaneously, but when I tried creating 10 files at the same time, the queue is not processed.
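For reference, this is roughly how I created the 10 files in that test (the directory path and file names below are placeholders, not my real config.watchdir):

# Rough reproduction of the test: drop 10 small .txt files into the watched
# directory at once. The path and names are placeholders.
from pathlib import Path

watchdir = Path("/path/to/watchdir")  # placeholder for config.watchdir
for i in range(10):
    (watchdir / f"test_{i}.txt").write_text("test\n")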

Best,

-PE