Untitled
unknown
plain_text
a year ago
6.9 kB
1
Indexable
Never
# ... (previous imports and class definitions) # ... (setDbConnection function) def main(args, json_data, log_file, logging, encoding): try: releaseId = args.releaseId opId = args.opId buId = args.buId replicationTarget = args.replicationTarget replicationJobId = args.replicationJobId return_flag = True STATUS = "InProgress" STATUS_MESSAGE = "Insertion of federation tables successful." replicationTarget_EXT = replicationTarget + '_EXT' failed_entities = [] # Connect to PPM_PC database connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding) # Connect to source database connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data,'SOURCE', encoding) # Connect to source_ext database connection_source_ext, cursor_source_ext, schema_source_ext,db_type_source_ext = setDbConnection(logging, json_data,'SOURCE_EXT', encoding) # Connect to target_ext database connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(logging, json_data, replicationTarget_EXT, encoding) if not (connection_ppm and connection_source and connection_source_ext and connection_ext): raise DB_CONNECTION_ERROR # Fetch data from the etl_ppm_replication_master table primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'" df = pd_read_sql(primary_query, con=connection_ppm) logging.info("Count etl_ppm_replication_master: %s" % len(df)) if len(df) == 0: raise ETL_PPM_REPLICATION_MASTER_ERROR query_count = 0 for _, row in df.iterrows(): try: query_count+1 eprm_table_name = row['eprm_table_name'].lower() eprm_seq_nbr = row['eprm_seq_nbr'] logging.info(f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}") if eprm_table_name == 'pkg_prd_fed_ext_attrs': source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'" else: source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'" logging.info(f"Reading values") source_df = pd_read_sql(source_query, con=connection_source_ext) logging.info(f"Count {len(source_df):,}") if 'updated_by' in source_df: source_df['updated_by'] = replicationJobId # Export data to a CSV file csv_file_path = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{eprm_table_name}.csv' source_df.to_csv(csv_file_path, index=False) logging.info(f"Data exported to CSV: {csv_file_path}") # Load data from CSV to target table load_query = f"LOAD DATA LOCAL INFILE '{csv_file_path}' INTO TABLE {eprm_table_name} FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n';" cursor_ext.execute(load_query) logging.info(f"Data loaded from CSV to table: {eprm_table_name}") except Exception as e: failed_entities.append(eprm_table_name) logging.error("DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno))) # ... (remaining code for updating status) except DB_CONNECTION_ERROR: # ... (previous code) except ETL_PPM_REPLICATION_MASTER_ERROR: # ... (previous code) except Exception as e: # ... (previous code) return return_flag if __name__ == '__main__': import logging from configparser import ConfigParser as conf_ConfigParser from io import TextIOBase as io_TextIOBase statFile = "" try: parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script") parser.add_argument('--releaseId', required=True, help="Release ID") parser.add_argument('--releaseType', required=True, help="Release Type") parser.add_argument('--replicationTarget', required=True, help="Replication Target") parser.add_argument('--opId', required=True, help="Operation ID") parser.add_argument('--buId', required=True, help="Business Unit ID") parser.add_argument('--replicationJobId', required=True, help="Replication Job ID") args = parser.parse_args() replicationJobId = args.replicationJobId json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json" conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf" log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.log' statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.status' args = parser.parse_args() statFile = open(statFile, "w") # Set up logging CONFIG = conf_ConfigParser() CONFIG.read(conf_file_path) logging.basicConfig(filename=log_file ,level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True) ,format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True) ,datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True) ) logging.info('LOGGER initiated') encoding=CONFIG.get('CONFIG_GENERIC','DB_CHARSET',raw=True) # Read JSON data from file if not os.path.exists(json_file_path): logging.error("CREDENTIAL FILE MISSING") logging.error("CREDENTIAL FILE: %s" % json_file_path) raise FileNotFoundError("CREDENTIAL FILE MISSING") with open(json_file_path) as json_file: json_data = json.load(json_file) if main(args, json_data, log_file, logging, encoding): print("Insertion of data successful") statFile.write("SUCCESS") else: statFile.write("FAILED") except FileNotFoundError as ferr: print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno))) statFile.write("FAILED") except Exception as err: print("Error - {} . Line No - {} ".format(str(err), str