import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os
from configparser import ConfigParser as conf_ConfigParser
# ... (code up to the main function)
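
# NOTE: DB_CONNECTION_ERROR and ETL_PPM_REPLICATION_MASTER_ERROR are raised/caught
# below, but their definitions are not part of this excerpt. The classes here are a
# minimal, hypothetical sketch assuming the full script declares them as plain
# custom exceptions; replace with the real definitions if they differ.
class DB_CONNECTION_ERROR(Exception):
    """Raised when a database connection needed for PC_EXT replication cannot be established."""


class ETL_PPM_REPLICATION_MASTER_ERROR(Exception):
    """Raised when etl_ppm_replication_master has no enabled PC_EXT rows."""
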
def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []
        # ... (existing code)

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))

        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR

        query_count = 0
        for _, row in df.iterrows():
            try:
                query_count += 1
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']
                logging.info(
                    f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count} | entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")

                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
                else:
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"

                logging.info("Reading values")

                # total_records is not computed in the original excerpt; derive it here with a
                # COUNT(*) over the same filter so the LIMIT/OFFSET loop knows when to stop
                # (assumes a MySQL/PostgreSQL-style backend, which the LIMIT/OFFSET syntax
                # below already implies).
                count_query = f"SELECT COUNT(*) AS cnt FROM ({source_query}) AS src"
                total_records = int(pd_read_sql(count_query, con=connection_source_ext)['cnt'].iloc[0])

                # Fetch data in chunks using pd_read_sql
                chunk_size = 50000
                for offset in range(0, total_records, chunk_size):
                    source_query_chunk = source_query + f" LIMIT {chunk_size} OFFSET {offset}"
                    source_df_chunk = pd_read_sql(source_query_chunk, con=connection_source_ext)
                    if source_df_chunk.empty:
                        break
                    # Stamp each replicated row with the current replication job id
                    if 'updated_by' in source_df_chunk.columns:
                        source_df_chunk['updated_by'] = replicationJobId
                    # Insert the chunk into the target database
                    source_df_chunk.to_sql(eprm_table_name, con=connection_ext, if_exists='append',
                                           index=False, method='multi')
                    logging.info(f"Insertion successful for chunk starting at offset {offset}")
            except Exception as e:
                failed_entities.append(eprm_table_name)
                logging.error(
                    "DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        # ... (remaining code)
    except DB_CONNECTION_ERROR:
        logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT")
        return_flag = False
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        logging.error("EXCEPTION:" + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False
    return return_flag
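

# --- Illustrative wiring (not in the original excerpt) ---
# The excerpt never shows how the argparse arguments or the SQLAlchemy connections
# (connection_ppm, connection_source_ext, connection_ext) are built. The helpers
# below are a minimal, hypothetical sketch of that wiring, assuming flag names that
# match the attributes read in main() and database URLs supplied via ConfigParser;
# the real script may construct these differently.
def parse_cli_args():
    parser = argparse.ArgumentParser(description="Replicate PC_EXT federation tables")
    parser.add_argument('--releaseId', required=True)
    parser.add_argument('--opId', required=True)
    parser.add_argument('--buId', required=True)
    parser.add_argument('--replicationTarget', required=True)
    parser.add_argument('--replicationJobId', required=True)
    return parser.parse_args()


def build_connections(config):
    # Hypothetical config layout: one [section] per database with a SQLAlchemy 'url' key.
    # Example usage (hypothetical file name):
    #   cfg = conf_ConfigParser(); cfg.read('replication.ini')
    #   connection_ppm, connection_source_ext, connection_ext = build_connections(cfg)
    connection_ppm = create_engine(config.get('ppm', 'url')).connect()
    connection_source_ext = create_engine(config.get('source_ext', 'url')).connect()
    connection_ext = create_engine(config.get('target_ext', 'url')).connect()
    return connection_ppm, connection_source_ext, connection_ext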


if __name__ == '__main__':
    # ... (code up to the argparse and other setup)

    # Inside the try block of the main execution
    try:
        # ... (existing code)

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        # ... (rest of the code)