Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
6.9 kB
1
Indexable
Never
# ... (previous imports and class definitions)

# ... (setDbConnection function)

def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # Connect to PPM_PC database
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)

        # Connect to source database
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data,'SOURCE', encoding)

        # Connect to source_ext database
        connection_source_ext, cursor_source_ext, schema_source_ext,db_type_source_ext = setDbConnection(logging, json_data,'SOURCE_EXT', encoding)

        # Connect to target_ext database
        connection_ext, cursor_ext, schema_ext, db_type_ext  = setDbConnection(logging, json_data, replicationTarget_EXT, encoding)

        if not (connection_ppm and connection_source  and connection_source_ext and connection_ext):
            raise DB_CONNECTION_ERROR
        
        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))
        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR
        query_count = 0
        
        for _, row in df.iterrows():
            try:
                query_count+1
                eprm_table_name = row['eprm_table_name'].lower()
                eprm_seq_nbr = row['eprm_seq_nbr']
                
                logging.info(f"-- ++++++++++++++++++++++++++++++++ Seq# {query_count}| entity# {eprm_seq_nbr} | PC_EXT | {eprm_table_name}")
                
                if eprm_table_name == 'pkg_prd_fed_ext_attrs':
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}'"
                else:
                    source_query = f"SELECT * FROM {schema_source_ext}.{eprm_table_name} WHERE release_id='{releaseId}' AND op_id='{opId}' AND bu_id='{buId}'"
                
                logging.info(f"Reading values")
                source_df = pd_read_sql(source_query, con=connection_source_ext)
                logging.info(f"Count {len(source_df):,}")
                
                if 'updated_by' in source_df:
                    source_df['updated_by'] = replicationJobId
                
                # Export data to a CSV file
                csv_file_path = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{eprm_table_name}.csv'
                source_df.to_csv(csv_file_path, index=False)
                logging.info(f"Data exported to CSV: {csv_file_path}")
                
                # Load data from CSV to target table
                load_query = f"LOAD DATA LOCAL INFILE '{csv_file_path}' INTO TABLE {eprm_table_name} FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n';"
                cursor_ext.execute(load_query)
                logging.info(f"Data loaded from CSV to table: {eprm_table_name}")
                
            except Exception as e:
                failed_entities.append(eprm_table_name)
                logging.error("DB Execution Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        
        # ... (remaining code for updating status)
        
    except DB_CONNECTION_ERROR:
        # ... (previous code)
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        # ... (previous code)
    except Exception as e:
        # ... (previous code)

    return return_flag

if __name__ == '__main__':
    import logging
    from configparser import ConfigParser as conf_ConfigParser
    from io import TextIOBase as io_TextIOBase

    statFile = ""
    try:
        parser = argparse.ArgumentParser(description="PPM Product Catalog Replication Script")
        parser.add_argument('--releaseId', required=True, help="Release ID")
        parser.add_argument('--releaseType', required=True, help="Release Type")
        parser.add_argument('--replicationTarget', required=True, help="Replication Target")
        parser.add_argument('--opId', required=True, help="Operation ID")
        parser.add_argument('--buId', required=True, help="Business Unit ID")
        parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
        args = parser.parse_args()
        replicationJobId = args.replicationJobId
        json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.json"
        conf_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_pc_replication.conf"
        log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.log'
        statFile = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}_ppm_pc_replication_insert_pc_ext.status'
        args = parser.parse_args()
        statFile = open(statFile, "w")
        
        # Set up logging
        CONFIG = conf_ConfigParser()
        CONFIG.read(conf_file_path)

        logging.basicConfig(filename=log_file
                            ,level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True)
                            ,format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True)
                            ,datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True)
                            )
        logging.info('LOGGER initiated')
        
        encoding=CONFIG.get('CONFIG_GENERIC','DB_CHARSET',raw=True)
        
        # Read JSON data from file
        if not os.path.exists(json_file_path):
            logging.error("CREDENTIAL FILE MISSING")
            logging.error("CREDENTIAL FILE: %s" % json_file_path)
            raise FileNotFoundError("CREDENTIAL FILE MISSING")
        with open(json_file_path) as json_file:
            json_data = json.load(json_file)
        
        if main(args, json_data, log_file, logging, encoding):
            print("Insertion of data successful")
            statFile.write("SUCCESS")
        else:
            statFile.write("FAILED")

    except FileNotFoundError as ferr:
        print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))
        statFile.write("FAILED")

    except Exception as err:
        print("Error - {} . Line No - {} ".format(str(err), str