Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
5.7 kB
1
Indexable
Never
import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os


# define Python user-defined exceptions
class Error(Exception):
    """Root of this script's custom exception hierarchy.

    Carries no behavior of its own; it exists so the script-specific
    exceptions share one common ancestor.
    """


# define Python user-defined exceptions
class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
    """Raised by main() when the etl_ppm_replication_master query returns no rows."""


class DB_CONNECTION_ERROR(Error):
    """Raised by main() when any of the required database connections is missing."""


def setDbConnection(logging, json_data, serverInfo, encoding):
    """Create a SQLAlchemy engine and open a connection for one configured server.

    Connection settings are read from ``json_data[serverInfo]`` using the keys
    ``ENCRYPT``, ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD``,
    ``DB_TYPE``, ``DB_SCHEMA`` and (for Oracle) ``DB_SID``.  When ``ENCRYPT``
    is ``'Y'`` the password is base64-decoded before use.

    Args:
        logging: Logger-like object providing ``info`` and ``error``.
        json_data: Mapping of server name to its settings mapping.
        serverInfo: Key of the server section to connect to.
        encoding: Passed through to ``create_engine(encoding=...)``.

    Returns:
        A ``(cnx, cursor, schema, db_type)`` tuple where ``cnx`` is the engine
        and ``cursor`` is the object returned by ``cnx.connect()``.  On any
        failure the error is logged and ``cnx``, ``cursor`` and ``schema`` are
        ``None``; ``db_type`` keeps whatever value was read from the config.
    """
    from base64 import b64decode as base_b64decode
    from urllib.parse import quote as url_quote

    cnx = cursor = schema = db_type = None
    try:
        server_cfg = json_data.get(serverInfo, {})
        host = server_cfg.get('DB_HOST')
        port = server_cfg.get('DB_PORT')
        user = server_cfg.get('DB_USER')
        db_type = server_cfg.get('DB_TYPE')
        schema = server_cfg.get('DB_SCHEMA')
        password = server_cfg.get('DB_PASSWORD')
        if server_cfg.get('ENCRYPT') == 'Y':
            password = base_b64decode(password).decode('utf-8')

        if db_type in ('MYSQL', 'MARIA'):
            dialect, database = 'mysql+mysqlconnector', schema
        elif db_type == 'ORACLE':
            dialect, database = 'oracle', server_cfg.get('DB_SID')
        else:
            # Fail with a clear message instead of an UnboundLocalError on
            # connection_text further down.
            raise ValueError(
                f"Unsupported DB_TYPE {db_type!r} for server {serverInfo}")

        # Percent-encode the credentials so characters such as '@', ':' or '/'
        # in a user name or password cannot corrupt the URL structure.
        connection_text = '%s://%s:%s@%s:%s/%s' % (
            dialect,
            url_quote(str(user), safe=''),
            url_quote(str(password), safe=''),
            host,
            port,
            database,
        )

        # Imported only once a valid URL exists; the driver itself is loaded
        # by SQLAlchemy from the dialect name, so no direct driver import is
        # needed here.
        from sqlalchemy import create_engine as sqlalchemy_create_engine

        # NOTE(review): the `encoding` argument appears to have been removed
        # from create_engine() in SQLAlchemy 2.0 - confirm the pinned version.
        cnx = sqlalchemy_create_engine(
            connection_text,
            encoding=encoding,
            connect_args={'connect_timeout': 600},
        )
        cursor = cnx.connect()
        logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
    except Exception as dbexp:
        # Deliberate best-effort: callers detect failure through the None
        # values in the returned tuple rather than through an exception.
        logging.error("DATABASE CONNECTION EXCEPTION")
        logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
        cnx = cursor = schema = None

    return cnx, cursor, schema, db_type


def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # Connect to PPM_PC database
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)

        # Connect to source database
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE',
                                                                                          encoding)

        # Connect to target_ext database
        connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(logging, json_data, replicationTarget_EXT,
                                                                              encoding)

        if not (connection_ppm and connection_source  and connection_ext):
            raise DB_CONNECTION_ERROR

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        logging.info("Count etl_ppm_replication_master: %s" % len(df))
        if len(df) == 0:
            raise ETL_PPM_REPLICATION_MASTER_ERROR
        
        #Fetch data from the target
        target_query = f"SELECT * FROM {schema_ext}.etl_ppm_replication_master WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'"
        target_df = pd_read_sql(target_query, con=connection_ext)

I want to extend the code above. It already loads two DataFrames, `df` and `target_df`, and holds the data there. I want to compare the records in the two DataFrames, create another DataFrame that holds only the records that differ, and then update those records in the target_ext database. For example, suppose there are 10 columns in both the source and target databases: if any record, or any column of a record, has changed, it must be updated in the target. If you have any doubts, please ask me.