import pandas as pd
import mysql.connector
import json
from base64 import b64decode as base_b64decode
import logging
import datetime
import os
from sqlalchemy import create_engine as sqlalchemy_create_engine
from pandas import read_sql as pd_read_sql
import sys
# ... (all your initial setup and variable definitions)
# Set up logging
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
query_count = 0
# Open SQL log file for writing
sql_log_file = open(sql_log_file, "w")
# ... (all your utility functions)
def process_release_table(eprm_table_name, schema_tar, releaseId, opId, buId):
query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
try:
cursor_tar.execute(query)
result = cursor_tar.fetchone()
query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
query_info += f"-- #Query: Result:{result[0]}\n"
query_count += 1
query_info += query + ";\n"
write_sql(query_info)
print(f"Count for {eprm_table_name}: {result[0]} (release)")
logging.info(f"Count for {eprm_table_name}: {result[0]} (release)")
except mysql.connector.Error as err:
print(f"Error occurred while executing the query: {query}: {err}")
logging.info(f"Error occurred while executing the query: {query}: {err}")
def process_audit_table(eprm_table_name, eprm_table_col_pk, schema_tar, releaseId, opId, buId):
query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE ({eprm_table_col_pk}) IN (SELECT entity_ref_nbr FROM {schema_tar}.release_entity_inst_map WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}')"
try:
cursor_tar.execute(query)
result = cursor_tar.fetchone()
query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
query_info += f"-- #Query: Result:{result[0]}\n"
query_count += 1
query_info += query + ";\n"
write_sql(query_info)
print(f"Count for {eprm_table_name}: {result[0]} (audit)")
logging.info(f"Count for {eprm_table_name}: {result[0]} (audit)")
except mysql.connector.Error as err:
print(f"Error occurred while executing the query: {query}: {err}")
logging.info(f"Error occurred while executing the query: {query}: {err}")
# ... (your main code)
try:
# ... (your main code)
# Read JSON data from file
# ... (your main code)
try:
# ... (your main code)
# Connect to PPM_PC database
# ... (your main code)
# Fetch data from the etl_ppm_replication_master table
# ... (your main code)
for _, row in filtered_df.iterrows():
eprm_table_name = row['eprm_table_name']
eprm_table_col_pk = row['eprm_table_col_pk']
eprm_table_type = row['eprm_table_type']
eprm_seq_nbr = row['eprm_seq_nbr']
if eprm_table_type == 'RELEASE':
process_release_table(eprm_table_name, schema_tar, releaseId, opId, buId)
if eprm_table_type == 'AUDIT':
process_audit_table(eprm_table_name, eprm_table_col_pk, schema_tar, releaseId, opId, buId)
if eprm_table_type == 'MASTER':
# ... (your existing MASTER logic)
elif eprm_table_type == 'MASTER-CHILD':
# ... (your existing MASTER-CHILD logic)
elif eprm_table_type == 'AUDIT-CHILD':
# ... (your existing AUDIT-CHILD logic)
except Exception as e:
print("An Error occurred:", str(e))
except Exception as e:
print("An Error occurred:", str(e))
# Close the SQL log file
sql_log_file.close()