import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os
# define Python user-defined exceptions
class Error(Exception):
"""Base class for other exceptions"""
pass
# define Python user-defined exceptions
class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
pass
class DB_CONNECTION_ERROR(Error):
pass
def setDbConnection(logging, json_data, serverInfo, encoding):
from sqlalchemy import create_engine as sqlalchemy_create_engine
from base64 import b64decode as base_b64decode
import mysql.connector
try:
cnx = cursor = schema = db_type = None
encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
host = json_data.get(serverInfo, {}).get('DB_HOST')
port = json_data.get(serverInfo, {}).get('DB_PORT')
user = json_data.get(serverInfo, {}).get('DB_USER')
db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
if encrypt == 'Y':
password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
else:
password = json_data.get(serverInfo, {}).get('DB_PASSWORD')
if db_type in ('MYSQL', 'MARIA'):
connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user
, password
, host
, port
, schema))
elif db_type == 'ORACLE':
connection_text = ('oracle://%s:%s@%s:%s/%s' % (user
, password
, host
, port
, json_data.get(serverInfo, {}).get('DB_SID')
))
cnx = sqlalchemy_create_engine(connection_text
, encoding=encoding
# ,fast_executemany=True
, connect_args={'connect_timeout': 600})
cursor = cnx.connect()
logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
# except mysql.connector.Error as dberr:
# logging.error("DATABASE CONNECTION ERROR")
# logging.error("Error - {} . Line No - {} ".format(str(dberr), str(sys.exc_info()[-1].tb_lineno)))
# cnx = cursor = schema = None
except Exception as dbexp:
logging.error("DATABASE CONNECTION EXCEPTION")
logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
cnx = cursor = schema = None
return cnx, cursor, schema, db_type
def main(args, json_data, log_file, logging, encoding):
try:
releaseId = args.releaseId
opId = args.opId
buId = args.buId
replicationTarget = args.replicationTarget
replicationJobId = args.replicationJobId
return_flag = True
STATUS = "InProgress"
STATUS_MESSAGE = "Insertion of federation tables successful."
replicationTarget_EXT = replicationTarget + '_EXT'
failed_entities = []
# Connect to PPM_PC database
connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)
# Connect to source database
connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE',
encoding)
# Connect to source_ext database
connection_source_ext, cursor_source_ext, schema_source_ext, db_type_source_ext = setDbConnection(logging,
json_data,
'SOURCE_EXT',
encoding)
# Connect to target_ext database
connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(logging, json_data, replicationTarget_EXT,
encoding)
if not (connection_ppm and connection_source and connection_source_ext and connection_ext):
raise DB_CONNECTION_ERROR
# Fetch data from the etl_ppm_replication_master table
primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
df = pd_read_sql(primary_query, con=connection_ppm)
logging.info("Count etl_ppm_replication_master: %s" % len(df))
if len(df) == 0:
raise ETL_PPM_REPLICATION_MASTER_ERROR
query_count = 0
for _, row in df.iterrows():
try:
query_count + 1
eprm_table_name = row['eprm_table_name'].lower()
eprm_seq_nbr = row['eprm_seq_nbr']
logging.info(
f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")
if eprm_table_name == 'pkg_prd_fed_ext_attrs':
source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
else:
source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
logging.info(f"Reading values")
source_df = pd_read_sql(source_query, con=connection_source_ext)
logging.info(f"Count {len(source_df):,}")
if 'updated_by' in source_df:
source_df['updated_by'] = replicationJobId
logging.info(f"Inserting values")
if not source_df.empty:
source_df.to_sql(eprm_table_name, con=connection_ext, if_exists='append', index=False,
method='multi')
logging.info(f"Insertion successful")
except Exception as e:
failed_entities.append(eprm_table_name)
logging.error(
"DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
logging.info(f"-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n")
if len(failed_entities) > 0:
return_flag = False
STATUS = "Error"
STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '')
logging.info("STATUS: %s" % STATUS)
logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE)
query_update = f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'" \
+ ", status_message='" + STATUS_MESSAGE + "'" \
+ ", error_description=NULL" \
+ ", updated_by='" + replicationJobId + "' /**/" \
+ " WHERE replication_job_id='" + replicationJobId + "' AND release_id='" + str(releaseId) + "'"
query_ppm_update = f"UPDATE {schema_source}.ppm_release_master SET replication_status='" + STATUS + "'" \
+ ", updated_by='" + replicationJobId + "' /**/" \
+ " WHERE release_id='" + str(releaseId) + "'"
if db_type_source == 'ORACLE':
query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=SYSDATE')
elif db_type_source in ('MARIA', 'MYSQL'):
query_update = query_update.replace('/**/', ', updated_date=NOW()')
query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=NOW()')
logging.info("-- + ppm_replication_status - UPDATE \n")
logging.info(query_update + ";\n")
logging.info("-- + ppm_release_master - UPDATE \n")
logging.info(query_ppm_update + ";\n")
logging.info(f"-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n")
res = cursor_source.execute(query_update)
logging.info("query_update: %s" % res)
res = cursor_source.execute(query_ppm_update)
logging.info("query_ppm_update: %s" % res)
except DB_CONNECTION_ERROR:
logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT")
return_flag = False
except ETL_PPM_REPLICATION_MASTER_ERROR:
STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
logging.error("EXCEPTION:" + STATUS_MESSAGE)
return_flag = False
except Exception as e:
logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
return_flag = False
return return_flag
if __name__ == '__main__':
import logging
from configparser import ConfigParser as conf_ConfigParser
statFile = ""
try:
parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script")
parser.add_argument('--releaseId', required=True, help="Release ID")
parser.add_argument('--releaseType', required=True, help="Release Type")
parser.add_argument('--replicationTarget', required=True, help="Replication Target")
parser.add_argument('--opId', required=True, help="Operation ID")
parser.add_argument('--buId', required=True, help="Business Unit ID")
parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
args = parser.parse_args()
replicationJobId = args.replicationJobId
json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json"
conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf"
log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.log'
statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.status'
args = parser.parse_args()
statFile = open(statFile, "w")
# Set up logging
CONFIG = conf_ConfigParser()
CONFIG.read(conf_file_path)
logging.basicConfig(filename=log_file
, level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True)
, format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True)
, datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True)
)
logging.info('LOGGER initiated')
encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True)
# Read JSON data from file
if not os.path.exists(json_file_path):
logging.error("CREDENTIAL FILE MISSING")
logging.error("CREDENTIAL FILE: %s" % json_file_path)
raise FileNotFoundError("CREDENTIAL FILE MISSING")
with open(json_file_path) as json_file:
json_data = json_load(json_file)
if main(args, json_data, log_file, logging, encoding):
print("Insertion of data successful")
statFile.write("SUCCESS")
else:
statFile.write("FAILED")
except FileNotFoundError as ferr:
print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))
statFile.write("FAILED")
except Exception as err:
print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
statFile.write("FAILED")
if isinstance(statFile, io_TextIOBase):
statFile.close()
due to high volume of records above code is not able to load records in my target table so i want to load the dataframe in batches for example if i have around 300000 records then first 50000 records should dataframed then after another 50000 like that the dataframe should should take so that the load on the dataframe will be reduced and also most important thing make sure that there is no confusion in records loading because if we load first 50000 records after that the pointer should point to next 50000 it should not start from starting.so from the below code iam taking data and constructing dataframe
if eprm_table_name == 'pkg_prd_fed_ext_attrs':
source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
else:
source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
logging.info(f"Reading values")
source_df = pd_read_sql(source_query, con=connection_source_ext)
and from below code iam loading the records
if not source_df.empty:
source_df.to_sql(eprm_table_name, con=connection_ext, if_exists='append', index=False,
method='multi')
logging.info(f"Insertion successful")