import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os


# define Python user-defined exceptions
class Error(Exception):
    """Base class for other exceptions"""
    pass


class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
    pass


class DB_CONNECTION_ERROR(Error):
    pass


def setDbConnection(logging, json_data, serverInfo, encoding):
    from sqlalchemy import create_engine as sqlalchemy_create_engine
    from base64 import b64decode as base_b64decode
    import mysql.connector
    try:
        cnx = cursor = schema = db_type = None
        encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
        host = json_data.get(serverInfo, {}).get('DB_HOST')
        port = json_data.get(serverInfo, {}).get('DB_PORT')
        user = json_data.get(serverInfo, {}).get('DB_USER')
        db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
        schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
        if encrypt == 'Y':
            password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
        else:
            password = json_data.get(serverInfo, {}).get('DB_PASSWORD')

        if db_type in ('MYSQL', 'MARIA'):
            connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s'
                               % (user, password, host, port, schema))
        elif db_type == 'ORACLE':
            connection_text = ('oracle://%s:%s@%s:%s/%s'
                               % (user, password, host, port, json_data.get(serverInfo, {}).get('DB_SID')))

        cnx = sqlalchemy_create_engine(connection_text,
                                       encoding=encoding,
                                       # fast_executemany=True,
                                       connect_args={'connect_timeout': 600})
        cursor = cnx.connect()
        logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
    # except mysql.connector.Error as dberr:
    #     logging.error("DATABASE CONNECTION ERROR")
    #     logging.error("Error - {} . Line No - {} ".format(str(dberr), str(sys.exc_info()[-1].tb_lineno)))
    #     cnx = cursor = schema = None
    except Exception as dbexp:
        logging.error("DATABASE CONNECTION EXCEPTION")
        logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
        cnx = cursor = schema = None
    return cnx, cursor, schema, db_type


def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # Connect to PPM_PC database
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)
        # Connect to source database
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE', encoding)
        # Connect to source_ext database
        connection_source_ext, cursor_source_ext, schema_source_ext, db_type_source_ext = setDbConnection(logging, json_data, 'SOURCE_EXT', encoding)
        # Connect to target_ext database
        connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(logging, json_data, replicationTarget_EXT, encoding)

        if not (connection_ppm and connection_source and connection_source_ext and connection_ext):
            raise DB_CONNECTION_ERROR

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))

        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR

        query_count = 0
        for _, row in df.iterrows():
            try:
                query_count += 1
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']
                logging.info(
                    f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count} | entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")

                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
                else:
                    source_query = (f"SELECT * FROM {schema_source_ext}.{eprm_table_name} "
                                    f"WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'")

                logging.info("Reading values")
                source_df = pd_read_sql(source_query, con=connection_source_ext)
                logging.info(f"Count {len(source_df):,}")

                if 'updated_by' in source_df:
                    source_df['updated_by'] = replicationJobId

                logging.info("Inserting values")
                if not source_df.empty:
                    source_df.to_sql(eprm_table_name, con=connection_ext, if_exists='append', index=False, method='multi')
                logging.info("Insertion successful")
            except Exception as e:
                failed_entities.append(eprm_table_name)
                logging.error("DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        logging.info("-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n")
        if len(failed_entities) > 0:
            return_flag = False
            STATUS = "Error"
            STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '')

        logging.info("STATUS: %s" % STATUS)
        logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE)

        query_update = f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'" \
                       + ", status_message='" + STATUS_MESSAGE + "'" \
                       + ", error_description=NULL" \
                       + ", updated_by='" + replicationJobId + "' /**/" \
                       + " WHERE replication_job_id='" + replicationJobId + "' AND release_id='" + str(releaseId) + "'"
        query_ppm_update = f"UPDATE {schema_source}.ppm_release_master SET replication_status='" + STATUS + "'" \
                           + ", updated_by='" + replicationJobId + "' /**/" \
                           + " WHERE release_id='" + str(releaseId) + "'"

        if db_type_source == 'ORACLE':
            query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
            query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=SYSDATE')
        elif db_type_source in ('MARIA', 'MYSQL'):
            query_update = query_update.replace('/**/', ', updated_date=NOW()')
            query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=NOW()')

        logging.info("-- + ppm_replication_status - UPDATE \n")
        logging.info(query_update + ";\n")
        logging.info("-- + ppm_release_master - UPDATE \n")
        logging.info(query_ppm_update + ";\n")
        logging.info("-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n")

        res = cursor_source.execute(query_update)
        logging.info("query_update: %s" % res)
        res = cursor_source.execute(query_ppm_update)
        logging.info("query_ppm_update: %s" % res)
    except DB_CONNECTION_ERROR:
        logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT")
        return_flag = False
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        logging.error("EXCEPTION: " + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False
    return return_flag
if __name__ == '__main__':
    import logging
    from configparser import ConfigParser as conf_ConfigParser

    statFile = ""
    try:
        parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script")
        parser.add_argument('--releaseId', required=True, help="Release ID")
        parser.add_argument('--releaseType', required=True, help="Release Type")
        parser.add_argument('--replicationTarget', required=True, help="Replication Target")
        parser.add_argument('--opId', required=True, help="Operation ID")
        parser.add_argument('--buId', required=True, help="Business Unit ID")
        parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
        args = parser.parse_args()
        replicationJobId = args.replicationJobId

        json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json"
        conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf"
        log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.log'
        statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.status'

        statFile = open(statFile, "w")

        # Set up logging
        CONFIG = conf_ConfigParser()
        CONFIG.read(conf_file_path)
        logging.basicConfig(filename=log_file,
                            level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True),
                            format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True),
                            datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True))
        logging.info('LOGGER initiated')
        encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True)

        # Read JSON data from file
        if not os.path.exists(json_file_path):
            logging.error("CREDENTIAL FILE MISSING")
            logging.error("CREDENTIAL FILE: %s" % json_file_path)
            raise FileNotFoundError("CREDENTIAL FILE MISSING")
        with open(json_file_path) as json_file:
            json_data = json_load(json_file)

        if main(args, json_data, log_file, logging, encoding):
            print("Insertion of data successful")
            statFile.write("SUCCESS")
        else:
            statFile.write("FAILED")
    except FileNotFoundError as ferr:
        print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))
        statFile.write("FAILED")
    except Exception as err:
        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        statFile.write("FAILED")

    if isinstance(statFile, io_TextIOBase):
        statFile.close()

Because of the high volume of records being inserted, this code is failing. I want to modify it so that, for example, if a table has 1,000 records, it loads them in batches: read 200 rows into the dataframe, insert them, then read the next 200, and so on, so the load stays small. I also need a pointer that tracks how many records have already been loaded, otherwise there will be a mismatch: after the first 200 rows are inserted, the next batch must continue from record 201 rather than starting again from the beginning. Please apply this logic to source_df, where the source query is executed.
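One way to apply the batching described above is a minimal sketch, not a drop-in replacement: it assumes the names already defined inside the per-table loop of main() (source_query, connection_source_ext, connection_ext, eprm_table_name, replicationJobId, logging, pd_read_sql), and CHUNK_SIZE is an illustrative new constant. pandas.read_sql with the chunksize argument returns an iterator of DataFrames drawn from the same open cursor, so each batch continues from the row after the previous one and nothing is re-read; rows_loaded acts as the pointer for tracking progress.

# Batched load of one source table, assuming the in-scope variables named above.
CHUNK_SIZE = 200  # illustrative batch size

rows_loaded = 0  # pointer: number of rows already inserted for this table
# read_sql(..., chunksize=CHUNK_SIZE) yields DataFrames of up to CHUNK_SIZE rows;
# each chunk picks up where the previous one stopped, so there is no overlap.
for chunk_df in pd_read_sql(source_query, con=connection_source_ext, chunksize=CHUNK_SIZE):
    if chunk_df.empty:
        break
    if 'updated_by' in chunk_df.columns:
        chunk_df['updated_by'] = replicationJobId
    chunk_df.to_sql(eprm_table_name, con=connection_ext,
                    if_exists='append', index=False, method='multi')
    rows_loaded += len(chunk_df)
    logging.info(f"{eprm_table_name}: inserted {rows_loaded} rows so far")

logging.info(f"{eprm_table_name}: finished, {rows_loaded} rows inserted in total")

This keeps the pointer inside a single run. If the job also has to resume after an interruption, rows_loaded would additionally need to be persisted somewhere durable (for example in the ppm_replication_status table) and the source query changed to an ORDER BY ... LIMIT/OFFSET form so a fresh run can skip the rows already copied; that variant is not shown here.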