Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
7.8 kB
2
Indexable
Never
# ================================================================================================================================================================================
# PPM PRODUCT CATALOG (PC) REPLICATION - CE CUSTOM TABLES REPLICATE
#   DATE     AUTHOR     VER   CHANGE DESCRIPTION
# --------  ---------  -----  ------------------
# 21.08.23  Veera      1.0   The below script replicates ppm table records dynamically from "etl_ppm_replication_master" "PC_EXT"
#
#
# ================================================================================================================================================================================

import argparse
import json
import logging
import sys
from base64 import b64decode as base_b64decode

import mysql.connector
import pandas as pd
from pandas import read_sql as pd_read_sql
from sqlalchemy import create_engine

def parse_args(argv=None):
    """Build and parse the command line.

    Every option defaults to the value that used to be hard-coded, so running
    the script with no arguments behaves exactly as before.  Example:

        python ppm_pc_ext_replicate.py --releaseId 275.2 --opId HOB \
            --buId DEFAULT --replicationTarget SIT --replicationJobId REP_990_235
    """
    parser = argparse.ArgumentParser(
        description='PPM Product Catalog (PC_EXT) replication')
    parser.add_argument('--releaseId', default='275.2',
                        help="release_id filter value")
    parser.add_argument('--opId', default='HOB',
                        help="op_id filter value")
    parser.add_argument('--buId', default='DEFAULT',
                        help="bu_id filter value")
    parser.add_argument('--replicationTarget', default='SIT',
                        help="target environment key in the JSON config")
    parser.add_argument('--source', default='SOURCE',
                        help="source environment key in the JSON config")
    parser.add_argument('--replicationJobId', default='REP_990_235',
                        help="job id used in log/SQL file names and the updated_by audit column")
    parser.add_argument('--jsonFilePath',
                        default="/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json",
                        help="path to the replication JSON config file")
    parser.add_argument('--logDir',
                        default="/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs",
                        help="directory for the generated .log and .sql files")
    return parser.parse_args(argv)


def connect_to_database(json_data, replicationTarget):
    """Open a DB connection for the environment key *replicationTarget*.

    Credentials come from json_data[replicationTarget]; DB_PASSWORD is
    base64-decoded when ENCRYPT == 'Y'.

    Returns:
        (cnx, cursor, schema, user, password, host, port)

    Raises:
        ValueError: for an unsupported DB_TYPE (the original code left
            cnx/cursor unbound in that case, crashing with UnboundLocalError).
    """
    cfg = json_data.get(replicationTarget, {})
    encrypt = cfg.get('ENCRYPT')
    host = cfg.get('DB_HOST')
    port = cfg.get('DB_PORT')
    user = cfg.get('DB_USER')
    db_type = cfg.get('DB_TYPE')
    schema = cfg.get('DB_SCHEMA')
    sid = cfg.get('DB_SID')
    if encrypt == 'Y':
        password = base_b64decode(cfg.get('DB_PASSWORD')).decode('utf-8')
    else:
        password = cfg.get('DB_PASSWORD')

    if db_type == 'MYSQL':
        cnx = mysql.connector.connect(user=user, password=password, host=host, port=port)
        cursor = cnx.cursor()
        logging.info(f"Connected to MySQL database server {replicationTarget}: {host}:{port}")
    elif db_type == 'ORACLE':
        import cx_Oracle
        dsn = cx_Oracle.makedsn(host, port, sid=sid)
        cnx = cx_Oracle.connect(user=user, password=password, dsn=dsn)
        cursor = cnx.cursor()
    else:
        raise ValueError(f"Unsupported DB_TYPE '{db_type}' for {replicationTarget}")

    return cnx, cursor, schema, user, password, host, port


def main():
    """Replicate PC_EXT catalog tables from the source to the target database.

    Reads the table list from etl_ppm_replication_master (eprm_catalog='PC_EXT',
    eprm_enabled_flg='Y'), copies matching rows (release_id/op_id/bu_id filter)
    table by table, stamps updated_by with the replication job id, and records
    progress in a .log and a .sql audit file.
    """
    args = parse_args()

    log_file = f'{args.logDir}/{args.replicationJobId}_ppm_pc_ext.log'
    sql_log_path = f'{args.logDir}/{args.replicationJobId}_ppm_pc_ext.sql'

    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format='%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # SQL audit file; closed in the finally block below (the original never closed it).
    sql_log_fh = open(sql_log_path, "w")

    def write_sql(query_info):
        # Append one entry to the generated SQL audit file.
        sql_log_fh.write(query_info)
        sql_log_fh.write('\n')

    try:
        # Read environment/connection definitions.
        with open(args.jsonFilePath) as json_file:
            json_data = json.load(json_file)

        # PPM metadata connection: which tables to replicate.
        conn_ppm, _, schema_ppm, _, _, _, _ = connect_to_database(json_data, 'PPM_PC')
        primary_query = (
            f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master "
            f"WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        )
        df = pd_read_sql(primary_query, con=conn_ppm)

        # Source connection (rows to copy from).
        conn_source, _, schema_source, _, _, _, _ = connect_to_database(json_data, args.source)

        # Target: build a SQLAlchemy engine for either DB type.  The original
        # code only built `target_engine` for MySQL, so Oracle targets crashed
        # with NameError at to_sql(); it also referenced an undefined `sid`.
        replicationTarget_EXT = args.replicationTarget + '_EXT'
        tgt_cfg = json_data.get(replicationTarget_EXT, {})
        db_type = tgt_cfg.get('DB_TYPE')
        _, _, schema_target, user_target, password_target, host_target, port_target = \
            connect_to_database(json_data, replicationTarget_EXT)
        if db_type == 'MYSQL':
            target_engine = create_engine(
                f"mysql+mysqlconnector://{user_target}:{password_target}@{host_target}:{port_target}/{schema_target}")
        else:
            sid_target = tgt_cfg.get('DB_SID')
            target_engine = create_engine(
                f"oracle+cx_oracle://{user_target}:{password_target}@{host_target}:{port_target}/?sid={sid_target}")

        for _, row in df.iterrows():
            eprm_table_name = row['eprm_table_name'].lower()  # Convert table name to lowercase
            if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                # This table is handled by a separate process; skip it here.
                continue
            source_query = (
                f"SELECT * FROM {schema_source}.{eprm_table_name} "
                f"WHERE release_id='{args.releaseId}' AND op_id='{args.opId}' AND bu_id='{args.buId}'"
            )
            try:
                source_df = pd_read_sql(source_query, con=conn_source)
                if 'updated_by' in source_df:
                    source_df['updated_by'] = args.replicationJobId
                if not source_df.empty:
                    source_df.to_sql(eprm_table_name, con=target_engine, if_exists='append', index=False)
                    write_sql(
                        f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{args.replicationTarget}| TABLE| {eprm_table_name}\n")
                    write_sql(f"-- #Query: Inserted {len(source_df)} record(s) and updated 'created_by'\n")
                    print(f"Inserting records into {eprm_table_name}")
                    logging.info(f"Inserted {len(source_df)} record(s) into {eprm_table_name} and updated 'created_by'")
            except mysql.connector.Error as err:
                # Per-table failure: log and continue with the next table.
                print(f"Error occurred while executing the query: {source_query}: {err}")
                logging.info(f"Error occurred while executing the query: {source_query}: {err}")

    # NOTE: the narrow handler must come FIRST — the original listed
    # `except mysql.connector.Error` after `except Exception`, making it dead code.
    except mysql.connector.Error as err:
        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        print(f"An error occurred: {err}")
        logging.info(f"An error occurred: {err}")
    except Exception as e:
        print("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        print("An Error occurred while constructing dataframe:", str(e))
    finally:
        sql_log_fh.close()


if __name__ == '__main__':
    main()

Request: update the above code so the hard-coded values (releaseId, opId, buId, replicationTarget, source, replicationJobId, and the file paths) are passed as command-line arguments when executing the Python script from the shell, instead of being hard-coded. Also restructure the script into a main() function invoked via `if __name__ == '__main__':`.