Untitled
plain_text
24 days ago
4.2 kB
1
Indexable
Never
import argparse
import json
import logging
import os
import sys
from base64 import b64decode as base_b64decode
from configparser import ConfigParser as conf_ConfigParser
from io import TextIOBase as io_TextIOBase
from json import load as json_load

import pandas as pd
from pandas import read_sql as pd_read_sql
from sqlalchemy import create_engine

# ... (code up to the main function)


def main(args, json_data, log_file, logging, encoding):
    """Copy every enabled PC_EXT table from the source schema to the target.

    The list of tables is read from ``etl_ppm_replication_master``; each table
    is then read from the source connection page by page and appended to the
    target connection.  A failure on one table is logged, the table is added
    to ``failed_entities`` and processing continues with the next table.

    Args:
        args: Parsed arguments exposing ``releaseId``, ``opId``, ``buId``,
            ``replicationTarget`` and ``replicationJobId``.
        json_data: Not used by the code visible in this excerpt.
        log_file: Not used by the code visible in this excerpt.
        logging: Logger-like object providing ``info`` and ``error``
            (shadows the ``logging`` module inside this function).
        encoding: Not used by the code visible in this excerpt.

    Returns:
        bool: ``False`` when a connection error, an empty replication master
        table or any other unexpected exception aborts the run; otherwise the
        value ``return_flag`` holds at the end (``True`` unless the elided
        code changes it).
    """
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # ... (existing code)

        # Fetch data from the etl_ppm_replication_master table
        primary_query = (
            f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master "
            "WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        )
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))
        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR

        chunk_size = 50000  # rows per page; constant for every table
        query_count = 0
        for _, row in df.iterrows():
            # Bind the name before the try block so the except handler never
            # reports an unbound name or the previous iteration's table.
            eprm_table_name = row.get('eprm_table_name')
            try:
                query_count += 1
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']
                logging.info(
                    f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")

                # NOTE(review): the filter values are interpolated straight
                # into the SQL text; if they can come from an untrusted
                # caller, switch to bound parameters.
                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = (
                        f"SELECT * FROM {schema_source_ext}.{eprm_table_name} "
                        f"WHERE release_id='{releaseId}'"
                    )
                else:
                    source_query = (
                        f"SELECT * FROM {schema_source_ext}.{eprm_table_name} "
                        f"WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
                    )
                logging.info("Reading values")

                # Fetch data in chunks using pd_read_sql.  The total row count
                # is not known up front, so keep paging until a page comes
                # back empty or short.
                # NOTE(review): LIMIT/OFFSET without ORDER BY does not
                # guarantee stable pages; add an ORDER BY on a unique key if
                # the source tables have one.
                offset = 0
                while True:
                    source_query_chunk = source_query + f" LIMIT {chunk_size} OFFSET {offset}"
                    source_df_chunk = pd_read_sql(source_query_chunk, con=connection_source_ext)
                    if source_df_chunk.empty:
                        break

                    # Stamp the rows with the job that replicated them.
                    if 'updated_by' in source_df_chunk:
                        source_df_chunk['updated_by'] = replicationJobId

                    # Insert the chunk into the target database
                    source_df_chunk.to_sql(
                        eprm_table_name,
                        con=connection_ext,
                        if_exists='append',
                        index=False,
                        method='multi',
                    )
                    logging.info(f"Insertion successful for chunk starting at {offset}")

                    # A short page is the last one; skip the extra round trip.
                    if len(source_df_chunk) < chunk_size:
                        break
                    offset += chunk_size
            except Exception as e:
                # Best effort: remember the failed table and move on.
                failed_entities.append(eprm_table_name)
                logging.error(
                    "DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))

        # ... (remaining code)

    except DB_CONNECTION_ERROR:
        logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT")
        return_flag = False
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        logging.error("EXCEPTION:" + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False

    return return_flag


if __name__ == '__main__':
    # ... (code up to the argparse and other setup)

    # Inside the try block of the main execution
    try:
        # ... (existing code)

        # Fetch data from the etl_ppm_replication_master table
        primary_query = (
            f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master "
            "WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        )
        df = pd_read_sql(primary_query, con=connection_ppm)

        # ... (rest of the code)
    finally:
        # Placeholder so this excerpt parses; presumably the real
        # except/finally clauses belong to the elided code above.
        pass