Untitled
unknown
plain_text
2 years ago
20 kB
3
Indexable
# ===================== ppm_sc_changesonly dna.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999 # =========================================================================================================================================================== # PPM PRODUCT CATALOG (PC) REPLICATION - CE CUSTOM TABLES REPLICATE # DATE AUTHOR VER CHANGE DESCRIPTION # -------- --------- ----- ------------------ # 21.08.23 Veerendra 1.0 The below script replicates ppm table records dynamically from "etl_ppm_replication_master" "PC_EXT" # # # ================================================================================================================================================================================ import pandas as pd import json from base64 import b64decode as base_b64decode import logging from pandas import read_sql as pd_read_sql import sys from sqlalchemy import create_engine import argparse from io import TextIOBase as io_TextIOBase from json import load as json_load import os import datetime # define Python user-defined exceptions class Error(Exception): """Base class for other exceptions""" pass # define Python user-defined exceptions class ETL_PPM_REPLICATION_MASTER_ERROR(Error): pass class DB_CONNECTION_ERROR(Error): pass def is_datetime(value): try: # Attempt to parse the value as a datetime datetime.datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S') return True except ValueError: return False def setDbConnection(logging, json_data, serverInfo, encoding): from sqlalchemy import create_engine as sqlalchemy_create_engine from base64 import b64decode as base_b64decode import mysql.connector try: cnx = cursor = schema = db_type = None encrypt = json_data.get(serverInfo, {}).get('ENCRYPT') host = json_data.get(serverInfo, {}).get('DB_HOST') port = json_data.get(serverInfo, {}).get('DB_PORT') user = json_data.get(serverInfo, {}).get('DB_USER') db_type = json_data.get(serverInfo, {}).get('DB_TYPE') schema = json_data.get(serverInfo, {}).get('DB_SCHEMA') if encrypt == 'Y': password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8') else: password = json_data.get(serverInfo, {}).get('DB_PASSWORD') if db_type in ('MYSQL', 'MARIA'): connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user , password , host , port , schema)) elif db_type == 'ORACLE': connection_text = ('oracle://%s:%s@%s:%s/%s' % (user , password , host , port , json_data.get(serverInfo, {}).get('DB_SID') )) cnx = sqlalchemy_create_engine(connection_text , encoding=encoding) # ,fast_executemany=True # , connect_args={'connect_timeout': 600}) cursor = cnx.connect() logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}") # except mysql.connector.Error as dberr: # logging.error("DATABASE CONNECTION ERROR") # logging.error("Error - {} . Line No - {} ".format(str(dberr), str(sys.exc_info()[-1].tb_lineno))) # cnx = cursor = schema = None except Exception as dbexp: logging.error("DATABASE CONNECTION EXCEPTION") logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno))) cnx = cursor = schema = None return cnx, cursor, schema, db_type # def main(args, json_data, log_file, logging, encoding): def process_changes_only(pdb_ppm_sc, pdb_source, pdb_target, ps_rep_env, ps_op_id, ps_bu_id, ps_rep_job_id, pl_logging): try: replicationJobId = args.replicationJobId return_flag = True STATUS = "InProgress" STATUS_MESSAGE = "Updating service catalog changes successful." failed_entities = [] opId=ps_op_id buId=ps_bu_id insert_sucess_count = 0 insert_failed_count = 0 update_sucess_count = 0 update_failed_count = 0 primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'" primary_df = pd_read_sql(primary_query, con=pdb_ppm_sc) for index, row in primary_df.iterrows(): # Fetch primary key column name and table name table_name = row['eprm_table_name'].lower() eprm_table_col_pk = row['eprm_table_col_pk'] pk = eprm_table_col_pk.lower() source_query = f"SELECT * FROM {schema_source}.{table_name} where OP_ID='{opId}' AND BU_ID='{buId}'" source_df = pd.read_sql(source_query, connection_source) target_query = f"SELECT * FROM {schema_ext}.{table_name} WHERE OP_ID='{opId}' AND BU_ID='{buId}'" target_df = pd.read_sql(target_query, connection_target) for index, source_row in source_df.iterrows(): pk_value = source_row[pk] # Check if the primary key exists in the target DataFrame if pk_value not in target_df[pk].values: # Replace 'None' and 'NaT' with None in source_row for column_name, source_val in source_row.items(): if source_val == 'None' or source_val == 'NaT' or source_val == 'nan': source_row[column_name] = NULL # Generate an INSERT query dynamically insert_query = f"INSERT INTO {schema_ext}.{table_name} (" insert_columns = [] insert_values = [] for column_name, source_val in source_row.items(): if source_val is not None: if isinstance(source_val, str) and source_val.startswith('TO_DATE'): # If it already starts with TO_DATE, don't add TO_DATE again insert_values.append(source_val) elif is_datetime(source_val): # Format datetime values using the appropriate function for the database type if db_type_ext == 'ORACLE': insert_values.append(f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')") elif db_type_ext in ('MYSQL', 'MARIA'): # For MariaDB, use STR_TO_DATE insert_values.append(f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')") else: # Enclose other values in single quotes insert_values.append(f"'{source_val}'") insert_columns.append(column_name) # Add the column name elif str(source_val) == 'NaT': # Replace 'NaT' with NULL without single quotes insert_values.append('NULL') insert_columns.append(column_name) # Add the column name elif column_name == 'extended_rule_code': parts = source_val.split('==') if len(parts) == 2: extended_rule_code = f"'{parts[0]}=='{parts[1]}''" insert_values.append(extended_rule_code) insert_columns.append(column_name) elif str(source_val) == 'nan': insert_values.append('NULL') insert_columns.append(column_name) else: # Enclose other values in single quotes insert_values.append(f"'{source_val}'") insert_columns.append(column_name) # Add the column name else: insert_values.append('NULL') # Insert a true NULL insert_columns.append(column_name) # Add the column name # Construct the INSERT query with column names insert_query = f"INSERT INTO {schema_ext}.{table_name} ({', '.join(insert_columns)}) VALUES ({', '.join(insert_values)})" # Execute the INSERT query try: print(insert_query) cursor_target.execute(insert_query) insert_sucess_count += 1 except Exception as e: pl_logging.error( "Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno))) insert_failed_count += 1 else: # Fetch the corresponding row from the target DataFrame based on the primary key target_row = target_df[target_df[pk] == pk_value].iloc[0] if not source_row.equals(target_row): columns_to_update = [] for column_name, source_val in source_row.items(): target_val = target_row[column_name] if source_val != target_val: if is_datetime(source_val): # Format datetime values using the appropriate function for the database type if db_type_ext == 'ORACLE': update_value = f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')" elif db_type_ext in ('MYSQL', 'MARIA'): # For MariaDB and MySQL, use STR_TO_DATE update_value = f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')" else: # Enclose other values in single quotes update_value = f"'{source_val}'" elif str(source_val) == 'NaT': # Replace 'NaT' with NULL without single quotes update_value = 'NULL' elif str(source_val) == 'nan': # Replace 'NaT' with NULL without single quotes update_value = 'NULL' elif str(source_val) == 'None': update_value = 'NULL' elif column_name == 'extended_rule_code': parts = source_val.split('==') if len(parts) == 2: extended_rule_code = f"'{parts[0]}=='{parts[1]}'" update_value = extended_rule_code else: if column_name == 'created_by': update_value = f"'{source_val}'" else: # Handle non-datetime columns (e.g., strings, numbers) here update_value = f"'{source_val}'" # Add the column name and formatted value to the update statement columns_to_update.append(f"{column_name} = {update_value}") # Generate an update query dynamically if columns_to_update: update_query = f"UPDATE {schema_ext}.{table_name} SET " update_query += ", ".join(columns_to_update) update_query += f" WHERE {eprm_table_col_pk} = '{pk_value}' AND OP_ID='{opId}' AND BU_ID='{buId}'" try: print(update_query) cursor_target.execute(update_query) update_sucess_count += 1 except Exception as e: pl_logging.error( "Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno))) update_failed_count += 1 print(f"Successful inserts: {insert_sucess_count}") print(f"Failed inserts: {insert_failed_count}") print(f"Successful updates: {update_sucess_count}") print(f"Failed updates: {update_failed_count}") pl_logging.info(f"-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n") if len(failed_entities) > 0: return_flag = False STATUS = "Error" STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '') pl_logging.info("STATUS: %s" % STATUS) pl_logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE) query_update = f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'" \ + ", status_message='" + STATUS_MESSAGE + "'" \ + ", error_description=NULL" \ + ", updated_by='" + replicationJobId + "' /**/" \ if db_type_source == 'ORACLE': query_update = query_update.replace('/**/', ', updated_date=SYSDATE') elif db_type_source in ('MARIA', 'MYSQL'): query_update = query_update.replace('/**/', ', updated_date=NOW()') pl_logging.info("-- + ppm_replication_status - UPDATE \n") pl_logging.info(query_update + ";\n") pl_logging.info(f"-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n") res = cursor_source.execute(query_update) pl_logging.info("query_update: %s" % res) except ETL_PPM_REPLICATION_MASTER_ERROR: STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE" pl_logging.error("EXCEPTION:" + STATUS_MESSAGE) return_flag = False except Exception as e: pl_logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno))) return_flag = False return return_flag if __name__ == '__main__': import logging from configparser import ConfigParser as conf_ConfigParser try: # python3 ppm_sc_changesonly.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999 parser = argparse.ArgumentParser(description="PPM Service Catalog Replication Script") parser.add_argument('--operation', required=True, help="Operation Type") parser.add_argument('--replicationTarget', required=True, help="Replication Target") parser.add_argument('--opId', required=True, help="Operation ID") parser.add_argument('--buId', required=True, help="Business Unit ID") parser.add_argument('--replicationJobId', required=True, help="Replication Job ID") args = parser.parse_args() replicationJobId = args.replicationJobId json_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/dna.json" conf_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/ppm_sc_replication.conf" log_file = f'/app/scripts/PPM_Release_Management/Service_Catalog_ETL/logs/{replicationJobId}_ppm_sc_changesonly.log' args = parser.parse_args() # Set up logging CONFIG = conf_ConfigParser() CONFIG.read(conf_file_path) logging.basicConfig(filename=log_file , level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True) , format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True) , datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True) ) logging.info('LOGGER initiated') encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True) # Read JSON data from file if not os.path.exists(json_file_path): logging.error("CREDENTIAL FILE MISSING") logging.error("CREDENTIAL FILE: %s" % json_file_path) raise FileNotFoundError("CREDENTIAL FILE MISSING") with open(json_file_path) as json_file: json_data = json_load(json_file) # Connect to PPM_PC database connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding) # Connect to source database connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE', encoding) # Connect to target_ext database connection_target, cursor_target, schema_ext, db_type_ext = setDbConnection(logging, json_data, args.replicationTarget, encoding) if not (connection_ppm and connection_source and connection_target): raise DB_CONNECTION_ERROR # if main(args, json_data, log_file, logging, encoding): if process_changes_only(pdb_ppm_sc=connection_ppm , pdb_source=connection_source , pdb_target=connection_target , ps_rep_env=args.replicationTarget , ps_op_id=args.opId , ps_bu_id=args.buId , ps_rep_job_id=args.replicationJobId , pl_logging=logging): print("Update successful") else: print("Update FAILED") except DB_CONNECTION_ERROR: logging.error("EXCEPTION: DB CONNECTION ERROR") print("EXCEPTION: DB CONNECTION ERROR") except FileNotFoundError as ferr: print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno))) except Exception as err: print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno))) in the above while reading source and target dataframes i dont have some tables and for that iam getting error as below 13:54:51|ERROR|process_changes_only|282|Error - (cx_Oracle.DatabaseError) ORA-00942: table or view does not exist [SQL: SELECT * FROM tibtcare_ppm_st.sc_ppm_item_marketing_info WHERE OP_ID='HOB' AND BU_ID='DEFAULT'] (Background on this error at: https://sqlalche.me/e/14/4xp6) . Line No - 123 so i want you to hadle that if table doesnt exits then it should log that tables doesnt have in target database proceeding for next table like that...code should not stop
Editor is loading...