```python
import os
import oracledb
import pandas as pd
import win32com.client as win32
import schedule
import time
import openpyxl
from openpyxl import load_workbook
from datetime import datetime
from tqdm import tqdm
# Step 1: Database connection settings (placeholders here; real credentials redacted)
db1_config = {'user': 'db1_user', 'password': 'REDACTED', 'dsn': 'db1_host:1521/service_name'}
db2_config = {'user': 'db2_user', 'password': 'REDACTED', 'dsn': 'db2_host:1521/service_name'}

# Main workflow; the scheduler at the bottom reruns it every 3 minutes
def run_script():
    # CSV file for tracking sent emails
    tracking_file = 'email_tracking.csv'
    # Excel file for logging script runs
    log_file = 'script_run_log.xlsx'
    # Step 2: Create or open an Excel log file to track script runs
    def log_script_run():
        if not os.path.exists(log_file):
            workbook = openpyxl.Workbook()
            sheet = workbook.active
            sheet.title = "Log"
            sheet.append(["Timestamp", "Run Count"])
            workbook.save(log_file)
        workbook = load_workbook(log_file)
        if "Log" not in workbook.sheetnames:
            sheet = workbook.create_sheet(title="Log")
            sheet.append(["Timestamp", "Run Count"])
        else:
            sheet = workbook["Log"]
        timestamp = pd.Timestamp.now()
        sheet.append([timestamp, 1])
        workbook.save(log_file)
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{current_time}] Starting the script...")
    # Step 3: Read existing email tracking data
    if os.path.exists(tracking_file):
        email_tracking_data = pd.read_csv(tracking_file)
        email_tracking_data['TC_ORDER_ID'] = email_tracking_data['TC_ORDER_ID'].astype(str)
    else:
        email_tracking_data = pd.DataFrame(columns=['TC_ORDER_ID', 'EMAIL_SENT'])
    # Step 4: SQL queries
    query1 = """
SELECT
*
FROM (
SELECT
ord.ref_field_10 AS SHIPMENT,
SUBSTR(oli.ref_field1, 1, 10) AS TC_ORDER_ID,
dos.description as ord_status,
oli.ref_num1 AS ORDER_LINE_NBR,
oli.item_name AS ITEM_NAME,
ord.do_status AS DO_STATUS,
MAX(oli.orig_order_qty) AS ORIG_QTY,
SUM(nvl(oli.shipped_qty,0)) AS QTY_SHIPPED,
(MAX(oli.orig_order_qty) - SUM(oli.shipped_qty)) AS QTY_CANCELLED,
SUM(CASE WHEN oli.tc_order_line_id = oli.line_item_id THEN oli.shipped_qty ELSE 0 END) AS QTY_CHASED,
oli.ref_field10 AS REF_FIELD10,
oli.ext_purchase_order AS EXT_PURCHASE_ORDER,
ord.created_dttm AS CREATED_DTTM,
ord.ref_field_8 AS SERVICE_LEVEL
FROM orders ord
JOIN order_line_item oli ON oli.order_id = ord.order_id
JOIN do_status dos on ord.do_status = dos.order_status
WHERE ord.created_dttm >= (sysdate-1)
AND ord.do_status IN ('190', '200')
AND oli.do_dtl_status IN ('190', '200')
AND ord.ref_field_8 NOT IN ('Scrap Order', 'Repair Order')
AND oli.ref_field6 NOT LIKE '%12'
GROUP BY
ord.ref_field_10,
SUBSTR(oli.ref_field1, 1, 10),
oli.ref_num1,
oli.item_name,
ord.do_status,
ord.created_dttm,
oli.ref_field10,
oli.ext_purchase_order,
ord.ref_field_8,
dos.description
) sub_query_alias
WHERE qty_shipped < orig_qty
ORDER BY TC_ORDER_ID, ORDER_LINE_NBR ASC
"""
query2 = """
SELECT * FROM
(
SELECT DISTINCT
oh.extn_reference_1 || '-' || TRIM(ol.item_id) AS "Key",
TO_CHAR(NEW_TIME(oh.createts, 'GMT', 'CST'), 'MM/DD/YYYY HH24:MI') AS "Created CST",
TRIM(ol.shipnode_key) AS "WHS",
oh.level_of_service AS "Service Level",
oh.extn_reference_1 AS "Order",
TRIM(ol.item_id) AS "Item",
ol.ordered_qty AS "Qty",
ol.product_class AS "Product Status",
ol.sched_failure_reason_code AS "Reason",
orl.status AS "Status",
oh.order_header_key,
ref.table_key,
CASE
WHEN ol.ordered_qty = 0 THEN 'Cancelled'
ELSE sts.description
END AS "Status Description",
ROW_NUMBER() OVER (PARTITION BY oh.extn_reference_1 || '-' || TRIM(ol.item_id) ORDER BY orl.status_date DESC) AS rn,
oh.extn_reference_6,
oh.extn_reference_2,
oh.extn_reference_3,
oh.extn_reference_4 AS "PO",
oh.extn_reference_5
FROM suomsprod.yfs_order_header oh
JOIN suomsprod.yfs_order_line ol
ON oh.order_header_key = ol.order_header_key
JOIN suomsprod.yfs_order_release_status orl
ON oh.order_header_key = orl.order_header_key
JOIN suomsprod.yfs_status sts
ON orl.status = sts.status
AND sts.process_type_key = 'ORDER_FULFILLMENT'
LEFT JOIN SUOMSPROD.YFS_REFERENCE_TABLE ref
ON ref.table_key = oh.order_header_key
WHERE
ol.shipnode_key = 'USMEM05'
AND TRUNC(oh.createts) >= TRUNC(SYSDATE - 7)
AND oh.enterprise_key = 'SIEMENS'
AND ol.product_class IN ('GNC', 'GBC', 'GWC')
)
WHERE rn = 1
UNION
SELECT * FROM
(
SELECT DISTINCT
oh.extn_reference_1 || '-' || TRIM(ol.item_id) AS "Key",
TO_CHAR(NEW_TIME(oh.createts, 'GMT', 'CST'), 'MM/DD/YYYY HH24:MI') AS "Created CST",
TRIM(ol.shipnode_key) AS "WHS",
oh.level_of_service AS "Service Level",
oh.extn_reference_1 AS "Order",
TRIM(ol.item_id) AS "Item",
ol.ordered_qty AS "Qty",
ol.product_class AS "Product Status",
ol.sched_failure_reason_code AS "Reason",
orl.status AS "Status",
oh.order_header_key,
ref.table_key,
CASE
WHEN ol.ordered_qty = 0 THEN 'Cancelled'
ELSE sts.description
END AS "Status Description",
ROW_NUMBER() OVER (PARTITION BY oh.extn_reference_1 || '-' || TRIM(ol.item_id) ORDER BY orl.status_date DESC) AS rn,
oh.extn_reference_6,
oh.extn_reference_2,
oh.extn_reference_3,
oh.extn_reference_4 AS "PO",
oh.extn_reference_5
FROM suomsprod.yfs_order_header oh
JOIN suomsprod.yfs_order_line ol
ON oh.order_header_key = ol.order_header_key
JOIN suomsprod.yfs_order_release_status orl
ON oh.order_header_key = orl.order_header_key
JOIN suomsprod.yfs_status sts
ON orl.status = sts.status
AND sts.process_type_key = 'ORDER_FULFILLMENT'
LEFT JOIN SUOMSPROD.YFS_REFERENCE_TABLE ref
ON ref.table_key = oh.order_header_key
WHERE
ol.shipnode_key IS NULL
AND TRUNC(oh.createts) >= TRUNC(SYSDATE - 7)
AND oh.enterprise_key = 'SIEMENS'
AND ol.product_class IN ('GNC', 'GBC', 'GWC')
)
WHERE rn = 1
"""
    # Step 5: Helper that runs a query and returns the result as a DataFrame
    def fetch_query_data(db_config, query):
        try:
            with oracledb.connect(user=db_config['user'], password=db_config['password'], dsn=db_config['dsn']) as connection:
                cursor = connection.cursor()
                cursor.execute(query)
                columns = [col[0].upper() for col in cursor.description]  # Normalize column names to uppercase
                data = cursor.fetchall()
                return pd.DataFrame(data, columns=columns)
        except Exception as e:
            print(f"Error fetching data from database: {e}")
            raise  # Propagate instead of exit() so the full traceback stays visible
    # Step 6: Fetch data
    print("[Step 1/4] Fetching Data...")
    start_time = time.time()
    query1_data = fetch_query_data(db1_config, query1)
    print(f"Query 1 completed. Rows fetched: {len(query1_data)}. Time taken: {time.time() - start_time:.2f} seconds")
    start_time = time.time()
    query2_data = fetch_query_data(db2_config, query2)
    print(f"Query 2 completed. Rows fetched: {len(query2_data)}. Time taken: {time.time() - start_time:.2f} seconds")
    # Step 7: Combined matching (merge the two result sets)
    print("[Step 2/4] Merging Data...")
    merged_data = query1_data.merge(
        query2_data[['WHS', 'PO', 'ORDER_HEADER_KEY', 'ITEM']],
        left_on=['EXT_PURCHASE_ORDER', 'ITEM_NAME'],
        right_on=['ORDER_HEADER_KEY', 'ITEM'],
        how='left',
        suffixes=('_q1', '_q2')
    )
    print("Merge completed.")
    # Add EMAIL_SENT column if it does not exist
    if 'EMAIL_SENT' not in merged_data.columns:
        merged_data['EMAIL_SENT'] = False
    # Step 8: Validate that the columns needed for the emails are present
    print("[Step 3/4] Validating Columns for Emails...")
    required_columns = {'SERVICE_LEVEL', 'SHIPMENT', 'TC_ORDER_ID', 'PO'}
    missing_columns = required_columns - set(merged_data.columns)
    if missing_columns:
        print(f"Error: Missing required columns for emails: {missing_columns}")
        return
    # Step 9: Keep only rows whose TC_ORDER_ID has not been emailed before
    # (.copy() avoids pandas' SettingWithCopyWarning when EMAIL_SENT is updated below)
    new_data = merged_data[~merged_data['TC_ORDER_ID'].isin(email_tracking_data['TC_ORDER_ID'])].copy()
    if not new_data.empty:
        outlook = win32.Dispatch('outlook.application')
        print("[Step 4/4] Sending Emails...")
        for index, row in tqdm(new_data.iterrows(), total=new_data.shape[0], desc="Sending Emails"):
            try:
try:
# Determine email subject and recipients based on WHS value
if row['WHS'] == 'USMEM05':
prefix = "Hard"
to_email = "sean.lee2@xxxx"
cc_email = ""
elif pd.isna(row['WHS']):
prefix = "Soft"
to_email = "sean.lee2@xxxx"
cc_email = ""
else:
prefix = ""
to_email = "sean.lee2@xxxx"
cc_email = ""
subject = f"{prefix} Cancellation for {row['TC_ORDER_ID']}"
                # Build an HTML table for the email body
                body = f"""
                <html>
                <body>
                <p>Hello,</p>
                <p>Here is the cancellation information:</p>
                <table border="1" style="border-collapse: collapse; width: 50%;">
                    <tr>
                        <th style="background-color: #f2f2f2; text-align: left;">Field</th>
                        <th style="background-color: #f2f2f2; text-align: left;">Value</th>
                    </tr>
                    <tr>
                        <td><b>Service Level</b></td>
                        <td>{row['SERVICE_LEVEL']}</td>
                    </tr>
                    <tr>
                        <td><b>Shipment</b></td>
                        <td>{row['SHIPMENT']}</td>
                    </tr>
                    <tr>
                        <td><b>Customer Order</b></td>
                        <td>{row['TC_ORDER_ID']}</td>
                    </tr>
                    <tr>
                        <td><b>PO</b></td>
                        <td>{row['PO']}</td>
                    </tr>
                    <tr>
                        <td><b>Material Number</b></td>
                        <td>{row['ITEM_NAME']}</td>
                    </tr>
                    <tr>
                        <td><b>Order Line Number</b></td>
                        <td>{int(row['ORDER_LINE_NBR'])}</td>
                    </tr>
                    <tr>
                        <td><b>Quantity Cancelled</b></td>
                        <td>{row['QTY_CANCELLED']}</td>
                    </tr>
                </table>
                <p>Best regards,</p>
                </body>
                </html>
                """
                # Send the email via Outlook
                mail = outlook.CreateItem(0)
                mail.To = to_email
                if cc_email:
                    mail.CC = cc_email
                mail.Subject = subject
                mail.HTMLBody = body  # Use HTMLBody so the HTML table renders
                mail.Send()
                # Mark the email as sent in the DataFrame
                new_data.at[index, 'EMAIL_SENT'] = True
                print(f"Email sent for record: {row['TC_ORDER_ID']}")
            except Exception as e:
                print(f"Error sending email for Customer Order {row['TC_ORDER_ID']}: {e}")
        # Step 10: Save the updated DataFrame to track sent emails
        email_tracking_data = pd.concat([email_tracking_data, new_data[['TC_ORDER_ID', 'EMAIL_SENT']]])
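        # Note: pd.concat only stacks the frames; it does not drop duplicate TC_ORDER_ID values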
        email_tracking_data.to_csv(tracking_file, index=False)
        print("Updated email tracking saved to 'email_tracking.csv'.")
    else:
        print("No new emails to send.")
    # Step 11: Log the script run
    log_script_run()
# Run the script immediately
run_script()

# Schedule the script to run every 3 minutes
schedule.every(3).minutes.do(run_script)

# Keep the scheduler running
print("Scheduler started. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(1)
```
Hi everyone,
I’m working on a Python script that fetches data from two Oracle databases, merges the results, and sends Outlook emails based on the merged data. My goal is to ensure that no duplicate emails are ever sent, but despite several attempts, duplicates keep going out.
Here’s a summary of my approach:
- Fetch data from two Oracle databases.
- Merge the data on purchase order and item keys (`EXT_PURCHASE_ORDER`/`ITEM_NAME` on one side, `ORDER_HEADER_KEY`/`ITEM` on the other).
- Use a CSV file to track which `TC_ORDER_ID` values have already been processed.
- Send emails only for new records that have not been processed before.
- Update the tracking file with newly processed records.
I’ve included progress bars and detailed logging for better monitoring. However, duplicates still occur, and I’m not sure where the issue lies.
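To make the requirement concrete, here is a stripped-down, runnable sketch of the tracking pattern I’m aiming for. The file name and column names match the script above, the Outlook call is replaced with a print, and the sample order IDs are made up. Note that this sketch also de-duplicates within a single run (`drop_duplicates`), which the full script above does not currently do:

```python
import os
import pandas as pd

tracking_file = 'email_tracking.csv'

# Load previously processed order IDs (empty frame on the first run)
if os.path.exists(tracking_file):
    tracking = pd.read_csv(tracking_file, dtype={'TC_ORDER_ID': str})
else:
    tracking = pd.DataFrame(columns=['TC_ORDER_ID', 'EMAIL_SENT'])

# Made-up stand-in for the merged query results; the repeated order ID
# mimics what a fanned-out merge can produce
merged = pd.DataFrame({'TC_ORDER_ID': ['A100', 'A100', 'B200']})

# Keep only orders not in the tracking file, and only one row per order
new_rows = merged[~merged['TC_ORDER_ID'].isin(tracking['TC_ORDER_ID'])].copy()
new_rows = new_rows.drop_duplicates(subset='TC_ORDER_ID')
new_rows['EMAIL_SENT'] = False

for idx, row in new_rows.iterrows():
    print(f"Would send email for {row['TC_ORDER_ID']}")  # stand-in for the Outlook call
    new_rows.at[idx, 'EMAIL_SENT'] = True

# Persist so the next run skips these orders
tracking = pd.concat([tracking, new_rows[['TC_ORDER_ID', 'EMAIL_SENT']]])
tracking.to_csv(tracking_file, index=False)
```

Running this twice in a row sends each order only once, which is the behaviour I want the full script to have. Any pointers on where my version diverges from this would be appreciated.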