# Untitled
#
# Paste-site header (pastecode.io): mail@pastecode.io avatar, unknown,
# plain_text, a year ago, 3.9 kB, 2, Indexable, Never
import pandas as pd
import mysql.connector
import json
from base64 import b64decode as base_b64decode
import logging
import datetime
import os
from sqlalchemy import create_engine as sqlalchemy_create_engine
from pandas import read_sql as pd_read_sql
import sys

# ... (all your initial setup and variable definitions)

# Set up logging: records at INFO level and above are written to the file
# named by `log_file`, one "timestamp - message" line per record.
# NOTE(review): `log_file` is not defined in this excerpt; presumably it is
# set in the elided setup section above -- confirm.
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level counter; the process_*_table functions below increment it once
# per successfully executed count query.
query_count = 0

# Open SQL log file for writing
# NOTE(review): this rebinds `sql_log_file` from whatever it held before
# (presumably the path string from the elided setup) to the open file handle,
# and mode "w" truncates any existing file at that path.
sql_log_file = open(sql_log_file, "w")

# ... (all your utility functions)

def process_release_table(eprm_table_name, schema_tar, releaseId, opId, buId):
    """Count the rows of a release table for one release/op/bu and record the result.

    Runs ``SELECT COUNT(*)`` against ``{schema_tar}.{eprm_table_name}`` filtered
    by ``release_id``, ``op_id`` and ``bu_id`` on the module-level ``cursor_tar``.
    On success it bumps the module-level ``query_count``, hands an annotated copy
    of the query to ``write_sql``, and prints and logs the count.

    Args:
        eprm_table_name: Table to count rows in.
        schema_tar: Schema that qualifies the table name.
        releaseId: Value compared against the ``release_id`` column.
        opId: Value compared against the ``op_id`` column.
        buId: Value compared against the ``bu_id`` column.

    Returns:
        None. A ``mysql.connector.Error`` is reported (printed and logged)
        rather than raised.
    """
    # query_count is rebound below; without this declaration the augmented
    # assignment raises UnboundLocalError on every successful query.
    global query_count

    # NOTE(review): the statement is built by string interpolation so that the
    # exact text can be echoed to the SQL log; all arguments must therefore
    # come from trusted configuration, never from user input.
    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
    try:
        cursor_tar.execute(query)
        result = cursor_tar.fetchone()
        row_count = result[0]
        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
        query_info += f"-- #Query: Result:{row_count}\n"
        query_count += 1
        query_info += query + ";\n"
        write_sql(query_info)
        print(f"Count for {eprm_table_name}: {row_count} (release)")
        logging.info(f"Count for {eprm_table_name}: {row_count} (release)")
    except mysql.connector.Error as err:
        print(f"Error occurred while executing the query: {query}: {err}")
        # Failures are logged at ERROR level so they can be filtered from the
        # routine count messages.
        logging.error(f"Error occurred while executing the query: {query}: {err}")

def process_audit_table(eprm_table_name, eprm_table_col_pk, schema_tar, releaseId, opId, buId):
    """Count the audit-table rows tied to one release/op/bu and record the result.

    Runs ``SELECT COUNT(*)`` against ``{schema_tar}.{eprm_table_name}``, keeping
    rows whose ``eprm_table_col_pk`` value appears among the ``entity_ref_nbr``
    values of ``{schema_tar}.release_entity_inst_map`` for the given
    ``release_id``, ``op_id`` and ``bu_id``. The query runs on the module-level
    ``cursor_tar``. On success it bumps the module-level ``query_count``, hands
    an annotated copy of the query to ``write_sql``, and prints and logs the count.

    Args:
        eprm_table_name: Table to count rows in.
        eprm_table_col_pk: Column expression matched against ``entity_ref_nbr``.
        schema_tar: Schema that qualifies both table names.
        releaseId: Value compared against the ``release_id`` column.
        opId: Value compared against the ``op_id`` column.
        buId: Value compared against the ``bu_id`` column.

    Returns:
        None. A ``mysql.connector.Error`` is reported (printed and logged)
        rather than raised.
    """
    # query_count is rebound below; without this declaration the augmented
    # assignment raises UnboundLocalError on every successful query.
    global query_count

    # NOTE(review): the statement is built by string interpolation so that the
    # exact text can be echoed to the SQL log; all arguments must therefore
    # come from trusted configuration, never from user input.
    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE ({eprm_table_col_pk}) IN (SELECT entity_ref_nbr FROM {schema_tar}.release_entity_inst_map WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}')"
    try:
        cursor_tar.execute(query)
        result = cursor_tar.fetchone()
        row_count = result[0]
        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
        query_info += f"-- #Query: Result:{row_count}\n"
        query_count += 1
        query_info += query + ";\n"
        write_sql(query_info)
        print(f"Count for {eprm_table_name}: {row_count} (audit)")
        logging.info(f"Count for {eprm_table_name}: {row_count} (audit)")
    except mysql.connector.Error as err:
        print(f"Error occurred while executing the query: {query}: {err}")
        # Failures are logged at ERROR level so they can be filtered from the
        # routine count messages.
        logging.error(f"Error occurred while executing the query: {query}: {err}")

# ... (your main code)

# Main driver: walk every row of `filtered_df` and dispatch on its table type.
# The SQL log file is closed in `finally`, so the handle is released even when
# the run is cut short by SystemExit or KeyboardInterrupt, which the
# `except Exception` handlers below do not catch.
try:
    # ... (your main code)
    # Read JSON data from file

    # ... (your main code)

    try:
        # ... (your main code)
        # Connect to PPM_PC database

        # ... (your main code)
        # Fetch data from the etl_ppm_replication_master table

        # ... (your main code)

        for _, row in filtered_df.iterrows():
            eprm_table_name = row['eprm_table_name']
            eprm_table_col_pk = row['eprm_table_col_pk']
            eprm_table_type = row['eprm_table_type']
            eprm_seq_nbr = row['eprm_seq_nbr']

            # The table types are mutually exclusive, so a single chain is
            # enough; unknown types fall through and are skipped.
            if eprm_table_type == 'RELEASE':
                process_release_table(eprm_table_name, schema_tar, releaseId, opId, buId)

            elif eprm_table_type == 'AUDIT':
                process_audit_table(eprm_table_name, eprm_table_col_pk, schema_tar, releaseId, opId, buId)

            elif eprm_table_type == 'MASTER':
                # ... (your existing MASTER logic)
                pass  # placeholder so the branch is valid until the logic is pasted in

            elif eprm_table_type == 'MASTER-CHILD':
                # ... (your existing MASTER-CHILD logic)
                pass  # placeholder so the branch is valid until the logic is pasted in

            elif eprm_table_type == 'AUDIT-CHILD':
                # ... (your existing AUDIT-CHILD logic)
                pass  # placeholder so the branch is valid until the logic is pasted in

    except Exception as e:
        print("An Error occurred:", str(e))
        # Keep the traceback in the log file; the console message alone drops it.
        logging.exception("An Error occurred")
except Exception as e:
    print("An Error occurred:", str(e))
    logging.exception("An Error occurred")
finally:
    # Close the SQL log file
    sql_log_file.close()