import pandas as pd
import json
from base64 import b64decode as base_b64decode
import logging
from pandas import read_sql as pd_read_sql
import sys
from sqlalchemy import create_engine
import argparse
from io import TextIOBase as io_TextIOBase
from json import load as json_load
import os
# define Python user-defined exceptions
class Error(Exception):
    """Base class for other exceptions"""
    pass


class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
    pass


class DB_CONNECTION_ERROR(Error):
    pass
def setDbConnection(logging, json_data, serverInfo, encoding):
    from sqlalchemy import create_engine as sqlalchemy_create_engine
    from base64 import b64decode as base_b64decode
    import mysql.connector
    try:
        cnx = cursor = schema = db_type = None
        encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
        host = json_data.get(serverInfo, {}).get('DB_HOST')
        port = json_data.get(serverInfo, {}).get('DB_PORT')
        user = json_data.get(serverInfo, {}).get('DB_USER')
        db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
        schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
        # Passwords may be stored base64-encoded in the config
        if encrypt == 'Y':
            password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
        else:
            password = json_data.get(serverInfo, {}).get('DB_PASSWORD')
        if db_type in ('MYSQL', 'MARIA'):
            connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s'
                               % (user, password, host, port, schema))
        elif db_type == 'ORACLE':
            connection_text = ('oracle://%s:%s@%s:%s/%s'
                               % (user, password, host, port,
                                  json_data.get(serverInfo, {}).get('DB_SID')))
        else:
            raise DB_CONNECTION_ERROR("Unsupported DB_TYPE '%s' for server %s" % (db_type, serverInfo))
        # 'encoding' is a SQLAlchemy 1.x argument; it was removed in SQLAlchemy 2.0
        cnx = sqlalchemy_create_engine(connection_text,
                                       encoding=encoding,
                                       # fast_executemany=True,
                                       connect_args={'connect_timeout': 600})
        cursor = cnx.connect()
        logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
    # except mysql.connector.Error as dberr:
    #     logging.error("DATABASE CONNECTION ERROR")
    #     logging.error("Error - {} . Line No - {} ".format(str(dberr), str(sys.exc_info()[-1].tb_lineno)))
    #     cnx = cursor = schema = None
    except Exception as dbexp:
        logging.error("DATABASE CONNECTION EXCEPTION")
        logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
        cnx = cursor = schema = None
    return cnx, cursor, schema, db_type
def main(args, json_data, log_file, logging, encoding):
    try:
        releaseId = args.releaseId
        opId = args.opId
        buId = args.buId
        replicationTarget = args.replicationTarget
        replicationJobId = args.replicationJobId
        return_flag = True
        STATUS = "InProgress"
        STATUS_MESSAGE = "Insertion of federation tables successful."
        replicationTarget_EXT = replicationTarget + '_EXT'
        failed_entities = []

        # Connect to PPM_PC database
        connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(
            logging, json_data, 'PPM_PC', encoding)
        # Connect to source database
        connection_source, cursor_source, schema_source, db_type_source = setDbConnection(
            logging, json_data, 'SOURCE', encoding)
        # Connect to source_ext database
        connection_source_ext, cursor_source_ext, schema_source_ext, db_type_source_ext = setDbConnection(
            logging, json_data, 'SOURCE_EXT', encoding)
        # Connect to target_ext database
        connection_ext, cursor_ext, schema_ext, db_type_ext = setDbConnection(
            logging, json_data, replicationTarget_EXT, encoding)

        if not (connection_ppm and connection_source and connection_source_ext and connection_ext):
            raise DB_CONNECTION_ERROR

        # Fetch data from the etl_ppm_replication_master table on both sides
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        df = pd_read_sql(primary_query, con=connection_ppm)
        target_query = f"SELECT * FROM {schema_ext}.etl_ppm_replication_master WHERE eprm_catalog='PC_EXT' AND eprm_enabled_flg='Y'"
        target_df = pd_read_sql(target_query, con=connection_ext)
In the code above I have two queries, primary_query and target_query, and two dataframes, df and target_df. I want to compare the two dataframes; both have the same number of columns and records.

Both dataframes have the same columns (df returns them in lowercase, target_df in uppercase):

eprm_seq_nbr, eprm_catalog, eprm_entity_type, eprm_table_name, eprm_table_col_pk, eprm_table_col_pk_seq, eprm_table_type, eprm_table_alias, eprm_parent_table_name, eprm_join_cols_reim, eprm_join_cols_entity, eprm_join_with_reim, eprm_enabled_flg, eprm_reim_entity_id, eprm_reim_entity_ref_id, eprm_reim_entity_ref_id_1, eprm_reim_entity_ref_id_2, eprm_reim_entity_desc, eprm_reim_version, eprm_remarks, eprm_created_by, eprm_created_on, eprm_updated_by, eprm_updated_on

If any column value has changed or been updated (that is, the comparison should be on the column values), that row should be written to a CSV file. The rows in the two dataframes should be matched on the primary-key column eprm_table_col_pk: for example, if the source has a row whose primary key is 'container', it should be compared against the target row with the same primary key 'container'. In this way I want to compare all the data in the source with the target.
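A minimal sketch of one way to do this comparison, assuming eprm_table_col_pk uniquely identifies a row in each dataframe (if it does not, a composite key such as eprm_table_name plus eprm_table_col_pk would be needed). The function name compare_on_pk and the output file differences.csv are placeholders; it reuses the module-level pandas import and normalises column-name case before comparing:

def compare_on_pk(df, target_df, pk_col='eprm_table_col_pk', out_csv='differences.csv'):
    # Normalise column names so the lowercase (source) and uppercase (target) headers match
    src = df.copy()
    tgt = target_df.copy()
    src.columns = src.columns.str.lower()
    tgt.columns = tgt.columns.str.lower()

    # Index both frames on the primary-key column so rows are aligned by key,
    # and put the target columns in the same order as the source columns
    src = src.set_index(pk_col)
    tgt = tgt.set_index(pk_col)[src.columns.tolist()]

    # Compare only keys present on both sides; keys missing on either side
    # could be reported separately if needed
    common_keys = src.index.intersection(tgt.index)
    src_common = src.loc[common_keys]
    tgt_common = tgt.loc[common_keys]

    # Cell-by-cell mask of differences; a pair of NaNs is treated as equal
    diff_mask = ~((src_common == tgt_common) | (src_common.isna() & tgt_common.isna()))
    changed_rows = diff_mask.any(axis=1)

    # Write the changed rows to CSV, with source and target values side by side
    changed = pd.concat(
        [src_common[changed_rows].add_suffix('_source'),
         tgt_common[changed_rows].add_suffix('_target')],
        axis=1,
    )
    changed.to_csv(out_csv, index=True)
    return changed

changed = compare_on_pk(df, target_df)

If the pandas version in use is 1.1 or newer, pandas.DataFrame.compare offers a built-in alternative for the cell-by-cell part, but the explicit mask above makes the NaN handling and the CSV layout easy to adjust.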