Untitled
unknown
plain_text
a year ago
7.8 kB
2
Indexable
Never
# ==============================================================================
# PPM PRODUCT CATALOG (PC) REPLICATION - CE CUSTOM TABLES REPLICATE
# DATE      AUTHOR     VER    CHANGE DESCRIPTION
# --------  ---------  -----  ------------------
# 21.08.23  Veera      1.0    The below script replicates ppm table records
#                             dynamically from "etl_ppm_replication_master"
#                             "PC_EXT"
#
# ==============================================================================
"""Replicate the PC_EXT tables listed in ``etl_ppm_replication_master``.

Every job parameter is a command-line option; each option defaults to the
value that used to be hard-coded, so running the script with no arguments
behaves as before.  Example::

    python <this_script>.py --release-id 275.2 --op-id HOB --bu-id DEFAULT \
        --replication-target SIT --source SOURCE \
        --replication-job-id REP_990_235
"""
import argparse
import json
import logging
import os
import sys
from base64 import b64decode as base_b64decode
from contextlib import ExitStack
from urllib.parse import quote as url_quote

import mysql.connector
import pandas as pd
from pandas import read_sql as pd_read_sql
from sqlalchemy import create_engine

# Defaults reproduce the values the script previously hard-coded.
DEFAULT_RELEASE_ID = '275.2'
DEFAULT_OP_ID = 'HOB'
DEFAULT_BU_ID = 'DEFAULT'
DEFAULT_REPLICATION_TARGET = 'SIT'
DEFAULT_SOURCE = 'SOURCE'
DEFAULT_REPLICATION_JOB_ID = 'REP_990_235'
DEFAULT_CONFIG_FILE = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json"
DEFAULT_LOG_DIR = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs"

# Config section that holds the replication master table.
PPM_CONFIG_KEY = 'PPM_PC'
# Table listed in the master table that this job deliberately does not copy.
EXCLUDED_TABLE = 'pkg_prd_fed_ext_attrs'


def parse_args(argv=None):
    """Parse the job parameters from *argv* (``sys.argv[1:]`` when None)."""
    parser = argparse.ArgumentParser(
        description="Replicate PPM product catalog PC_EXT tables from a source "
                    "database to a target database.")
    parser.add_argument('--release-id', default=DEFAULT_RELEASE_ID,
                        help="release_id value used to filter source rows")
    parser.add_argument('--op-id', default=DEFAULT_OP_ID,
                        help="op_id value used to filter source rows")
    parser.add_argument('--bu-id', default=DEFAULT_BU_ID,
                        help="bu_id value used to filter source rows")
    parser.add_argument('--replication-target', default=DEFAULT_REPLICATION_TARGET,
                        help="target name; '<target>_EXT' is looked up in the config file")
    parser.add_argument('--source', default=DEFAULT_SOURCE,
                        help="config file section describing the source database")
    parser.add_argument('--replication-job-id', default=DEFAULT_REPLICATION_JOB_ID,
                        help="job id; written to 'updated_by' and used in log file names")
    parser.add_argument('--config-file', default=DEFAULT_CONFIG_FILE,
                        help="path of the JSON connection configuration file")
    parser.add_argument('--log-dir', default=DEFAULT_LOG_DIR,
                        help="directory receiving the .log and .sql files")
    return parser.parse_args(argv)


def write_sql(query_info, sql_log):
    """Write *query_info* followed by a newline to the open SQL log *sql_log*."""
    sql_log.write(query_info)
    sql_log.write('\n')


def connect_to_database(json_data, replicationTarget):
    """Open a DB-API connection described by ``json_data[replicationTarget]``.

    Returns:
        A ``(cnx, cursor, schema, user, password, host, port)`` tuple.

    Raises:
        ValueError: if ``DB_TYPE`` is neither ``'MYSQL'`` nor ``'ORACLE'``.
    """
    config = json_data.get(replicationTarget, {})
    host = config.get('DB_HOST')
    port = config.get('DB_PORT')
    user = config.get('DB_USER')
    db_type = config.get('DB_TYPE')
    schema = config.get('DB_SCHEMA')
    sid = config.get('DB_SID')

    password = config.get('DB_PASSWORD')
    if config.get('ENCRYPT') == 'Y':
        # With ENCRYPT == 'Y' the stored password is base64 text.
        password = base_b64decode(password).decode('utf-8')

    if db_type == 'MYSQL':
        cnx = mysql.connector.connect(user=user, password=password, host=host, port=port)
        logging.info(f"Connected to MySQL database server {replicationTarget}: {host}:{port}")
    elif db_type == 'ORACLE':
        # Imported lazily so MySQL-only installations do not need cx_Oracle.
        import cx_Oracle
        dsn = cx_Oracle.makedsn(host, port, sid=sid)
        cnx = cx_Oracle.connect(user=user, password=password, dsn=dsn)
    else:
        # Previously this fell through and failed with UnboundLocalError.
        raise ValueError(
            f"Unsupported DB_TYPE {db_type!r} for configuration entry {replicationTarget!r}")

    return cnx, cnx.cursor(), schema, user, password, host, port


def _create_target_engine(json_data, target_key):
    """Return a SQLAlchemy engine for the target database *target_key*.

    ``DataFrame.to_sql`` needs a SQLAlchemy connectable, so both the MySQL and
    the Oracle target are exposed as an engine.
    """
    target_config = json_data.get(target_key, {})
    db_type = target_config.get('DB_TYPE')

    # Connect once, as the script always did, to obtain the decoded
    # credentials and to fail early if the target is unreachable; the probe
    # connection is closed instead of being leaked.
    probe_cnx, probe_cursor, schema, user, password, host, port = connect_to_database(
        json_data, target_key)
    probe_cursor.close()
    probe_cnx.close()

    if db_type == 'MYSQL':
        # Percent-encode credentials so characters such as '@', '/' or ':'
        # cannot corrupt the connection URL.
        safe_user = url_quote(str(user), safe='')
        safe_password = url_quote(str(password), safe='')
        return create_engine(
            f"mysql+mysqlconnector://{safe_user}:{safe_password}@{host}:{port}/{schema}")

    import cx_Oracle

    # NOTE(review): the descriptor uses DB_SCHEMA as SERVICE_NAME, exactly as
    # before - confirm that this is intended.
    sid = target_config.get('DB_SID')
    connect_data = f"(SERVICE_NAME={schema})"
    if sid:
        connect_data += f"(SID={sid})"
    oracle_dsn = (
        f"(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST={host})(PORT={port})))"
        f"(CONNECT_DATA={connect_data}))"
    )
    dsn_kwargs = {
        'user': user,
        'password': password,
        'dsn': oracle_dsn,
        'encoding': 'UTF-8',
    }
    # `creator` makes the engine open its connections with these arguments.
    engine = create_engine("oracle+cx_oracle://",
                           creator=lambda: cx_Oracle.connect(**dsn_kwargs))
    try:
        # The engine connects lazily; open one connection now to fail early.
        engine.connect().close()
    except Exception as ex:
        logging.error(f"Error while connecting to Oracle database: {ex}")
        raise
    return engine


def _build_source_query(schema, table_name, db_type):
    """Return the SELECT for one source table using bound parameters.

    Schema and table name are identifiers and cannot be bound, so they are
    still interpolated; the three filter values are placeholders in the style
    of the driver selected by *db_type*.
    """
    if db_type == 'ORACLE':
        release_ph, op_ph, bu_ph = ':release_id', ':op_id', ':bu_id'
    else:
        release_ph, op_ph, bu_ph = '%(release_id)s', '%(op_id)s', '%(bu_id)s'
    return (f"SELECT * FROM {schema}.{table_name} "
            f"WHERE release_id={release_ph} AND op_id={op_ph} AND bu_id={bu_ph}")


def _replicate_tables(master_df, conn_source, schema_source, source_db_type,
                      target_engine, args, sql_log):
    """Copy the rows of every table listed in *master_df* to the target.

    A failure on one table is reported and logged, then the loop moves on to
    the next table.
    """
    query_params = {
        'release_id': args.release_id,
        'op_id': args.op_id,
        'bu_id': args.bu_id,
    }
    for _, row in master_df.iterrows():
        eprm_table_name = row['eprm_table_name'].lower()  # Convert table name to lowercase
        if eprm_table_name == EXCLUDED_TABLE:
            continue

        source_query = _build_source_query(schema_source, eprm_table_name, source_db_type)
        try:
            source_df = pd_read_sql(source_query, con=conn_source, params=query_params)
            if 'updated_by' in source_df:
                source_df['updated_by'] = args.replication_job_id
            if not source_df.empty:
                source_df.to_sql(eprm_table_name, con=target_engine,
                                 if_exists='append', index=False)
                # NOTE(review): the messages below say 'created_by' while the
                # column set above is 'updated_by'; text left unchanged.
                write_sql(
                    f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{args.replication_target}| TABLE| {eprm_table_name}\n",
                    sql_log)
                write_sql(f"-- #Query: Inserted {len(source_df)} record(s) and updated 'created_by'\n",
                          sql_log)
                print(f"Inserting records into {eprm_table_name}")
                logging.info(f"Inserted {len(source_df)} record(s) into {eprm_table_name} and updated 'created_by'")
        except mysql.connector.Error as err:
            print(f"Error occurred while executing the query: {source_query}: {err}")
            logging.error(f"Error occurred while executing the query: {source_query}: {err}")
        except Exception as e:
            # Best effort per table: report, log and continue.
            print("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
            print("An Error occurred while constructing dataframe:", str(e))
            logging.error(f"An Error occurred while constructing dataframe: {e}")


def main(argv=None):
    """Run the replication job with parameters taken from the command line."""
    args = parse_args(argv)

    sql_log_path = os.path.join(args.log_dir, f"{args.replication_job_id}_ppm_pc_ext.sql")
    log_path = os.path.join(args.log_dir, f"{args.replication_job_id}_ppm_pc_ext.log")

    # Set up logging
    logging.basicConfig(
        filename=log_path,
        level=logging.INFO,
        format='%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # The ExitStack closes the SQL log, the connections and the engine on
    # every exit path.
    with ExitStack() as stack:
        sql_log = stack.enter_context(open(sql_log_path, "w", encoding="utf-8"))
        try:
            # Read JSON data from file
            with open(args.config_file, encoding="utf-8") as json_file:
                json_data = json.load(json_file)

            # Connect to PPM_PC database
            conn_ppm, _, schema_ppm, _, _, _, _ = connect_to_database(json_data, PPM_CONFIG_KEY)
            stack.callback(conn_ppm.close)

            # Fetch data from the etl_ppm_replication_master table
            primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
            master_df = pd_read_sql(primary_query, con=conn_ppm)

            # Connect to source database
            conn_source, _, schema_source, _, _, _, _ = connect_to_database(json_data, args.source)
            stack.callback(conn_source.close)
            source_db_type = json_data.get(args.source, {}).get('DB_TYPE')

            # Connect to target database
            target_engine = _create_target_engine(json_data, args.replication_target + '_EXT')
            stack.callback(target_engine.dispose)

            _replicate_tables(master_df, conn_source, schema_source, source_db_type,
                              target_engine, args, sql_log)
        except mysql.connector.Error as err:
            print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
            print(f"An error occurred: {err}")
            logging.info(f"An error occurred: {err}")


if __name__ == '__main__':
    main()

# Request that accompanied the original script (kept for context):
#   "Can you please update the above code so that the values are taken as
#   parameters? I don't want to hard-code them; I want to pass them as
#   arguments when executing the Python code from the shell. Also write the
#   entire code inside def main(), called from if __name__ == '__main__':."