# ===================== USAGE: python3 ppm_sc_changesonly.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999
# ===========================================================================================================================================================
# PPM SERVICE CATALOG (SC) REPLICATION - CE CUSTOM TABLES REPLICATE
#   DATE     AUTHOR     VER   CHANGE DESCRIPTION
# --------  ---------  -----  ------------------
# 21.08.23  Veerendra    1.0   This script dynamically replicates PPM table records driven by "etl_ppm_replication_master" into "PC_EXT"
#
#
# ================================================================================================================================================================================

import logging
from pandas import read_sql as pd_read_sql
import sys
import argparse
from json import load as json_load
import os
import datetime


# define Python user-defined exceptions
class Error(Exception):
    """Base class for other exceptions"""
    pass


# define Python user-defined exceptions
class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
    pass


class DB_CONNECTION_ERROR(Error):
    pass


def is_datetime(value):
    try:
        # Attempt to parse the value as a datetime
        datetime.datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S')
        return True
    except ValueError:
        return False


def setDbConnection(logging, json_data, serverInfo, encoding):
    from sqlalchemy import create_engine as sqlalchemy_create_engine
    from base64 import b64decode as base_b64decode
    try:
        cnx = cursor = schema = db_type = None
        encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
        host = json_data.get(serverInfo, {}).get('DB_HOST')
        port = json_data.get(serverInfo, {}).get('DB_PORT')
        user = json_data.get(serverInfo, {}).get('DB_USER')
        db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
        schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
        if encrypt == 'Y':
            password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
        else:
            password = json_data.get(serverInfo, {}).get('DB_PASSWORD')

        if db_type in ('MYSQL', 'MARIA'):
            connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user
                                                                          , password
                                                                          , host
                                                                          , port
                                                                          , schema))
        elif db_type == 'ORACLE':
            connection_text = ('oracle://%s:%s@%s:%s/%s' % (user
                                                            , password
                                                            , host
                                                            , port
                                                            , json_data.get(serverInfo, {}).get('DB_SID')
                                                            ))
        else:
            raise DB_CONNECTION_ERROR(f"Unsupported DB_TYPE '{db_type}' for server {serverInfo}")
        cnx = sqlalchemy_create_engine(connection_text
                                       , encoding=encoding)
        # ,fast_executemany=True
        # , connect_args={'connect_timeout': 600})
        cursor = cnx.connect()
        logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
    except Exception as dbexp:
        logging.error("DATABASE CONNECTION EXCEPTION")
        logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
        cnx = cursor = schema = None

    return cnx, cursor, schema, db_type
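
# NOTE: a minimal sketch of the per-server entry shape setDbConnection expects
# in dna.json (an assumption inferred from the .get() calls above; the values
# below are hypothetical placeholders, not real credentials):
#
#   "PPM_PC": {
#       "DB_TYPE": "MYSQL",            # or "MARIA" / "ORACLE"
#       "DB_HOST": "db.example.local", # hypothetical host
#       "DB_PORT": "3306",
#       "DB_USER": "ppm_user",
#       "DB_PASSWORD": "cGFzc3dvcmQ=", # base64-encoded when ENCRYPT == 'Y'
#       "ENCRYPT": "Y",
#       "DB_SCHEMA": "ppm_pc",
#       "DB_SID": "ORCL"               # used only when DB_TYPE == 'ORACLE'
#   }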


def process_changes_only(pdb_ppm_sc, pdb_source, pdb_target, ps_rep_env, ps_op_id, ps_bu_id, ps_rep_job_id, pl_logging):
    try:
        replicationJobId = ps_rep_job_id
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Updating service catalog changes successful."
        failed_entities = []
        opId = ps_op_id
        buId = ps_bu_id

        insert_success_count = 0
        insert_failed_count = 0
        update_success_count = 0
        update_failed_count = 0
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'"
        primary_df = pd_read_sql(primary_query, con=pdb_ppm_sc)
        if primary_df.empty:
            raise ETL_PPM_REPLICATION_MASTER_ERROR
        for index, row in primary_df.iterrows():
            # Fetch primary key column name and table name
            table_name = row['eprm_table_name'].lower()
            eprm_table_col_pk = row['eprm_table_col_pk']
            pk = eprm_table_col_pk.lower()

            source_query = f"SELECT * FROM {schema_source}.{table_name} WHERE OP_ID='{opId}' AND BU_ID='{buId}'"
            source_df = pd_read_sql(source_query, con=pdb_source)
            target_query = f"SELECT * FROM {schema_ext}.{table_name} WHERE OP_ID='{opId}' AND BU_ID='{buId}'"
            target_df = pd_read_sql(target_query, con=pdb_target)

            for _, source_row in source_df.iterrows():
                pk_value = source_row[pk]

                # Check if the primary key exists in the target DataFrame
                if pk_value not in target_df[pk].values:

                    # Replace 'None'/'NaT'/'nan' marker strings with real None values in source_row
                    for column_name, source_val in source_row.items():
                        if source_val in ('None', 'NaT', 'nan'):
                            source_row[column_name] = None

                    # Collect column names and formatted values for a dynamically generated INSERT
                    insert_columns = []
                    insert_values = []

                    for column_name, source_val in source_row.items():
                        if source_val is not None:
                            if isinstance(source_val, str) and source_val.startswith('TO_DATE'):
                                # Already a TO_DATE expression; don't wrap it again
                                insert_values.append(source_val)
                                insert_columns.append(column_name)  # Add the column name
                            elif is_datetime(source_val):
                                # Format datetime values using the appropriate function for the database type
                                if db_type_ext == 'ORACLE':
                                    insert_values.append(f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')")
                                elif db_type_ext in ('MYSQL', 'MARIA'):
                                    # For MariaDB, use STR_TO_DATE
                                    insert_values.append(f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')")
                                else:
                                    # Enclose other values in single quotes
                                    insert_values.append(f"'{source_val}'")
                                insert_columns.append(column_name)  # Add the column name
                            elif str(source_val) == 'NaT':
                                # Replace 'NaT' with NULL without single quotes
                                insert_values.append('NULL')
                                insert_columns.append(column_name)  # Add the column name
                            elif column_name == 'extended_rule_code':
                                parts = source_val.split('==')
                                if len(parts) == 2:
                                    insert_values.append(f"'{parts[0]}=='{parts[1]}''")
                                else:
                                    # Keep columns and values aligned even when the split fails
                                    insert_values.append(f"'{source_val}'")
                                insert_columns.append(column_name)
                            elif str(source_val) == 'nan':
                                insert_values.append('NULL')
                                insert_columns.append(column_name)
                            else:
                                # Enclose other values in single quotes
                                insert_values.append(f"'{source_val}'")
                                insert_columns.append(column_name)  # Add the column name
                        else:
                            insert_values.append('NULL')  # Insert a true NULL
                            insert_columns.append(column_name)  # Add the column name

                    # Construct the INSERT query with column names
                    insert_query = f"INSERT INTO {schema_ext}.{table_name} ({', '.join(insert_columns)}) VALUES ({', '.join(insert_values)})"

                    # Execute the INSERT query
                    try:
                        print(insert_query)
                        cursor_target.execute(insert_query)
                        insert_success_count += 1
                    except Exception as e:
                        pl_logging.error(
                            "Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
                        insert_failed_count += 1
                        if table_name not in failed_entities:
                            failed_entities.append(table_name)
                else:
                    # Fetch the corresponding row from the target DataFrame based on the primary key
                    target_row = target_df[target_df[pk] == pk_value].iloc[0]
                    if not source_row.equals(target_row):
                        columns_to_update = []
                        for column_name, source_val in source_row.items():
                            target_val = target_row[column_name]
                            if source_val != target_val:
                                if is_datetime(source_val):
                                    # Format datetime values using the appropriate function for the database type
                                    if db_type_ext == 'ORACLE':
                                        update_value = f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')"
                                    elif db_type_ext in ('MYSQL', 'MARIA'):
                                        # For MariaDB and MySQL, use STR_TO_DATE
                                        update_value = f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')"
                                    else:
                                        # Enclose other values in single quotes
                                        update_value = f"'{source_val}'"
                                elif str(source_val) in ('NaT', 'nan', 'None'):
                                    # Replace 'NaT'/'nan'/'None' markers with NULL, without single quotes
                                    update_value = 'NULL'
                                elif column_name == 'extended_rule_code':
                                    parts = source_val.split('==')
                                    if len(parts) == 2:
                                        update_value = f"'{parts[0]}=='{parts[1]}'"
                                    else:
                                        # Fall back to plain quoting so update_value is always bound
                                        update_value = f"'{source_val}'"
                                else:
                                    # Handle non-datetime columns (e.g., strings, numbers) here
                                    update_value = f"'{source_val}'"

                                # Add the column name and formatted value to the update statement
                                columns_to_update.append(f"{column_name} = {update_value}")

                        # Generate an update query dynamically
                        if columns_to_update:
                            update_query = f"UPDATE {schema_ext}.{table_name} SET "
                            update_query += ", ".join(columns_to_update)
                            update_query += f" WHERE {eprm_table_col_pk} = '{pk_value}' AND OP_ID='{opId}' AND BU_ID='{buId}'"

                            try:
                                print(update_query)
                                cursor_target.execute(update_query)
                                update_success_count += 1
                            except Exception as e:
                                pl_logging.error(
                                    "Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
                                update_failed_count += 1
                                if table_name not in failed_entities:
                                    failed_entities.append(table_name)

        print(f"Successful inserts: {insert_sucess_count}")
        print(f"Failed inserts: {insert_failed_count}")
        print(f"Successful updates: {update_sucess_count}")
        print(f"Failed updates: {update_failed_count}")

        pl_logging.info(f"-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n")
        if len(failed_entities) > 0:
            return_flag = False
            STATUS = "Error"
            STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '')

        pl_logging.info("STATUS: %s" % STATUS)
        pl_logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE)

        query_update = (f"UPDATE {schema_source}.ppm_replication_status SET status='{STATUS}'"
                        f", status_message='{STATUS_MESSAGE}'"
                        f", error_description=NULL"
                        f", updated_by='{replicationJobId}' /**/")

        if db_type_source == 'ORACLE':
            query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
        elif db_type_source in ('MARIA', 'MYSQL'):
            query_update = query_update.replace('/**/', ', updated_date=NOW()')

        pl_logging.info("-- + ppm_replication_status - UPDATE \n")
        pl_logging.info(query_update + ";\n")
        pl_logging.info(f"-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n")
        res = cursor_source.execute(query_update)
        pl_logging.info("query_update: %s" % res)
    except ETL_PPM_REPLICATION_MASTER_ERROR:
        STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
        pl_logging.error("EXCEPTION:" + STATUS_MESSAGE)
        return_flag = False
    except Exception as e:
        pl_logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False

    return return_flag


if __name__ == '__main__':
    from configparser import ConfigParser as conf_ConfigParser

    try:
        # python3 ppm_sc_changesonly.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999
        parser = argparse.ArgumentParser(description="PPM Service Catalog Replication Script")
        parser.add_argument('--operation', required=True, help="Operation Type")
        parser.add_argument('--replicationTarget', required=True, help="Replication Target")
        parser.add_argument('--opId', required=True, help="Operation ID")
        parser.add_argument('--buId', required=True, help="Business Unit ID")
        parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
        args = parser.parse_args()
        replicationJobId = args.replicationJobId
        json_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/dna.json"
        conf_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/ppm_sc_replication.conf"
        log_file = f'/app/scripts/PPM_Release_Management/Service_Catalog_ETL/logs/{replicationJobId}_ppm_sc_changesonly.log'

        # Set up logging
        CONFIG = conf_ConfigParser()
        CONFIG.read(conf_file_path)

        logging.basicConfig(filename=log_file
                            , level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True)
                            , format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True)
                            , datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True)
                            )
        logging.info('LOGGER initiated')

        encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True)

        # Read JSON data from file
        if not os.path.exists(json_file_path):
            logging.error("CREDENTIAL FILE MISSING")
            logging.error("CREDENTIAL FILE: %s" % json_file_path)
            raise FileNotFoundError("CREDENTIAL FILE MISSING")

        with open(json_file_path) as json_file:
            json_data = json_load(json_file)

        # Connect to PPM_PC database
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)

        # Connect to source database
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE',
                                                                                          encoding)

        # Connect to target_ext database
        connection_target, cursor_target, schema_ext, db_type_ext = setDbConnection(logging, json_data,
                                                                                    args.replicationTarget,
                                                                                    encoding)

        if not (connection_ppm and connection_source and connection_target):
            raise DB_CONNECTION_ERROR

        if process_changes_only(pdb_ppm_sc=connection_ppm
                , pdb_source=connection_source
                , pdb_target=connection_target
                , ps_rep_env=args.replicationTarget
                , ps_op_id=args.opId
                , ps_bu_id=args.buId
                , ps_rep_job_id=args.replicationJobId
                , pl_logging=logging):
            print("Update successful")
        else:
            print("Update FAILED")

    except DB_CONNECTION_ERROR:
        logging.error("EXCEPTION: DB CONNECTION ERROR")
        print("EXCEPTION: DB CONNECTION ERROR")

    except FileNotFoundError as ferr:
        print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))

    except Exception as err:
        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))


In the above script, some tables do not exist when reading the source and target dataframes, and for those I get the error below:

13:54:51|ERROR|process_changes_only|282|Error - (cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: SELECT * FROM tibtcare_ppm_st.sc_ppm_item_marketing_info WHERE OP_ID='HOB' AND BU_ID='DEFAULT']
(Background on this error at: https://sqlalche.me/e/14/4xp6) . Line No - 123

I want to handle this: if a table doesn't exist, the script should log that the table is missing in the target database and proceed to the next table. The code should not stop.
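
A minimal sketch of one way to do that, reusing the names from process_changes_only above (pd_read_sql, pdb_source, pdb_target, schema_source, schema_ext, pl_logging): wrap the two reads in a try/except so a missing table is logged and skipped with continue instead of aborting the run.

            # Inside the primary_df loop: skip tables absent on either side
            # instead of letting the failed SELECT stop the script.
            try:
                source_df = pd_read_sql(source_query, con=pdb_source)
                target_df = pd_read_sql(target_query, con=pdb_target)
            except Exception as tbl_err:
                # ORA-00942 (Oracle) or error 1146 (MySQL/MariaDB) lands here
                # when the table or view does not exist in that database
                pl_logging.info(f"Table {table_name} does not exist in source/target "
                                f"database; proceeding to next table. Detail: {tbl_err}")
                continue

Catching the narrower sqlalchemy.exc.DatabaseError instead of Exception would avoid swallowing unrelated failures. Alternatively, table existence can be checked up front with SQLAlchemy's inspector (sqlalchemy.inspect(engine).has_table(table_name, schema=...)), at the cost of an extra round trip per table; note that Oracle stores unquoted identifiers in upper case, so the schema and table name case may need adjusting for that check.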