a year ago
14 kB
import pandas as pd import json from base64 import b64decode as base_b64decode import logging from pandas import read_sql as pd_read_sql import sys from sqlalchemy import create_engine import argparse from io import TextIOBase as io_TextIOBase from json import load as json_load import os # define Python user-defined exceptions class Error(Exception): """Base class for other exceptions""" pass # define Python user-defined exceptions class ETL_PPM_REPLICATION_MASTER_ERROR(Error): pass class DB_CONNECTION_ERROR(Error): pass def setDbConnection(logging, json_data, serverInfo, encoding): from sqlalchemy import create_engine as sqlalchemy_create_engine from base64 import b64decode as base_b64decode import mysql.connector try: cnx = cursor = schema = db_type = None encrypt = json_data.get(serverInfo, {}).get('ENCRYPT') host = json_data.get(serverInfo, {}).get('DB_HOST') port = json_data.get(serverInfo, {}).get('DB_PORT') user = json_data.get(serverInfo, {}).get('DB_USER') db_type = json_data.get(serverInfo, {}).get('DB_TYPE') schema = json_data.get(serverInfo, {}).get('DB_SCHEMA') if encrypt == 'Y': password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8') else: password = json_data.get(serverInfo, {}).get('DB_PASSWORD') if db_type in ('MYSQL', 'MARIA'): connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user , password , host , port , schema)) elif db_type == 'ORACLE': connection_text = ('oracle://%s:%s@%s:%s/%s' % (user , password , host , port , json_data.get(serverInfo, {}).get('DB_SID') )) cnx = sqlalchemy_create_engine(connection_text , encoding=encoding) # ,fast_executemany=True #, connect_args={'connect_timeout': 600}) cursor = cnx.connect() logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}") # except mysql.connector.Error as dberr: # logging.error("DATABASE CONNECTION ERROR") # logging.error("Error - {} . Line No - {} ".format(str(dberr), str(sys.exc_info()[-1].tb_lineno))) # cnx = cursor = schema = None except Exception as dbexp: logging.error("DATABASE CONNECTION EXCEPTION") logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno))) cnx = cursor = schema = None return cnx, cursor, schema, db_type def main(args, json_data, log_file, logging, encoding): try: releaseId = args.releaseId opId = args.opId buId = args.buId replicationTarget = args.replicationTarget replicationJobId = args.replicationJobId return_flag = True STATUS = "InProgress" STATUS_MESSAGE = "Insertion of federation tables successful." replicationTarget_EXT = replicationTarget + '_EXT' failed_entities = [] # Connect to PPM_PC database connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding) # Connect to source database connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE', encoding) # Connect to target_ext database connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(logging, json_data, 'SIT', encoding) if not (connection_ppm and connection_source and connection_ext): raise DB_CONNECTION_ERROR primary_query=f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'" primary_df=pd_read_sql(primary_query,con=connection_ppm) for index, row in primary_df.iterrows(): table_name = row['eprm_table_name'].lower() eprm_table_col_pk = row['eprm_table_col_pk'] # Fetch primary key column name and table name pk = eprm_table_col_pk.lower() try: source_query = f"SELECT * FROM {schema_source}.{table_name}" # Where op_id , bu_id source_df = pd.read_sql(source_query, connection_source) target_query = f"SELECT * FROM {schema_ext}.{table_name}" # Where op_id , bu_id target_df = pd.read_sql(target_query, connection_ext) except Exception as e: continue for index, source_row in source_df.iterrows(): pk_value = source_row[pk] # Check if the primary key exists in the target DataFrame if pk_value not in target_df[pk].values: # Generate an INSERT query dynamically insert_query = f"INSERT INTO {schema_ext}.{table_name} (" insert_columns = [] insert_values = [] for column_name, source_val in source_row.items(): insert_columns.append(column_name) insert_values.append(f"'{source_val}'") insert_query += ", ".join(insert_columns) insert_query += ") VALUES (" insert_query += ", ".join(insert_values) insert_query += ")" # Execute the INSERT query try: print(insert_query) # cursor_ext.execute(insert_query) except Exception as e: continue else: # Fetch the corresponding row from the target DataFrame based on the primary key target_row = target_df[target_df[pk] == pk_value].iloc[0] if not source_row.equals(target_row): # Generate an update query dynamically update_query = f"UPDATE {schema_ext}.{table_name} SET " column_updates = [] for column_name, source_val in source_row.items(): if column_name not in ('created_by', 'created_on', 'updated_on', 'start_date', 'end_date'): target_val = target_row[column_name] if source_val != target_val: column_updates.append(f"{column_name} = '{source_val}'") update_query += ", ".join(column_updates) update_query += f" where {eprm_table_col_pk} = '{pk_value}'" # print(update_query) try: print(update_query) # cursor_ext.execute(update_query) except Exception as e: continue logging.info(f"-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n") if len(failed_entities) > 0: return_flag = False STATUS = "Error" STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '') logging.info("STATUS: %s" % STATUS) logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE) query_update = f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'" \ + ", status_message='" + STATUS_MESSAGE + "'" \ + ", error_description=NULL" \ + ", updated_by='" + replicationJobId + "' /**/" \ + " WHERE replication_job_id='" + replicationJobId + "' AND release_id='" + str(releaseId) + "'" query_ppm_update = f"UPDATE {schema_source}.ppm_release_master SET replication_status='" + STATUS + "'" \ + ", updated_by='" + replicationJobId + "' /**/" \ + " WHERE release_id='" + str(releaseId) + "'" if db_type_source == 'ORACLE': query_update = query_update.replace('/**/', ', updated_date=SYSDATE') query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=SYSDATE') elif db_type_source in ('MARIA', 'MYSQL'): query_update = query_update.replace('/**/', ', updated_date=NOW()') query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=NOW()') logging.info("-- + ppm_replication_status - UPDATE \n") logging.info(query_update + ";\n") logging.info("-- + ppm_release_master - UPDATE \n") logging.info(query_ppm_update + ";\n") logging.info(f"-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n") res = cursor_source.execute(query_update) logging.info("query_update: %s" % res) res = cursor_source.execute(query_ppm_update) logging.info("query_ppm_update: %s" % res) except DB_CONNECTION_ERROR: logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT") return_flag = False except ETL_PPM_REPLICATION_MASTER_ERROR: STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE" logging.error("EXCEPTION:" + STATUS_MESSAGE) return_flag = False except Exception as e: logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno))) return_flag = False return return_flag if __name__ == '__main__': import logging from configparser import ConfigParser as conf_ConfigParser statFile = "" try: parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script") parser.add_argument('--releaseId', required=True, help="Release ID") parser.add_argument('--releaseType', required=True, help="Release Type") parser.add_argument('--replicationTarget', required=True, help="Replication Target") parser.add_argument('--opId', required=True, help="Operation ID") parser.add_argument('--buId', required=True, help="Business Unit ID") parser.add_argument('--replicationJobId', required=True, help="Replication Job ID") args = parser.parse_args() replicationJobId = args.replicationJobId json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/dna.json" conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf" log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_update_dna.log' statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_update_dna.status' args = parser.parse_args() statFile = open(statFile, "w") # Set up logging CONFIG = conf_ConfigParser() CONFIG.read(conf_file_path) logging.basicConfig(filename=log_file , level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True) , format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True) , datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True) ) logging.info('LOGGER initiated') encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True) # Read JSON data from file if not os.path.exists(json_file_path): logging.error("CREDENTIAL FILE MISSING") logging.error("CREDENTIAL FILE: %s" % json_file_path) raise FileNotFoundError("CREDENTIAL FILE MISSING") with open(json_file_path) as json_file: json_data = json_load(json_file) if main(args, json_data, log_file, logging, encoding): print("Update successful") statFile.write("SUCCESS") else: statFile.write("FAILED") except FileNotFoundError as ferr: print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno))) statFile.write("FAILED") except Exception as err: print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno))) statFile.write("FAILED") if isinstance(statFile, io_TextIOBase): statFile.close() in the above code iam inserting and updating records based on some condition while inserting records some of the values have 'None' as valye can you replace them with NULL dynamically by identifying the values below i have given how its occuring INSERT INTO tibtcare_ppm_st.ppm_pff_container_map (pff_container_map_id, pff_id, action, container_id, created_by, created_on, updated_by, updated_on, op_id, bu_id, end_date, start_date) VALUES ('9405', 'PFF_ADD_USER_FIXED_NUMBER', 'MODIFY', 'MODIFY_PFF_ADD_USER_FIXED_NUMBER', 'a0658007', '2020-01-28 17:25:04', 'None', 'NaT', 'HOB', 'DEFAULT', '2030-12-30 23:59:59', '2020-01-28 17:25:04') you see above insert statment has 'None' and 'Nat' can you replace both of them with NULL in every record when ever None or Nat occurs in value...remember one thing just place NULL thats all dont place 'NULL'
Editor is loading...