# ... (previous imports and class definitions)
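# NOTE (assumption): the elided imports must cover at least the names used below,
# e.g.:
#   import argparse, json, os, sys
#   from pandas import read_sql as pd_read_sql
# DB_CONNECTION_ERROR and ETL_PPM_REPLICATION_MASTER_ERROR are assumed to be
# Exception subclasses defined in the elided class definitions.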
# ... (setDbConnection function)
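# setDbConnection(...) is assumed to return a 4-tuple of
# (connection, cursor, schema_name, db_type); every call in main() below
# unpacks it in that order.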
def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # Connect to the PPM_PC, source, source_ext, and target_ext databases
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE', encoding)
        connection_source_ext, cursor_source_ext, schema_source_ext, db_type_source_ext = setDbConnection(logging, json_data, 'SOURCE_EXT', encoding)
        connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(logging, json_data, replicationTarget_EXT, encoding)

        if not (connection_ppm and connection_source and connection_source_ext and connection_ext):
            raise DB_CONNECTION_ERROR
        # Fetch the replication-driver rows from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))
        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR

        query_count = 0
        for _, row in df.iterrows():
            try:
                query_count += 1
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']
                logging.info(f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count} | entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")
                # NOTE: query values are interpolated directly into the SQL string;
                # releaseId/opId/buId are assumed to come from trusted job arguments.
                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
                else:
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
                logging.info("Reading values")
                source_df = pd_read_sql(source_query, con=connection_source_ext)
                logging.info(f"Count {len(source_df):,}")

                # Stamp the replication job id on tables that carry an updated_by column
                if 'updated_by' in source_df.columns:
                    source_df['updated_by'] = replicationJobId

                # Export data to a CSV file
                csv_file_path = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{eprm_table_name}.csv'
                source_df.to_csv(csv_file_path, index=False)
                logging.info(f"Data exported to CSV: {csv_file_path}")

                # Load the CSV into the target table. IGNORE 1 LINES skips the header
                # row that to_csv writes; OPTIONALLY ENCLOSED BY '"' handles fields
                # that pandas quoted because they contain commas.
                load_query = (
                    f"LOAD DATA LOCAL INFILE '{csv_file_path}' INTO TABLE {eprm_table_name} "
                    f"FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' "
                    f"LINES TERMINATED BY '\\n' IGNORE 1 LINES;"
                )
                cursor_ext.execute(load_query)
                connection_ext.commit()  # persist the load; assumes the connection is not in autocommit mode
                logging.info(f"Data loaded from CSV to table: {eprm_table_name}")
            except Exception as e:
                failed_entities.append(eprm_table_name)
                logging.error("DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        # ... (remaining code for updating status)
    except DB_CONNECTION_ERROR:
        # ... (previous code)
        return_flag = False  # assumed for all elided handlers: mark the run failed
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        # ... (previous code)
        return_flag = False
    except Exception as e:
        # ... (previous code)
        return_flag = False
    return return_flag
if __name__ == '__main__':
    import logging
    from configparser import ConfigParser as conf_ConfigParser
    from io import TextIOBase as io_TextIOBase

    statFile = ""
    try:
        parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script")
        parser.add_argument('--releaseId', required=True, help="Release ID")
        parser.add_argument('--releaseType', required=True, help="Release Type")
        parser.add_argument('--replicationTarget', required=True, help="Replication Target")
        parser.add_argument('--opId', required=True, help="Operation ID")
        parser.add_argument('--buId', required=True, help="Business Unit ID")
        parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
        args = parser.parse_args()
        replicationJobId = args.replicationJobId

        json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json"
        conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf"
        log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.log'
        statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.status'
        statFile = open(statFile, "w")

        # Set up logging from the .conf file
        CONFIG = conf_ConfigParser()
        CONFIG.read(conf_file_path)
        logging.basicConfig(filename=log_file,
                            level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True),
                            format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True),
                            datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True))
        logging.info('LOGGER initiated')
        encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True)
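
        # The .conf file is assumed to provide the sections read above, for example
        # (values illustrative):
        #   [CONFIG_LOGGING]
        #   LOG_LEVEL = INFO
        #   [CONFIG_LOG_FORMAT]
        #   LOG_FORMAT_DISP = %(asctime)s %(levelname)s - %(message)s
        #   LOG_FORMAT_DATE = %Y-%m-%d %H:%M:%S
        #   [CONFIG_GENERIC]
        #   DB_CHARSET = utf8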
        # Read JSON data from file
        if not os.path.exists(json_file_path):
            logging.error("CREDENTIAL FILE MISSING")
            logging.error("CREDENTIAL FILE: %s" % json_file_path)
            raise FileNotFoundError("CREDENTIAL FILE MISSING")
        with open(json_file_path) as json_file:
            json_data = json.load(json_file)
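
        # json_data is assumed to map connection aliases to credentials, matching the
        # setDbConnection calls in main() (shape illustrative, keys from this script):
        #   { "PPM_PC": {...}, "SOURCE": {...}, "SOURCE_EXT": {...}, "<replicationTarget>_EXT": {...} }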
        if main(args, json_data, log_file, logging, encoding):
            print("Insertion of data successful")
            statFile.write("SUCCESS")
        else:
            statFile.write("FAILED")
    except FileNotFoundError as ferr:
        print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))
        if isinstance(statFile, io_TextIOBase):
            statFile.write("FAILED")
    except Exception as err:
        # assumed: mirrors the FileNotFoundError handler above (listing was truncated here)
        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        if isinstance(statFile, io_TextIOBase):
            statFile.write("FAILED")
    finally:
        # Close the status file if it was opened
        if isinstance(statFile, io_TextIOBase):
            statFile.close()