Untitled

mail@pastecode.io avatarunknown
plain_text
a month ago
28 kB
1
Indexable
Never

import pandas as pd
import mysql.connector
import json
from base64 import b64decode as base_b64decode
import logging
import datetime
import os
from sqlalchemy import create_engine as sqlalchemy_create_engine
from pandas import read_sql as pd_read_sql
import sys

# --- Job parameters -------------------------------------------------------
# Identity of the release being processed and of this replication run.
releaseId = '275.2'
releaseType = 'TESTING'
replicationTarget = 'SOURCE'
replicationJobId = 'REP_990_234'

# Catalog / operator / business-unit values used in the query filters below.
catalogId = ''
opId = 'HOB'
buId = 'DEFAULT'

# --- File locations -------------------------------------------------------
# Everything lives under one ETL directory; the log names start with the job id.
_ETL_HOME = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL"
json_file_path = f"{_ETL_HOME}/config/ppm_reply.json"
sql_log_file = f"{_ETL_HOME}/logs/{replicationJobId}ppm_reply.sql"
log_file = f"{_ETL_HOME}/logs/{replicationJobId}ppm_reply.log"
# sql_log_file = os.get_path("PPM_PC_LOG") + "/" + replicationJobId + "_ppm_pc_replication_delete.sql"
# json_file_path = os.get_path("PPM_PC_CONFIG") + "/ppm_pc_replication.json"
# log_file = os.get_path("PPM_PC_LOG") + "/" + replicationJobId + "_ppm_pc_replication_delete.log"

# --- Derived state --------------------------------------------------------
# Case-folded copy of the release type, used for the branch comparisons.
release_type = releaseType.casefold()
# Values written to the status tables at the end of the run.
STATUS, STATUS_MESSAGE = replicationTarget, "success"

# Send INFO-and-above records to the per-job log file with a timestamp prefix.
_logging_options = {
    "filename": log_file,
    "level": logging.INFO,
    "format": '%(asctime)s - %(message)s',
    "datefmt": '%Y-%m-%d %H:%M:%S',
}
logging.basicConfig(**_logging_options)

# Incremented by the code below each time a query result is recorded.
query_count = 0

# NOTE: from this point on the name sql_log_file refers to the open, writable
# file handle and no longer to the path string it was built from.
sql_log_file = open(sql_log_file, "w")


# Append one entry to the SQL log
def write_sql(query_info):
    """Write ``query_info`` followed by a newline to the open SQL log file."""
    sql_log_file.writelines([query_info, '\n'])


try:
    # Function to establish a database connection
    def connect_to_database(json_data, replicationTarget):
        """Open a connection to the database configured under ``replicationTarget``.

        Settings are read from ``json_data[replicationTarget]`` using the keys
        ENCRYPT, DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_TYPE, DB_SCHEMA and
        DB_SID. When ENCRYPT is 'Y' the stored password is base64-decoded.

        Args:
            json_data: Parsed JSON configuration, keyed by target name.
            replicationTarget: Name of the configuration section to connect to.

        Returns:
            A ``(connection, cursor, schema)`` tuple. The shape is the same for
            MySQL and Oracle, so the calling code needs no extra variables.

        Raises:
            ValueError: If DB_TYPE is not MYSQL, MARIA or ORACLE.
            Exception: Any error raised while connecting is reported and then
                re-raised, so the caller sees the real cause instead of a
                failed tuple-unpack of ``None``.
        """
        try:
            target_cfg = json_data.get(replicationTarget, {})
            host = target_cfg.get('DB_HOST')
            port = target_cfg.get('DB_PORT')
            user = target_cfg.get('DB_USER')
            db_type = target_cfg.get('DB_TYPE')
            schema = target_cfg.get('DB_SCHEMA')
            sid = target_cfg.get('DB_SID')
            password = target_cfg.get('DB_PASSWORD')
            if target_cfg.get('ENCRYPT') == 'Y':
                # The module imports b64decode under the alias base_b64decode.
                password = base_b64decode(password).decode('utf-8')

            # MARIA is accepted here because the status-update code further
            # down treats MARIA and MYSQL alike.
            if db_type in ('MYSQL', 'MARIA'):
                cnx = mysql.connector.connect(user=user, password=password, host=host, port=port)
                cursor = cnx.cursor()
                logging.info(f"Connected to MySQL database server {replicationTarget}: {host}:{port}")

            elif db_type == 'ORACLE':
                # Built on the SQLAlchemy engine factory the module already
                # imports. raw_connection() hands back a DB-API connection
                # (cursor/commit/rollback/close), i.e. the same kind of object
                # the MySQL branch returns.
                # NOTE(review): assumes the cx_Oracle driver is installed and
                # that DB_SID holds a SID (not a service name) - confirm.
                from urllib.parse import quote_plus
                from sqlalchemy.pool import NullPool

                oracle_url = "oracle+cx_oracle://{}:{}@{}:{}/{}".format(
                    quote_plus(str(user)), quote_plus(str(password)), host, port, sid
                )
                # NullPool: close() on the connection really closes it instead
                # of parking it in a pool.
                oracle_engine = sqlalchemy_create_engine(oracle_url, poolclass=NullPool)
                cnx = oracle_engine.raw_connection()
                cursor = cnx.cursor()
                logging.info(f"Connected to Oracle database server {replicationTarget}: {host}:{port}")

            else:
                raise ValueError(f"Unsupported DB_TYPE {db_type!r} for target {replicationTarget}")

            return cnx, cursor, schema

        except Exception as dbexp:
            print("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
            logging.error(f"Could not connect to {replicationTarget}: {dbexp}")
            raise


    # Load the connection settings. json_data and json_file are bound up front
    # so that a missing or malformed file cannot cause a NameError further
    # down (including in the final cleanup, which inspects json_file).
    json_data = {}
    json_file = None
    try:
        # Read JSON data from file
        with open(json_file_path, encoding="utf-8") as json_file:
            json_data = json.load(json_file)
    except FileNotFoundError:
        print("File not found: " + json_file_path)
        logging.error("File not found: " + json_file_path)
    except json.JSONDecodeError as cfg_err:
        print("Invalid JSON in " + json_file_path + ": " + str(cfg_err))
        logging.error("Invalid JSON in " + json_file_path + ": " + str(cfg_err))

    # Bind every handle the final cleanup inspects before anything can fail, so
    # a failed connection attempt cannot surface later as a NameError.
    conn_ppm = cursor_ppm = schema_ppm = None
    connection_tar = cursor_tar = schema_tar = None
    df = None
    try:
        # Connect to PPM_PC database
        conn_ppm, cursor_ppm, schema_ppm = connect_to_database(json_data, 'PPM_PC')

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master"

        df = pd_read_sql(primary_query, con=conn_ppm)
        # Oracle reports unquoted column names in upper case, while the rest of
        # the script looks columns up in lower case.
        df.columns = [str(col).lower() for col in df.columns]
        columns = df.columns.tolist()
        rows = df.values.tolist()
        if not rows:
            STATUS = "Error"
            STATUS_MESSAGE = "No records present in etl_ppm_replication_master table"

        # replicationTarget
        connection_tar, cursor_tar, schema_tar = connect_to_database(json_data, replicationTarget)
    except Exception as e:
        print("An Error occured:", str(e))
        logging.error(f"An Error occured: {e}")
    replaced_string = ""

    try:
        # Count (nothing is deleted here) the rows tied to this release in every
        # enabled replication table. DEPLOYED/CANCELLED releases walk the
        # MASTER-CHILD, AUDIT-CHILD, MASTER, AUDIT and RELEASE tables in that
        # order; TESTING/INPROGRESS releases only look at AUDIT and RELEASE.
        release_filter = "release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"

        def _count_rows(count_sql, table_name):
            """Run one COUNT(*) statement on the target DB and record it.

            On success the statement and its result are appended to the SQL log
            and query_count is incremented. Returns the count, or None when the
            statement failed; failures are reported and swallowed so that the
            remaining tables are still processed.
            """
            global query_count
            try:
                cursor_tar.execute(count_sql)
                row_total = cursor_tar.fetchone()[0]
            except Exception as err:
                # Caught broadly on purpose: the target can be MySQL or Oracle
                # and their drivers raise unrelated exception classes.
                print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                print(f"Error occurred while executing the query:{count_sql}: {err}")
                logging.info(f"Error occurred while executing the query:{count_sql}: {err}")
                return None
            query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {table_name}\n"
            query_info += f"-- #Query: Result:{row_total}\n"
            query_info += count_sql + ";\n"
            write_sql(query_info)
            query_count += 1
            return row_total

        if release_type in ('deployed', 'cancelled'):
            logging.info(f"processing - {releaseType}")
            order = ['MASTER-CHILD', 'AUDIT-CHILD', 'MASTER', 'AUDIT', 'RELEASE']
            logging.info("order of execution - 'MASTER-CHILD', 'AUDIT-CHILD', 'MASTER', 'AUDIT', 'RELEASE'")
            filtered_df = df.loc[
                df['eprm_catalog'].isin(['PC', 'RELEASE'])
                & df['eprm_enabled_flg'].isin(['Y'])
                & df['eprm_table_type'].isin(order)
            ].copy()
            # An ordered categorical makes the sort follow `order` instead of
            # alphabetical order.
            filtered_df['eprm_table_type'] = pd.Categorical(filtered_df['eprm_table_type'], categories=order,
                                                            ordered=True)
            filtered_df.sort_values(by=['eprm_table_type', 'eprm_seq_nbr'], ascending=[True, True], inplace=True)

            for _, row in filtered_df.iterrows():
                eprm_table_name = row['eprm_table_name']
                eprm_join_cols_entity = row['eprm_join_cols_entity']
                eprm_join_cols_reim = row['eprm_join_cols_reim']
                eprm_table_alias = row['eprm_table_alias']
                eprm_table_type = row['eprm_table_type']
                eprm_parent_table_name = row['eprm_parent_table_name']
                eprm_seq_nbr = row['eprm_seq_nbr']

                if eprm_table_type == 'AUDIT':
                    eprm_table_col_pk = row['eprm_table_col_pk']
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE (" + eprm_table_col_pk + f") IN (SELECT entity_ref_nbr FROM  {schema_tar}.release_entity_inst_map WHERE " + release_filter + ")"
                    logging.info(f"processing {eprm_table_type}")
                    row_total = _count_rows(query, eprm_table_name)
                    if row_total is not None:
                        print(f"Count for {eprm_table_name}: {row_total} (audit){eprm_seq_nbr}")
                        logging.info(f"Count for {eprm_table_name}: {row_total} (audit)")

                elif eprm_table_type == 'RELEASE':
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} where " + release_filter
                    logging.info(f"processing {eprm_table_type}")
                    row_total = _count_rows(query, eprm_table_name)
                    if row_total is not None:
                        print(f"Count for {eprm_table_name}: {row_total} (release){eprm_seq_nbr}")
                        logging.info(f"Count for {eprm_table_name}: {row_total} (release)")

                elif eprm_table_type == 'MASTER':
                    # Derive, from the join-condition text, the column list to
                    # select from release_entity_inst_map.
                    alias_version = eprm_table_alias + ".version"
                    clause_removed_reim_join = eprm_join_cols_reim.replace(" AND", "").replace("=", "")
                    remove_string = [eprm_table_alias + "." + v for v in eprm_join_cols_entity.split(",")]

                    if alias_version not in remove_string and alias_version in eprm_join_cols_reim:
                        remove_string = remove_string + [alias_version]
                        eprm_join_cols_entity = eprm_join_cols_entity + ", version"

                    reim_select_cols = clause_removed_reim_join
                    for v in remove_string:
                        reim_select_cols = reim_select_cols.replace(v, "").replace("reim.", "")
                    reim_select_cols = reim_select_cols.replace(" ", ",")

                    if alias_version in remove_string and "reim.version" not in eprm_join_cols_reim:
                        reim_select_cols = reim_select_cols + ",version"

                    # Drop everything up to and including the first '.' of each part.
                    replaced_string = ','.join(
                        part.split('.', 1)[-1] for part in reim_select_cols.split(',')
                    )

                    secondary_query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE (" + eprm_join_cols_entity + ") IN (SELECT " + replaced_string + f" FROM {schema_tar}.release_entity_inst_map WHERE " + release_filter + ")"

                    logging.info(f"processing {eprm_table_type}")
                    row_total = _count_rows(secondary_query, eprm_table_name)
                    if row_total is not None:
                        logging.info(f"Count for {eprm_table_name}: {row_total} (master)")
                        print(f"Count for {eprm_table_name}: {row_total} (master) {eprm_seq_nbr}")

                elif eprm_table_type in ('MASTER-CHILD', 'AUDIT-CHILD'):
                    # Both child types follow the same recipe; they differ only
                    # in the type of parent row they accept and in the banner
                    # printed after the count (banner texts kept exactly as the
                    # script has always printed them).
                    if eprm_table_type == 'MASTER-CHILD':
                        parent_type = 'MASTER'
                        banner = " ---------------------MASTER-AUDIT------------------"
                    else:
                        parent_type = 'AUDIT'
                        banner = " ---------------------AUDIT-CHILD------------------"

                    qry = f"select * from {schema_tar}.etl_ppm_replication_master WHERE eprm_table_name=" + "'" + eprm_parent_table_name + "'"
                    cursor_tar.execute(qry)
                    parent_rows = cursor_tar.fetchall()
                    # Lower-cased because Oracle reports column names in upper case.
                    parent_cols = [desc[0].lower() for desc in cursor_tar.description]
                    df_child = pd.DataFrame(parent_rows, columns=parent_cols)
                    filtered_df_child = df_child[
                        df_child['eprm_catalog'].isin(['PC'])
                        & (df_child['eprm_enabled_flg'] == 'Y')
                        & df_child['eprm_table_type'].isin([parent_type])
                    ]
                    for _, parent_row in filtered_df_child.iterrows():
                        eprm_table_name_child = parent_row['eprm_table_name']
                        eprm_join_cols_entity_child = parent_row['eprm_join_cols_entity']
                        eprm_join_cols_reim_child = parent_row['eprm_join_cols_reim']

                        # Right-hand column of every "a.x = b.y" condition of the
                        # table currently being counted.
                        where_clause_reim = ','.join(
                            condition.split('=')[1].split('.')[1].strip()
                            for condition in eprm_join_cols_reim.split('AND')
                        )
                        where_clause_child = ', '.join(
                            value.split('.')[-1].strip()
                            for value in eprm_join_cols_entity_child.split('=')
                        )
                        where_clause = ', '.join(
                            condition.split('=')[0].split('.')[-1]
                            for condition in eprm_join_cols_reim_child.split(' AND ')
                        )

                        final_query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE ({where_clause_reim}) IN (select {where_clause_reim} from {schema_tar}.{eprm_table_name_child} where ({where_clause_child}) IN (select " + where_clause + f" FROM {schema_tar}.RELEASE_ENTITY_INST_MAP WHERE " + release_filter + "))"
                        logging.info(f"processing - {eprm_table_type}")
                        row_total = _count_rows(final_query, eprm_table_name)
                        if row_total is not None:
                            logging.info(f"Count for {eprm_table_name}: {row_total}")
                            print(f"Count for {eprm_table_name}: {row_total}" + banner)

        elif release_type in ('testing', 'inprogress'):
            logging.info(f"{releaseType}")
            filtered_df = df[
                df['eprm_catalog'].isin(['PC', 'RELEASE'])
                & df['eprm_enabled_flg'].isin(['Y'])
                & df['eprm_table_type'].isin(['AUDIT', 'RELEASE'])
            ]
            filtered_df = filtered_df.sort_values('eprm_table_type')
            # Kept from the original flow: the master frame itself is re-ordered
            # here, which affects the order in which later code walks it.
            df = df.sort_values('eprm_seq_nbr', ascending=False)
            for _, row in filtered_df.iterrows():
                eprm_table_name = row['eprm_table_name']
                eprm_table_col_pk = row['eprm_table_col_pk']
                eprm_table_type = row['eprm_table_type']
                eprm_seq_nbr = row['eprm_seq_nbr']

                if eprm_table_type == 'RELEASE':
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} where " + release_filter
                    logging.info(f"processing {eprm_table_type}")
                    row_total = _count_rows(query, eprm_table_name)
                    if row_total is not None:
                        print(f"Count for {eprm_table_name}: {row_total} (release){eprm_seq_nbr}")
                        logging.info(f"Count for {eprm_table_name}: {row_total} (release)")

                elif eprm_table_type == 'AUDIT':
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE (" + eprm_table_col_pk + f") IN (SELECT entity_ref_nbr FROM {schema_tar}.release_entity_inst_map WHERE " + release_filter + ")"
                    logging.info(f"processing {eprm_table_type}")
                    row_total = _count_rows(query, eprm_table_name)
                    if row_total is not None:
                        print(f"Count for {eprm_table_name}: {row_total}" + " inprogress-audit")
                        logging.info(f"Count for {eprm_table_name}: {row_total}" + " inprogress-audit")

        else:
            print("Release Type is not defined")
            logging.info("Release Type is not defined")
        # PC_EXT: count this release's rows in every enabled PC_EXT table, using
        # the "<target>_EXT" connection from the JSON config.
        replicationTarget_EXT = replicationTarget + '_EXT'
        # The cursor returned here is used as-is; opening a second cursor on the
        # same connection would leave the first one dangling.
        connection_ext, cursor_ext, schema_ext = connect_to_database(json_data, replicationTarget_EXT)

        try:
            filtered_df = df[df['eprm_catalog'].isin(['PC_EXT']) & (df['eprm_enabled_flg'].isin(['Y']))]
            if len(filtered_df) > 0:
                for _, row in filtered_df.iterrows():
                    eprm_table_name = row['eprm_table_name']
                    if eprm_table_name == 'PKG_PRD_FED_EXT_ATTRS':
                        continue

                    query = f"SELECT COUNT(*) FROM {schema_ext}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"
                    try:
                        result = pd_read_sql(query, con=connection_ext)
                        count = result.iloc[0, 0]
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{count}\n"
                        query_count += 1
                        query_info += query + ";\n"
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {count} PC_EXT")
                        logging.info(f"Count for {eprm_table_name}: {count}")
                    except Exception as err:
                        # Caught broadly: pandas wraps driver errors in its own
                        # exception type, and the driver may be MySQL or Oracle.
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        print(f"Error occurred while executing the query :{query}: {err}")
                        logging.info(f"Error occurred while executing the query:{query}: {err}")
                        continue

                    # Second pass over the same table through the cursor. It
                    # always runs after a successful first count (the former
                    # query_count > 0 guard could never be false at this point).
                    query_del = f"SELECT COUNT(*) FROM {schema_ext}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"
                    try:
                        cursor_ext.execute(query_del)
                        result = cursor_ext.fetchone()
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{result[0]}\n"
                        query_count += 1
                        query_info += query_del + ";\n"
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {result[0]}" + "PC_EXT for deletion")
                    except Exception as err:
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        print(f"Error occurred while executing the query:{query_del}: {err}")
                        logging.info(f"Error occurred while executing the query:{query_del}: {err}")
            else:
                STATUS = "Error"
                STATUS_MESSAGE = "No records present in PC_EXT"
        finally:
            # Nothing after this point uses the PC_EXT connection, so release it.
            for ext_handle in (cursor_ext, connection_ext):
                try:
                    ext_handle.close()
                except Exception as close_err:
                    logging.info(f"Error while closing PC_EXT resource: {close_err}")
        logging.info("COMPLETED")

        # Record the outcome of this run in the two status tables of the target
        # schema. The /**/ marker is where the timestamp assignment goes; it is
        # a harmless SQL comment if the database type is not recognised.
        query_update = (
            f"UPDATE {schema_tar}.ppm_replication_status SET status='" + STATUS + "'"
            + ", status_message='" + STATUS_MESSAGE + "'"
            + ", error_description=NULL"
            + ", updated_by='" + replicationJobId + "' /**/"
            + " WHERE replication_job_id='" + replicationJobId + "' AND release_id='" + str(releaseId) + "'"
        )
        query_ppm_update = (
            f"UPDATE {schema_tar}.ppm_release_master SET replication_status='" + STATUS + "'"
            + ", updated_by='" + replicationJobId + "' /**/"
            + " WHERE release_id='" + str(releaseId) + "'"
        )

        # The "current timestamp" expression differs between the two backends.
        db_type = json_data.get(replicationTarget, {}).get('DB_TYPE')
        if db_type == 'ORACLE':
            query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
            query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=SYSDATE')
        elif db_type in ('MARIA', 'MYSQL'):
            query_update = query_update.replace('/**/', ', updated_date=NOW()')
            query_ppm_update = query_ppm_update.replace('/**/', ', updated_on=NOW()')
        print(query_update)

        try:
            cursor_tar.execute(query_update)
            cursor_tar.execute(query_ppm_update)
            connection_tar.commit()
        except Exception as err:
            # Caught broadly so that Oracle driver errors are handled the same
            # way as MySQL ones.
            print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
            logging.error(f"Error occurred while updating the replication status: {err}")
            # Both updates are applied together or not at all.
            try:
                connection_tar.rollback()
            except Exception as rollback_err:
                logging.error(f"Rollback of the status update failed: {rollback_err}")



    except Exception as e:
        print("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        print("An Error occured while constructing dataframe:", str(e))



except mysql.connector.Error as err:
    # The handler binds the exception as `err`; report that name.
    print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
    print(f"An error occurred: {err}")
    logging.info(f"An error occurred: {err}")

finally:
    # Release every handle the script may have opened. The names are looked up
    # through globals() because an early failure can leave some of them unbound
    # (this script runs at module level, so they are module globals). Each
    # close is isolated so one failure cannot prevent the others, in
    # particular the flush of the SQL log file.
    for _handle_name in ('cursor_tar', 'connection_tar', 'cursor_ppm', 'conn_ppm',
                         'cursor_ext', 'connection_ext', 'json_file', 'sql_log_file'):
        _handle = globals().get(_handle_name)
        if _handle is None:
            continue
        try:
            _handle.close()
        except Exception as close_err:
            logging.info(f"Error while closing {_handle_name}: {close_err}")

Can you please add Oracle connections? The code should work for both MySQL and Oracle connections. Please take care of the variables as well: if you introduce any new variables while making the Oracle connection, the rest of the code will be disturbed, so modify only the else (Oracle) block of the connect_to_database function.