Untitled

mail@pastecode.io avatarunknown
plain_text
24 days ago
4.2 kB
1
Indexable
Never
import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os
from configparser import ConfigParser as conf_ConfigParser

# ... (code up to the main function)

def main(args, json_data, log_file, logging, encoding):
    """Replicate the enabled PC_EXT entities from the source EXT schema.

    For every row of ``etl_ppm_replication_master`` with
    ``eprm_catalog='PC_EXT'`` and ``eprm_enabled_flg='Y'``, the matching rows
    of ``{schema_source_ext}.{eprm_table_name}`` are read in chunks and
    appended to the same-named table on ``connection_ext``. A failure on one
    entity is logged and its name is recorded in ``failed_entities``;
    processing then continues with the next entity.

    NOTE(review): ``schema_ppm``, ``schema_source_ext``, ``connection_ppm``,
    ``connection_source_ext``, ``connection_ext``, ``DB_CONNECTION_ERROR``
    and ``ETL_PPM_REPLICATION_MASTER_ERROR`` are not defined in the visible
    code; presumably they come from the elided sections -- confirm.

    Args:
        args: Parsed command-line namespace; ``releaseId``, ``opId``,
            ``buId``, ``replicationTarget`` and ``replicationJobId`` are read.
        json_data: Not referenced in the visible portion of this function.
        log_file: Not referenced in the visible portion of this function.
        logging: Logger-like object used for ``info``/``error`` messages
            (shadows the ``logging`` module inside this function).
        encoding: Not referenced in the visible portion of this function.

    Returns:
        ``False`` when a DB connection error, an empty replication master
        table, or any other unexpected error aborts the run. Otherwise
        ``True`` as far as the visible code goes (per-entity failures do not
        change it here).
    """
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # ... (existing code)

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))
        if df.empty:
            raise ETL_PPM_REPLICATION_MASTER_ERROR

        # Rows fetched per chunk from each source table.
        chunk_size = 50000

        # enumerate(start=1) replaces the former `query_count + 1`, which was
        # a no-op expression and left every "Seq#" log line at 0.
        for query_count, (_, row) in enumerate(df.iterrows(), start=1):
            # Bind before the try so the except handler always has a value to
            # report, even if reading/lower-casing the name itself fails
            # (otherwise it would be unbound, or stale from the previous row).
            eprm_table_name = row.get('eprm_table_name')
            try:
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']

                logging.info(
                    f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")

                # NOTE(review): releaseId/opId/buId are interpolated directly
                # into the SQL text; if they can carry untrusted input, switch
                # to bound parameters (the placeholder style depends on the
                # connection type, which is not visible here).
                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
                else:
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"

                logging.info("Reading values")

                # Let pandas page through ONE query result (chunksize=...)
                # rather than re-issuing the query with LIMIT/OFFSET: the old
                # loop referenced an undefined `total_records`, and OFFSET
                # paging without ORDER BY can skip or repeat rows between
                # pages.
                # NOTE(review): whether rows are streamed from the server or
                # buffered by the driver depends on the DB driver -- confirm
                # memory use on the largest entities.
                offset = 0
                for source_df_chunk in pd_read_sql(source_query, con=connection_source_ext, chunksize=chunk_size):
                    # pandas may yield a single empty frame for an empty result.
                    if source_df_chunk.empty:
                        continue

                    # Stamp rows with the current replication job id.
                    if 'updated_by' in source_df_chunk:
                        source_df_chunk['updated_by'] = replicationJobId

                    # Insert the chunk into the target database
                    source_df_chunk.to_sql(eprm_table_name, con=connection_ext, if_exists='append', index=False, method='multi')
                    logging.info(f"Insertion successful for chunk starting at {offset}")
                    offset += len(source_df_chunk)
            except Exception as e:
                failed_entities.append(eprm_table_name)
                logging.error(
                    "DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))

        # ... (remaining code)

    except DB_CONNECTION_ERROR:
        logging.error("EXCEPTION: DB CONNECTION ERROR PC_EXT")
        return_flag = False
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        logging.error("EXCEPTION:" + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False

    return return_flag

if __name__ == '__main__':
    # Script entry point. Only part of this block is visible here: the setup
    # is elided, and the try below continues beyond the visible code.
    # ... (code up to the argparse and other setup)

    # Inside the try block of the main execution
    try:
        # ... (existing code)

        # Fetch data from the etl_ppm_replication_master table
        # NOTE(review): this repeats the same master-table query that main()
        # issues; confirm both reads are actually needed.
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        # ... (rest of the code)