import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os
import datetime


# define Python user-defined exceptions
class Error(Exception):
    """Base class for other exceptions"""
    pass


class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
    pass


class DB_CONNECTION_ERROR(Error):
    pass


def is_datetime(value):
    try:
        # Attempt to parse the value as a datetime
        datetime.datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S')
        return True
    except ValueError:
        return False


# Function to write an SQL query to the log file
def write_sql(sql_log_file, query_info):
    sql_log_file.write(query_info)
    sql_log_file.write('\n')


def setDbConnection(logging, json_data, serverInfo, encoding):
    from sqlalchemy import create_engine as sqlalchemy_create_engine
    from base64 import b64decode as base_b64decode
    import mysql.connector
    try:
        cnx = cursor = schema = db_type = None
        encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
        host = json_data.get(serverInfo, {}).get('DB_HOST')
        port = json_data.get(serverInfo, {}).get('DB_PORT')
        user = json_data.get(serverInfo, {}).get('DB_USER')
        db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
        schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
        if encrypt == 'Y':
            password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
        else:
            password = json_data.get(serverInfo, {}).get('DB_PASSWORD')
        if db_type in ('MYSQL', 'MARIA'):
            connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s'
                               % (user, password, host, port, schema))
        elif db_type == 'ORACLE':
            connection_text = ('oracle://%s:%s@%s:%s/%s'
                               % (user, password, host, port,
                                  json_data.get(serverInfo, {}).get('DB_SID')))
        cnx = sqlalchemy_create_engine(connection_text, encoding=encoding)
        # ,fast_executemany=True
        # , connect_args={'connect_timeout': 600})
        cursor = cnx.connect()
        logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
    # except mysql.connector.Error as dberr:
    #     logging.error("DATABASE CONNECTION ERROR")
    #     logging.error("Error - {} . Line No - {} ".format(str(dberr), str(sys.exc_info()[-1].tb_lineno)))
    #     cnx = cursor = schema = None
    except Exception as dbexp:
        logging.error("DATABASE CONNECTION EXCEPTION")
        logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
        cnx = cursor = schema = None
    return cnx, cursor, schema, db_type
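
# For reference, a hypothetical dna.json entry matching the keys setDbConnection()
# reads above -- the key names come from the .get() calls, but every value below
# is an invented placeholder:
#
#   "SOURCE": {
#       "DB_TYPE":     "MYSQL",            # or "MARIA" / "ORACLE"
#       "DB_HOST":     "db.example.com",
#       "DB_PORT":     "3306",
#       "DB_USER":     "etl_user",
#       "DB_PASSWORD": "c2VjcmV0",         # base64-encoded when ENCRYPT == "Y"
#       "ENCRYPT":     "Y",
#       "DB_SCHEMA":   "ppm",
#       "DB_SID":      "PPMSID"            # only read when DB_TYPE == "ORACLE"
#   }
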
# def main(args, json_data, log_file, logging, encoding):
def process_changes_only(pdb_ppm_sc, pdb_source, pdb_target, ps_rep_env, ps_op_id, ps_bu_id, ps_rep_job_id,
                         pl_logging):
    # NOTE: schema_ppm/schema_source/schema_ext, connection_source/connection_target,
    # cursor_source/cursor_target and db_type_source/db_type_ext are module-level
    # globals set in __main__ below.
    try:
        pl_logging.info("Started")
        # Use the function parameter rather than the global args
        # (the original read args.replicationJobId here)
        replicationJobId = ps_rep_job_id
        return_flag = True
        STATUS = "Success"
        STATUS_MESSAGE = "Service Catalogue Replication Successful."
        failed_entities = []
        opId = ps_op_id
        buId = ps_bu_id
        pl_logging.info("Variable declaration")
        total_events_logged = 0
        source_read_count = 0
        target_read_count = 0
        inserted_count = 0
        updated_count = 0
        insert_failed_count = 0
        update_failed_count = 0
        query_count = 0
        pl_logging.info("Master query")
        sql_log_file = open(
            f"/app/scripts/PPM_Release_Management/Service_Catalog_ETL/logs/{replicationJobId}_ppm_sc_changesonly.sql",
            "w")
        primary_query = (f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master "
                         f"WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'")
        write_sql(sql_log_file, "-- ++++++++++++++++++++++++++++++++ Primary Query")
        write_sql(sql_log_file, primary_query)
        primary_df = pd_read_sql(primary_query, con=pdb_ppm_sc)
        if primary_df.empty:
            # Assumption: an empty master result should trigger the handler below;
            # without this raise, that except branch was unreachable
            raise ETL_PPM_REPLICATION_MASTER_ERROR
        for index, row in primary_df.iterrows():
            # Fetch primary key column name and table name
            table_name = row['eprm_table_name'].lower()
            eprm_table_col_pk = row['eprm_table_col_pk']
            pk = eprm_table_col_pk.lower()
            where_clause = row['eprm_join_cols_reim'] % (ps_op_id, ps_bu_id)
            query_count += 1
            table_alias = row['eprm_table_alias']
            write_sql(sql_log_file,
                      f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {pk} | {table_name}")
            try:
                source_query = f"SELECT /*+ PARALLEL(2) */ * FROM {schema_source}.{table_name} {table_alias} where {where_clause}"
                write_sql(sql_log_file, source_query)
                pl_logging.info("Reading data from source")
                source_df = pd.read_sql(source_query, connection_source)
                pl_logging.info("source_df: %s" % len(source_df))
                write_sql(sql_log_file, f"source: {table_name}: {source_read_count}")
                source_read_count += len(source_df)
                target_query = f"SELECT /*+ PARALLEL(2) */ * FROM {schema_ext}.{table_name} {table_alias} where {where_clause}"
                write_sql(sql_log_file, target_query)
                pl_logging.info("Reading data from target")
                target_df = pd.read_sql(target_query, connection_target)
                write_sql(sql_log_file, f"target: {table_name}: {target_read_count}")
                target_read_count += len(target_df)
                # BUG FIX: the original indexed the DataFrames with pk_value (a key *value*,
                # e.g. 190704) instead of pk (the column *name*), which raises KeyError.
                # New rows are also inserted once, before the per-row loop, instead of
                # appending the whole new_records frame on every iteration.
                new_records = source_df[~source_df[pk].isin(target_df[pk])]
                if not new_records.empty:
                    new_records.to_sql(table_name, con=connection_target, if_exists='append', index=False)
                    inserted_count += len(new_records)
                existing_records = source_df[source_df[pk].isin(target_df[pk])]
                for index, source_row in existing_records.iterrows():
                    pk_value = source_row[pk]
                    # Fetch the corresponding row from the target DataFrame based on the primary key
                    target_row = target_df[target_df[pk] == pk_value].iloc[0]
                    if not source_row.equals(target_row):
                        columns_to_update = []
                        for column_name, source_val in source_row.items():
                            target_val = target_row[column_name]
                            if source_val != target_val:
                                if is_datetime(source_val):
                                    # Format datetime values using the appropriate function for the database type
                                    if db_type_ext == 'ORACLE':
                                        update_value = f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')"
                                    elif db_type_ext in ('MYSQL', 'MARIA'):
                                        # For MariaDB and MySQL, use STR_TO_DATE
                                        update_value = f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')"
                                    else:
                                        # Enclose other values in single quotes
                                        update_value = f"'{source_val}'"
                                elif str(source_val) in ('NaT', 'nan', 'None'):
                                    # Replace missing values with NULL, without single quotes
                                    update_value = 'NULL'
                                elif column_name == 'extended_rule_code':
                                    parts = source_val.split('==')
                                    if len(parts) == 2:
                                        # NOTE: quoting preserved from the original; it yields an
                                        # odd number of single quotes in the generated SQL
                                        update_value = f"'{parts[0]}=='{parts[1]}'"
                                    else:
                                        # Fallback so update_value is never left unset
                                        update_value = f"'{source_val}'"
                                else:
                                    # Handle non-datetime columns (e.g., strings, numbers) here;
                                    # the original special-cased created_by but quoted both
                                    # branches identically, so one branch suffices
                                    update_value = f"'{source_val}'"
                                # Add the column name and formatted value to the update statement
                                columns_to_update.append(f"{column_name} = {update_value}")
                        # Generate an update query dynamically
                        if columns_to_update:
                            # NOTE: values are interpolated unescaped, so a single quote inside
                            # the data will break this statement; bind parameters would be safer
                            update_query = f"UPDATE {schema_ext}.{table_name} SET "
                            update_query += ", ".join(columns_to_update)
                            update_query += f" WHERE {eprm_table_col_pk} = '{pk_value}' AND OP_ID='{opId}' AND BU_ID='{buId}'"
                            try:
                                print(update_query)
                                cursor_target.execute(update_query)
                                updated_count += 1
                                write_sql(sql_log_file, f"UPDATE: {update_query}")
                            except Exception as e:
                                pl_logging.error(
                                    "Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
                                write_sql(sql_log_file, f"UPDATE FAILED: {update_query}")
                                update_failed_count += 1
            except Exception as e:
                error_msg = (f"Error querying table {schema_ext}.{table_name}: {str(e)}; "
                             f"skipping this table and proceeding to the next")
                pl_logging.error(error_msg)
                failed_entities.append((table_name, error_msg))
                continue
        # Calculate the inserted and updated counts
        total_events_logged = inserted_count + updated_count
        pl_logging.info(f"Total Events Logged: {total_events_logged}")
        pl_logging.info(f"Source Read Count: {source_read_count}")
        pl_logging.info(f"Target Read Count: {target_read_count}")
        pl_logging.info(f"Inserted Count: {inserted_count}")
        pl_logging.info(f"Updated Count: {updated_count}")
        pl_logging.info(f"Failed Inserted Count: {insert_failed_count}")
        pl_logging.info(f"Failed Updated Count: {update_failed_count}")
        pl_logging.info("-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n")
        if len(failed_entities) > 0:
            return_flag = False
            STATUS = "Error"
            STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '')
        pl_logging.info("STATUS: %s" % STATUS)
        pl_logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE)
        query_update = (f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'"
                        + ", status_message='" + STATUS_MESSAGE + "'"
                        + ", error_description=NULL"
                        + ", updated_by='" + replicationJobId + "' /**/"
                        + " WHERE replication_job_id='" + replicationJobId + "'")
        if db_type_source == 'ORACLE':
            query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
        elif db_type_source in ('MARIA', 'MYSQL'):
            query_update = query_update.replace('/**/', ', updated_date=NOW()')
        pl_logging.info("-- + ppm_replication_status - UPDATE \n")
        pl_logging.info(query_update + ";\n")
        pl_logging.info("-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n")
        res = cursor_source.execute(query_update)
        pl_logging.info("query_update: %s" % res)
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        pl_logging.error("EXCEPTION: " + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        pl_logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False
    return return_flag
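
# Aside (assumption, not from the original): pandas Series.equals() is
# dtype-sensitive, so identical values read from two different databases
# (e.g. Oracle NUMBER vs MySQL INT) can compare unequal on dtype alone and
# force an UPDATE for every row. If change detection looks too eager, a
# common workaround is to normalize both sides first, e.g.:
#
#     if not source_row.astype(str).equals(target_row.astype(str)):
#         ...
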
Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno))) return_flag = False return return_flag if __name__ == '__main__': import logging from configparser import ConfigParser as conf_ConfigParser try: # python3 ppm_sc_changesonly.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999 parser = argparse.ArgumentParser(description="PPM Service Catalog Replication Script") parser.add_argument('--operation', required=True, help="Operation Type") parser.add_argument('--replicationTarget', required=True, help="Replication Target") parser.add_argument('--opId', required=True, help="Operation ID") parser.add_argument('--buId', required=True, help="Business Unit ID") parser.add_argument('--replicationJobId', required=True, help="Replication Job ID") args = parser.parse_args() replicationJobId = args.replicationJobId json_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/dna.json" conf_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/ppm_sc_replication.conf" sql_log_file = f"/app/scripts/PPM_Release_Management/Service_Catalog_ETL/logs/{replicationJobId}_ppm_sc_changesonly.sql" log_file = f'/app/scripts/PPM_Release_Management/Service_Catalog_ETL/logs/{replicationJobId}_ppm_sc_changesonly.log' args = parser.parse_args() # Set up logging CONFIG = conf_ConfigParser() CONFIG.read(conf_file_path) logging.basicConfig(filename=log_file , level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True) , format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True) , datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True) ) logging.info('LOGGER initiated') encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True) # Read JSON data from file if not os.path.exists(json_file_path): logging.error("CREDENTIAL FILE MISSING") logging.error("CREDENTIAL FILE: %s" % json_file_path) raise FileNotFoundError("CREDENTIAL FILE MISSING") with open(json_file_path) as json_file: json_data = json_load(json_file) # Connect to PPM_PC database connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding) # Connect to source database connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE', encoding) # Connect to target_ext database connection_target, cursor_target, schema_ext, db_type_ext = setDbConnection(logging, json_data, args.replicationTarget, encoding) if not (connection_ppm and connection_source and connection_target): raise DB_CONNECTION_ERROR # if main(args, json_data, log_file, logging, encoding): if process_changes_only(pdb_ppm_sc=connection_ppm , pdb_source=connection_source , pdb_target=connection_target , ps_rep_env=args.replicationTarget , ps_op_id=args.opId , ps_bu_id=args.buId , ps_rep_job_id=args.replicationJobId , pl_logging=logging): print("Update successful") else: print("Update FAILED") except DB_CONNECTION_ERROR: logging.error("EXCEPTION: DB CONNECTION ERROR") print("EXCEPTION: DB CONNECTION ERROR") except FileNotFoundError as ferr: print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno))) except Exception as err: print("Error - {} . 
Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno))) iam getting below error 20:25:49|INFO|process_changes_only|106|Started 20:25:49|INFO|process_changes_only|115|Variable declaration 20:25:49|INFO|process_changes_only|124|Master query 20:25:50|INFO|process_changes_only|145|Reading data from source 20:25:51|INFO|process_changes_only|147|source_df: 36362 20:25:51|INFO|process_changes_only|153|Reading data from target 20:25:52|ERROR|process_changes_only|225|Error querying table tibtcare_ppm_st.ppm_item_marketing_info: 190704 skipping the table and proceeding for next table 20:25:52|INFO|process_changes_only|232|Total Events Logged: 0 20:25:52|INFO|process_changes_only|233|Source Read Count: 36362 20:25:52|INFO|process_changes_only|234|Target Read Count: 36362 20:25:52|INFO|process_changes_only|235|Inserted Count: 0 20:25:52|INFO|process_changes_only|236|Updated Count: 0 20:25:52|INFO|process_changes_only|237|Failed Inserted Count: 0 20:25:52|INFO|process_changes_only|238|Failed Updated Count: 0