Help Needed: Preventing Duplicate Emails in Python Script with Data Merging and Tracking

```python
import os
import oracledb
import pandas as pd
import win32com.client as win32
import schedule
import time
import openpyxl
from openpyxl import load_workbook
from datetime import datetime
from tqdm import tqdm

# Main script logic to run the entire workflow every 3 mins
def run_script():
    

    # CSV file for tracking sent emails
    tracking_file = 'email_tracking.csv'

    # Excel file for logging script runs
    log_file = 'script_run_log.xlsx'

    # Step 2: Create or open an Excel log file to track script runs
    def log_script_run():
        if not os.path.exists(log_file):
            workbook = openpyxl.Workbook()
            sheet = workbook.active
            sheet.title = "Log"
            sheet.append(["Timestamp", "Run Count"])
            workbook.save(log_file)
        
        workbook = load_workbook(log_file)
        if "Log" not in workbook.sheetnames:
            sheet = workbook.create_sheet(title="Log")
            sheet.append(["Timestamp", "Run Count"])
        else:
            sheet = workbook["Log"]
        timestamp = pd.Timestamp.now()
        sheet.append([timestamp, 1])
        workbook.save(log_file)

    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{current_time}] Starting the script....")

    # Step 3: Read existing email tracking data
    if os.path.exists(tracking_file):
        email_tracking_data = pd.read_csv(tracking_file)
        email_tracking_data['TC_ORDER_ID'] = email_tracking_data['TC_ORDER_ID'].astype(str)
    else:
        email_tracking_data = pd.DataFrame(columns=['TC_ORDER_ID', 'EMAIL_SENT'])

    # Step 4: SQL Queries
    query1 = """
SELECT 
*
FROM (
    SELECT 
        ord.ref_field_10 AS SHIPMENT,
        SUBSTR(oli.ref_field1, 1, 10) AS TC_ORDER_ID,
        dos.description as ord_status,
        oli.ref_num1 AS ORDER_LINE_NBR,
        oli.item_name AS ITEM_NAME,
        ord.do_status AS DO_STATUS,
        MAX(oli.orig_order_qty) AS ORIG_QTY,
        SUM(nvl(oli.shipped_qty,0)) AS QTY_SHIPPED,
        (MAX(oli.orig_order_qty) - SUM(oli.shipped_qty)) AS QTY_CANCELLED,
        SUM(CASE WHEN oli.tc_order_line_id = oli.line_item_id THEN oli.shipped_qty ELSE 0 END) AS QTY_CHASED,
        oli.ref_field10 AS REF_FIELD10,
        oli.ext_purchase_order AS EXT_PURCHASE_ORDER,
        ord.created_dttm AS CREATED_DTTM,
        ord.ref_field_8 AS SERVICE_LEVEL
    FROM orders ord
    JOIN order_line_item oli ON oli.order_id = ord.order_id
    JOIN do_status dos on ord.do_status = dos.order_status
    WHERE ord.created_dttm >= (sysdate-1)
      AND ord.do_status IN ('190', '200')
      AND oli.do_dtl_status IN ('190', '200')
      AND ord.ref_field_8 NOT IN ('Scrap Order', 'Repair Order')
      AND oli.ref_field6 NOT LIKE '%12'
    GROUP BY 
        ord.ref_field_10,
        SUBSTR(oli.ref_field1, 1, 10),
        oli.ref_num1,
        oli.item_name,
        ord.do_status,
        ord.created_dttm,
        oli.ref_field10,
        oli.ext_purchase_order,
        ord.ref_field_8,
        dos.description
) sub_query_alias
WHERE qty_shipped < orig_qty 
ORDER BY TC_ORDER_ID, ORDER_LINE_NBR ASC
"""
    query2 = """
SELECT * FROM
(
    SELECT DISTINCT
        oh.extn_reference_1 || '-' || TRIM(ol.item_id) AS "Key",
        TO_CHAR(NEW_TIME(oh.createts, 'GMT', 'CST'), 'MM/DD/YYYY HH24:MI') AS "Created CST",
        TRIM(ol.shipnode_key) AS "WHS",
        oh.level_of_service AS "Service Level",
        oh.extn_reference_1 AS "Order",
        TRIM(ol.item_id) AS "Item",
        ol.ordered_qty AS "Qty",
        ol.product_class AS "Product Status",
        ol.sched_failure_reason_code AS "Reason",
        orl.status AS "Status",
        oh.order_header_key,
        ref.table_key,
        CASE 
            WHEN ol.ordered_qty = 0 THEN 'Cancelled' 
            ELSE sts.description 
        END AS "Status Description",
        ROW_NUMBER() OVER (PARTITION BY oh.extn_reference_1 || '-' || TRIM(ol.item_id) ORDER BY orl.status_date DESC) AS rn,
        oh.extn_reference_6,
        oh.extn_reference_2,
        oh.extn_reference_3,
        oh.extn_reference_4 AS "PO",
        oh.extn_reference_5
    FROM suomsprod.yfs_order_header oh
    JOIN suomsprod.yfs_order_line ol
        ON oh.order_header_key = ol.order_header_key
    JOIN suomsprod.yfs_order_release_status orl
        ON oh.order_header_key = orl.order_header_key
    JOIN suomsprod.yfs_status sts
        ON orl.status = sts.status
        AND sts.process_type_key = 'ORDER_FULFILLMENT'
    LEFT JOIN SUOMSPROD.YFS_REFERENCE_TABLE ref
        ON ref.table_key = oh.order_header_key
    WHERE 
        ol.shipnode_key = 'USMEM05'
        AND TRUNC(oh.createts) >= TRUNC(SYSDATE - 7)
        AND oh.enterprise_key = 'SIEMENS'
        AND ol.product_class IN ('GNC', 'GBC', 'GWC')
) 
WHERE rn = 1
UNION
SELECT * FROM
(
    SELECT DISTINCT
        oh.extn_reference_1 || '-' || TRIM(ol.item_id) AS "Key",
        TO_CHAR(NEW_TIME(oh.createts, 'GMT', 'CST'), 'MM/DD/YYYY HH24:MI') AS "Created CST",
        TRIM(ol.shipnode_key) AS "WHS",
        oh.level_of_service AS "Service Level",
        oh.extn_reference_1 AS "Order",
        TRIM(ol.item_id) AS "Item",
        ol.ordered_qty AS "Qty",
        ol.product_class AS "Product Status",
        ol.sched_failure_reason_code AS "Reason",
        orl.status AS "Status",
        oh.order_header_key,
        ref.table_key,
        CASE 
            WHEN ol.ordered_qty = 0 THEN 'Cancelled' 
            ELSE sts.description 
        END AS "Status Description",
        ROW_NUMBER() OVER (PARTITION BY oh.extn_reference_1 || '-' || TRIM(ol.item_id) ORDER BY orl.status_date DESC) AS rn,
        oh.extn_reference_6,
        oh.extn_reference_2,
        oh.extn_reference_3,
        oh.extn_reference_4 AS "PO",
        oh.extn_reference_5
    FROM suomsprod.yfs_order_header oh
    JOIN suomsprod.yfs_order_line ol
        ON oh.order_header_key = ol.order_header_key
    JOIN suomsprod.yfs_order_release_status orl
        ON oh.order_header_key = orl.order_header_key
    JOIN suomsprod.yfs_status sts
        ON orl.status = sts.status
        AND sts.process_type_key = 'ORDER_FULFILLMENT'
    LEFT JOIN SUOMSPROD.YFS_REFERENCE_TABLE ref
        ON ref.table_key = oh.order_header_key
    WHERE 
        ol.shipnode_key IS NULL
        AND TRUNC(oh.createts) >= TRUNC(SYSDATE - 7)
        AND oh.enterprise_key = 'SIEMENS'
        AND ol.product_class IN ('GNC', 'GBC', 'GWC')
) 
WHERE rn = 1
"""

    # Step 5: Fetch function
    def fetch_query_data(db_config, query):
        try:
            with oracledb.connect(user=db_config['user'], password=db_config['password'], dsn=db_config['dsn']) as connection:
                cursor = connection.cursor()
                cursor.execute(query)
                columns = [col[0].upper() for col in cursor.description]  # Convert column names to uppercase
                data = cursor.fetchall()
                return pd.DataFrame(data, columns=columns)
        except Exception as e:
            print(f"Error fetching data from database: {e}")
            exit()

    # Step 6: Fetch data
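    # NOTE: db1_config and db2_config are the Oracle connection dicts
    # (user / password / dsn) used below; they are assumed to be defined
    # elsewhere and are not shown in this snippet.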
    print("[Step 1/4] Fetching Data...")

    start_time = time.time()
    query1_data = fetch_query_data(db1_config, query1)
    print(f"Query 1 completed. Rows fetched: {len(query1_data)}. Time taken: {time.time() - start_time:.2f} seconds")

    start_time = time.time()
    query2_data = fetch_query_data(db2_config, query2)
    print(f"Query 2 completed. Rows fetched: {len(query2_data)}. Time taken: {time.time() - start_time:.2f} seconds")

    # Step 7: Combined Matching
    print("[Step 2/4] Merging Data...")
    merged_data = query1_data.merge(
        query2_data[['WHS', 'PO', 'ORDER_HEADER_KEY', 'ITEM']],
        left_on=['EXT_PURCHASE_ORDER', 'ITEM_NAME'],
        right_on=['ORDER_HEADER_KEY', 'ITEM'],
        how='left',
        suffixes=('_q1', '_q2')
    )
    print("Merge completed.")

    # Add EMAIL_SENT column if it does not exist
    if 'EMAIL_SENT' not in merged_data.columns:
        merged_data['EMAIL_SENT'] = False

    # Step 8: Validate Columns for Emails
    print("[Step 3/4] Validating Columns for Emails...")
    required_columns = {'SERVICE_LEVEL', 'SHIPMENT', 'TC_ORDER_ID', 'PO'}
    missing_columns = required_columns - set(merged_data.columns)
    if missing_columns:
        print(f"Error: Missing required columns for emails: {missing_columns}")
        return

    # Step 9: Filter data for which email has not been sent
    new_data = merged_data[~merged_data['TC_ORDER_ID'].isin(email_tracking_data['TC_ORDER_ID'])]

    if not new_data.empty:
        outlook = win32.Dispatch('outlook.application')

        print("[Step 4/4] Sending Emails...")
        for index, row in tqdm(new_data.iterrows(), total=new_data.shape[0], desc="Sending Emails"):
            try:
                # Determine email subject and recipients based on WHS value
                if row['WHS'] == 'USMEM05':
                    prefix = "Hard"
                    to_email = "sean.lee2@xxxx"
                    cc_email = ""
                elif pd.isna(row['WHS']):
                    prefix = "Soft"
                    to_email = "sean.lee2@xxxx"
                    cc_email = ""
                else:
                    prefix = ""
                    to_email = "sean.lee2@xxxx"
                    cc_email = ""

                subject = f"{prefix} Cancellation for {row['TC_ORDER_ID']}"

                # Create an HTML table for the email body with updated header
                body = f"""
                <html>
                <body>
                    <p>Hello,</p>
                    <p>Here is the cancellation information:</p>
                    <table border="1" style="border-collapse: collapse; width: 50%;">
                        <tr>
                            <th style="background-color: #f2f2f2; text-align: left;">Field</th>
                            <th style="background-color: #f2f2f2; text-align: left;">Value</th>
                        </tr>
                        <tr>
                            <td><b>Service Level</b></td>
                            <td>{row['SERVICE_LEVEL']}</td>
                        </tr>
                        <tr>
                            <td><b>Shipment</b></td>
                            <td>{row['SHIPMENT']}</td>
                        </tr>
                        <tr>
                            <td><b>Customer Order</b></td>
                            <td>{row['TC_ORDER_ID']}</td>
                        </tr>
                        <tr>
                            <td><b>PO</b></td>
                            <td>{row['PO']}</td>
                        </tr>
                        <tr>
                            <td><b>Material Number</b></td>
                            <td>{row['ITEM_NAME']}</td>
                        </tr>
                        <tr>
                            <td><b>Order Line Number</b></td>
                            <td>{int(row['ORDER_LINE_NBR'])}</td>
                        </tr>
                        <tr>
                            <td><b>Quantity Cancelled</b></td>
                            <td>{row['QTY_CANCELLED']}</td>
                        </tr>
                    </table>
                    <p>Best regards,</p>
                </body>
                </html>
                """

                # Sending email via Outlook
                mail = outlook.CreateItem(0)
                mail.To = to_email
                if cc_email:
                    mail.CC = cc_email
                mail.Subject = subject
                mail.HTMLBody = body  # Use HTMLBody to send HTML content
                mail.Send()

                # Mark the email as sent in DataFrame
                new_data.at[index, 'EMAIL_SENT'] = True
                print(f"Email sent for record: {row['TC_ORDER_ID']}")

            except Exception as e:
                print(f"Error sending email for Customer Order {row['TC_ORDER_ID']}: {e}")

        # Step 10: Save the updated DataFrame to track sent emails
        email_tracking_data = pd.concat([email_tracking_data, new_data[['TC_ORDER_ID', 'EMAIL_SENT']]])
        email_tracking_data.to_csv(tracking_file, index=False)
        print("Updated email tracking saved to 'email_tracking.csv'.")
    else:
        print("No new emails to send.")

    # Step 11: Log the script run
    log_script_run()

# Run the script immediately
run_script()

# Schedule the script to run every 3 minutes
schedule.every(3).minutes.do(run_script)

# Keep the script running
print("Scheduler started. Press Ctrl+C to stop")
while True:
    schedule.run_pending()
    time.sleep(1)
```

Hi everyone,

I’m working on a Python script that fetches data from two Oracle databases, merges the data, and sends emails based on the merged results. My goal is to ensure that no duplicate emails are sent. Despite various attempts, I’m still encountering issues with duplicate emails being sent.

Here’s a summary of my approach:

  1. Fetch data from two Oracle databases.
  2. Merge the data based on certain keys.
  3. Use a CSV file to track which TC_ORDER_ID values have already been processed.
  4. Send emails only for new records that have not been processed before.
  5. Update the tracking file with newly processed records.

I’ve included progress bars and detailed logging for better monitoring. However, duplicates still occur, and I’m not sure where the issue lies.

Hello,

Rather than setting up your code to run automatically every three minutes (and monitoring it via logging), can you, for testing purposes, set it up so that you trigger these actions manually and then follow the fetched data through the checking process? In essence, step through the fetch and inspect the data that is being compared.
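
For instance, something along these lines (a rough sketch, assuming you pull fetch_query_data, the two queries, and the connection configs out of run_script() or paste them into a REPL session, and that 'email_tracking.csv' is the file your script writes):

```python
import pandas as pd

# Run the pieces once by hand instead of on the scheduler.
tracking = pd.read_csv('email_tracking.csv')
q1 = fetch_query_data(db1_config, query1)
q2 = fetch_query_data(db2_config, query2)

print(q1[['TC_ORDER_ID', 'ITEM_NAME']].head())  # what query 1 actually returned
print(q2[['PO', 'ITEM', 'WHS']].head())         # what query 2 actually returned
print(tracking.head())                          # what the tracking file says was already sent
```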

From a quick review of your code, the decision comes down to one comparison with two outcomes:
a. It checks whether each merged record is already in the email tracking data.
b. If it is not, the email is sent; otherwise, it is not.

To start, first check the following:

  1. Check whether the data being compared is in the correct or expected format. You can only compare like data; if the formats differ, make the correction here (see the sketch after this list).
  2. If the formats are correct, check whether the output of the comparison code gives the expected outcome. If not, make the correction here.
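
A minimal sketch of the check in item 1, assuming merged_data and email_tracking_data exist as they do in your script:

```python
# Are the two TC_ORDER_ID columns even the same kind of data?
print(merged_data['TC_ORDER_ID'].dtype, email_tracking_data['TC_ORDER_ID'].dtype)
print(merged_data['TC_ORDER_ID'].head().tolist())
print(email_tracking_data['TC_ORDER_ID'].head().tolist())

# Normalizing both sides to stripped strings makes 12345, '12345' and '12345 '
# all compare as the same key before the isin() check.
merged_data['TC_ORDER_ID'] = merged_data['TC_ORDER_ID'].astype(str).str.strip()
email_tracking_data['TC_ORDER_ID'] = email_tracking_data['TC_ORDER_ID'].astype(str).str.strip()
```

If the two sides do not match value for value (different dtypes, leading zeros dropped by read_csv, stray whitespace), isin() will quietly treat previously sent orders as new, which looks exactly like duplicate emails.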

From your script, this is the code that does the checking. I would zoom in on this part of the code:

    # Step 9: Filter data for which email has not been sent
    new_data = merged_data[~merged_data['TC_ORDER_ID'].isin(email_tracking_data['TC_ORDER_ID'])]

    if not new_data.empty:
        # send email

Based on the two inputs that are being compared, is the value of new_data.empty the expected value?

I would start in this general area.
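
A quick way to see what that comparison actually decides, assuming the same DataFrames as in your script:

```python
# How many merged rows does the filter consider already tracked vs. new,
# and which order IDs fall into the "new" bucket?
mask = merged_data['TC_ORDER_ID'].isin(email_tracking_data['TC_ORDER_ID'])
print(f"{mask.sum()} rows already tracked, {(~mask).sum()} rows treated as new")
print(merged_data.loc[~mask, 'TC_ORDER_ID'].unique())
```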

Almost forgot: also make sure that your .csv file is correctly logging the emails that have been sent, and that the read that loads the sent-emails list back in is working as expected.
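
A small sketch for that check, assuming 'email_tracking.csv' is the file your script writes:

```python
import pandas as pd

# Read the tracking file back the same way the comparison will see it;
# forcing the ID column to string avoids read_csv re-parsing numeric-looking IDs.
tracking = pd.read_csv('email_tracking.csv', dtype={'TC_ORDER_ID': str})
print(tracking.dtypes)
print(f"{len(tracking)} rows, {tracking['TC_ORDER_ID'].duplicated().sum()} duplicated IDs")
print(tracking.tail())  # were the most recent sends actually appended?
```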