# ================================================================================================================================================================================
# PPM PRODUCT CATALOG (PC) REPLICATION - CE CUSTOM TABLES REPLICATE
#   DATE     AUTHOR     VER   CHANGE DESCRIPTION
# --------  ---------  -----  ------------------
# 21.08.23  Veera     1.0   Replicates PPM table records dynamically, driven by "etl_ppm_replication_master" entries for catalog "PC_EXT"
#
# python3 ppm_pc_ext_insert.py --releaseId 275.2 --releaseType DEPLOYED --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId REP_990_234
# ================================================================================================================================================================================

import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os


# Define Python user-defined exceptions
class Error(Exception):
    """Base class for other exceptions"""
    pass

class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
    """Raised when etl_ppm_replication_master returns no matching rows"""
    pass

class DB_CONNECTION_ERROR(Error):
    """Raised when a required database connection cannot be established"""
    pass


def setDbConnection(logging, json_data, serverInfo, encoding):
    try:
        cnx = cursor = schema = db_type = None
        encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
        host = json_data.get(serverInfo, {}).get('DB_HOST')
        port = json_data.get(serverInfo, {}).get('DB_PORT')
        user = json_data.get(serverInfo, {}).get('DB_USER')
        db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
        schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
        if encrypt == 'Y':
            password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
        else:
            password = json_data.get(serverInfo, {}).get('DB_PASSWORD')

        if db_type in ('MYSQL','MARIA'):
            connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user
                                                                         ,password
                                                                         ,host
                                                                         ,port
                                                                         ,schema))
        elif db_type == 'ORACLE':
            connection_text = ('oracle://%s:%s@%s:%s/%s' % (user
                                                           ,password
                                                           ,host
                                                           ,port
                                                           ,json_data.get(serverInfo, {}).get('DB_SID')
                                                           ))
        else:
            raise ValueError(f"Unsupported DB_TYPE '{db_type}' for server '{serverInfo}'")
        cnx = create_engine(connection_text
                           ,encoding=encoding
                           ,connect_args={'connect_timeout': 300})
        cursor = cnx.connect()
        logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
    except Exception as dbexp:
        logging.error("DATABASE CONNECTION EXCEPTION")
        logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
        cnx = cursor = schema = None
    
    return cnx, cursor, schema, db_type



def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # Connect to PPM_PC database
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)

        # Connect to source database
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data,'SOURCE', encoding)

        # Connect to source_ext database
        connection_source_ext, cursor_source_ext, schema_source_ext,db_type_source_ext = setDbConnection(logging, json_data,'SOURCE_EXT', encoding)

        # Connect to target_ext database
        connection_ext, cursor_ext, schema_ext, db_type_ext  = setDbConnection(logging, json_data, replicationTarget_EXT, encoding)

        if not (connection_ppm and connection_source and connection_source_ext and connection_ext):
            raise DB_CONNECTION_ERROR
        
        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))
        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR
        query_count = 0
        
        for _, row in df.iterrows():
            try:
                query_count += 1
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']
                
                logging.info(f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")
                
                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
                else:
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
                
                logging.info(f"Reading values")
                source_df = pd_read_sql(source_query, con=connection_source_ext)
                logging.info(f"Count {len(source_df):,}")
                
                if 'updated_by' in source_df:
                    source_df['updated_by'] = replicationJobId
                
                logging.info(f"Inserting values")
                if not source_df.empty:
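                    # NOTE: method='multi' batches rows into multi-row INSERTs; with no
                    # chunksize, pandas sends the entire frame as one statement, which can
                    # exceed MySQL's max_allowed_packet on large tables ("server has gone away")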
                    source_df.to_sql(eprm_table_name, con=connection_ext, if_exists='append', index=False, method='multi')
                    logging.info(f"Insertion successful")
            except Exception as e:
                failed_entities.append(eprm_table_name)
                logging.error("DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        
        logging.info(f"-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n")
        if len(failed_entities)>0:
            return_flag = False
            STATUS = "Error"
            STATUS_MESSAGE = str(failed_entities).replace("'",'').replace('"','')
        
        logging.info("STATUS: %s" % STATUS)
        logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE)
            
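        # /**/ is a placeholder, swapped below for the DB-specific timestamp
        # assignment (SYSDATE on Oracle, NOW() on MySQL/MariaDB)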
        query_update = f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'" \
                       + ", status_message='" + STATUS_MESSAGE + "'" \
                       + ", error_description=NULL" \
                       + ", updated_by='" + replicationJobId + "' /**/" \
                       + " WHERE replication_job_id='" + replicationJobId + "' AND release_id='" + str(releaseId) + "'"
        query_ppm_update = f"UPDATE {schema_source}.ppm_release_master SET replication_status='" + STATUS + "'" \
                           + ", updated_by='" + replicationJobId + "' /**/" \
                           + " WHERE release_id='" + str(releaseId) + "'"

        if db_type_source == 'ORACLE':
            query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
            query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=SYSDATE')
        elif db_type_source in ('MARIA', 'MYSQL'):
            query_update = query_update.replace('/**/', ', updated_date=NOW()')
            query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=NOW()')
        logging.info("-- + ppm_replication_status - UPDATE \n")
        logging.info(query_update + ";\n")
        logging.info("-- + ppm_release_master - UPDATE \n")
        logging.info(query_ppm_update + ";\n")
        logging.info(f"-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n")
        res=cursor_source.execute(query_update)
        logging.info("query_update: %s" % res)
        res=cursor_source.execute(query_ppm_update)
        logging.info("query_ppm_update: %s" % res)
    except DB_CONNECTION_ERROR:
        logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT")
        return_flag = False
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        logging.error("EXCEPTION:" + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False

    return return_flag

if __name__ == '__main__':
    import logging
    from configparser import ConfigParser as conf_ConfigParser

    statFile = ""
    try:
        parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script")
        parser.add_argument('--releaseId', required=True, help="Release ID")
        parser.add_argument('--releaseType', required=True, help="Release Type")
        parser.add_argument('--replicationTarget', required=True, help="Replication Target")
        parser.add_argument('--opId', required=True, help="Operation ID")
        parser.add_argument('--buId', required=True, help="Business Unit ID")
        parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
        args = parser.parse_args()
        replicationJobId = args.replicationJobId
        json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json"
        conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf"
        log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.log'
        statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.status'
        statFile = open(statFile, "w")
        # Set up logging
        CONFIG = conf_ConfigParser()
        CONFIG.read(conf_file_path)


        logging.basicConfig(filename=log_file
                            ,level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True)
                            ,format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True)
                            ,datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True)
                            )
        logging.info('LOGGER initiated')
        
        encoding=CONFIG.get('CONFIG_GENERIC','DB_CHARSET',raw=True)
        
        # Read JSON data from file
        if not os.path.exists(json_file_path):
            logging.error("CREDENTIAL FILE MISSING")
            logging.error("CREDENTIAL FILE: %s" % json_file_path)
            raise FileNotFoundError("CREDENTIAL FILE MISSING")
        with open(json_file_path) as json_file:
            json_data = json_load(json_file)
        
        if main(args, json_data, log_file, logging, encoding):
            print("Insertion of data successful")
            statFile.write("SUCCESS")
        else:
            statFile.write("FAILED")

    except FileNotFoundError as ferr:
        print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))
        if isinstance(statFile, io_TextIOBase):
            statFile.write("FAILED")

    except Exception as err:
        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        if isinstance(statFile, io_TextIOBase):
            statFile.write("FAILED")

    if isinstance(statFile, io_TextIOBase):
        statFile.close()

Actually, in this script I am loading data from one table to another. Both tables have the same columns and datatypes, but because the data volume is huge I am getting a "MySQL server has gone away" error. Can you try an approach where the data is first written out to a CSV file and then loaded from that CSV directly into the target table? For clarification: the per-table SELECT (source_query, read via pd_read_sql) is where I fetch the data, and source_df.to_sql is where I load it.
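A minimal sketch of that CSV-staging approach, assuming the target is MySQL/MariaDB with LOAD DATA LOCAL INFILE permitted (local_infile enabled on the server) and that plain connection details can be pulled from the same ppm_pc_replication.json config. The names replicate_via_csv, CHUNK_ROWS, csv_path and target_conn_args are illustrative, not part of the script; the function would replace the source_df = pd_read_sql(...) / source_df.to_sql(...) pair inside the per-table loop.

import csv
import mysql.connector  # already a dependency via the mysql+mysqlconnector engine
from pandas import read_sql as pd_read_sql

CHUNK_ROWS = 50000  # assumption: tune so each chunk stays well under max_allowed_packet

def replicate_via_csv(source_query, connection_source_ext, schema_ext,
                      eprm_table_name, csv_path, target_conn_args):
    """Stage the source rows to a local CSV in chunks, then bulk-load the CSV
    into the target table. target_conn_args is a hypothetical dict of
    host/port/user/password/database for the <replicationTarget>_EXT server."""
    # 1) Stream the source table to disk chunk by chunk so the full result set
    #    never travels in one round trip (the usual "server has gone away" cause)
    columns, first_chunk = None, True
    for chunk in pd_read_sql(source_query, con=connection_source_ext,
                             chunksize=CHUNK_ROWS):
        if columns is None:
            columns = list(chunk.columns)
        chunk.to_csv(csv_path,
                     mode='w' if first_chunk else 'a',  # truncate on first chunk
                     header=first_chunk,
                     index=False,
                     quoting=csv.QUOTE_MINIMAL,         # quotes fields holding commas/newlines
                     na_rep='\\N')                      # \N reads back as NULL in LOAD DATA
        first_chunk = False

    if first_chunk:
        return 0  # source query returned no rows; nothing to load

    # 2) Bulk-load the CSV with LOAD DATA LOCAL INFILE, which streams the file
    #    instead of building giant multi-row INSERT statements
    cnx = mysql.connector.connect(allow_local_infile=True, **target_conn_args)
    try:
        cur = cnx.cursor()
        col_list = ', '.join('`%s`' % c for c in columns)
        cur.execute(
            f"LOAD DATA LOCAL INFILE '{csv_path}' "
            f"INTO TABLE {schema_ext}.{eprm_table_name} "
            "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' "
            "LINES TERMINATED BY '\\n' "
            "IGNORE 1 LINES "
            f"({col_list})"
        )
        cnx.commit()
        return cur.rowcount
    finally:
        cnx.close()

Two cheaper fixes may be worth trying first: drop method='multi' and pass a chunksize instead (e.g. source_df.to_sql(eprm_table_name, con=connection_ext, if_exists='append', index=False, chunksize=5000)), since one oversized multi-row INSERT is the usual trigger for "MySQL server has gone away"; and raise max_allowed_packet / wait_timeout on the target server. CSV caveats: datetime and decimal columns round-trip as text, and the local_infile system variable must be enabled on the server for LOCAL loads to work.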