# Usage: python3 ppm_sc_changesonly.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999
# ===========================================================================================================================================================
# PPM PRODUCT CATALOG (PC) REPLICATION - CE CUSTOM TABLES REPLICATE
#
# DATE      AUTHOR      VER   CHANGE DESCRIPTION
# --------  ---------   ----- ------------------
# 21.08.23  Veerendra   1.0   Replicates PPM table records dynamically, driven by "etl_ppm_replication_master", into the "PC_EXT" target schema
# ===========================================================================================================================================================
import pandas as pd
from pandas import read_sql as pd_read_sql
from json import load as json_load
import argparse
import logging
import sys
import os
import datetime
# define Python user-defined exceptions
class Error(Exception):
"""Base class for other exceptions"""
pass
class ETL_PPM_REPLICATION_MASTER_ERROR(Error):
pass
class DB_CONNECTION_ERROR(Error):
pass
def is_datetime(value):
try:
# Attempt to parse the value as a datetime
datetime.datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S')
return True
except ValueError:
return False
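# Opens a database connection for the server entry named serverInfo in the JSON
# credential file: builds a SQLAlchemy engine (MySQL/MariaDB or Oracle), decoding
# the base64-encoded password when ENCRYPT='Y', and returns a
# (engine, connection, schema, db_type) tuple (None values on failure).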
def setDbConnection(logging, json_data, serverInfo, encoding):
from sqlalchemy import create_engine as sqlalchemy_create_engine
from base64 import b64decode as base_b64decode
try:
cnx = cursor = schema = db_type = None
encrypt = json_data.get(serverInfo, {}).get('ENCRYPT')
host = json_data.get(serverInfo, {}).get('DB_HOST')
port = json_data.get(serverInfo, {}).get('DB_PORT')
user = json_data.get(serverInfo, {}).get('DB_USER')
db_type = json_data.get(serverInfo, {}).get('DB_TYPE')
schema = json_data.get(serverInfo, {}).get('DB_SCHEMA')
if encrypt == 'Y':
password = base_b64decode(json_data.get(serverInfo, {}).get('DB_PASSWORD')).decode('utf-8')
else:
password = json_data.get(serverInfo, {}).get('DB_PASSWORD')
if db_type in ('MYSQL', 'MARIA'):
connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user
, password
, host
, port
, schema))
        elif db_type == 'ORACLE':
            connection_text = ('oracle://%s:%s@%s:%s/%s' % (user
                                                            , password
                                                            , host
                                                            , port
                                                            , json_data.get(serverInfo, {}).get('DB_SID')))
        else:
            # Fail fast on an unexpected DB_TYPE instead of hitting an unbound connection_text
            raise DB_CONNECTION_ERROR(f"Unsupported DB_TYPE '{db_type}' for server {serverInfo}")
        cnx = sqlalchemy_create_engine(connection_text, encoding=encoding)
cursor = cnx.connect()
logging.info(f"Connected to database server {serverInfo}: {host}:{port}/{schema}")
except Exception as dbexp:
logging.error("DATABASE CONNECTION EXCEPTION")
logging.error("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))
cnx = cursor = schema = None
return cnx, cursor, schema, db_type
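# Replicates "changes only" for every table registered in etl_ppm_replication_master
# (eprm_catalog='SC', eprm_enabled_flg='Y'): source rows whose primary key is missing
# in the target schema are INSERTed, and rows that differ from the target are UPDATEd
# column by column; the run status is then written back to ppm_replication_status.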
def process_changes_only(pdb_ppm_sc, pdb_source, pdb_target, ps_rep_env, ps_op_id, ps_bu_id, ps_rep_job_id, pl_logging):
try:
        replicationJobId = ps_rep_job_id
return_flag = True
STATUS = "InProgress"
STATUS_MESSAGE = "Updating service catalog changes successful."
failed_entities = []
        opId = ps_op_id
        buId = ps_bu_id
        insert_success_count = 0
        insert_failed_count = 0
        update_success_count = 0
        update_failed_count = 0
primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master WHERE eprm_catalog='SC' AND eprm_enabled_flg='Y'"
        primary_df = pd_read_sql(primary_query, con=pdb_ppm_sc)
        if primary_df.empty:
            # No enabled SC rows in the replication master: surface the dedicated error
            raise ETL_PPM_REPLICATION_MASTER_ERROR
for index, row in primary_df.iterrows():
# Fetch primary key column name and table name
table_name = row['eprm_table_name'].lower()
eprm_table_col_pk = row['eprm_table_col_pk']
pk = eprm_table_col_pk.lower()
source_query = f"SELECT * FROM {schema_source}.{table_name} where OP_ID='{opId}' AND BU_ID='{buId}'"
source_df = pd.read_sql(source_query, connection_source)
target_query = f"SELECT * FROM {schema_ext}.{table_name} WHERE OP_ID='{opId}' AND BU_ID='{buId}'"
target_df = pd.read_sql(target_query, connection_target)
            for _, source_row in source_df.iterrows():
pk_value = source_row[pk]
# Check if the primary key exists in the target DataFrame
if pk_value not in target_df[pk].values:
# Replace 'None' and 'NaT' with None in source_row
for column_name, source_val in source_row.items():
                        if str(source_val) in ('None', 'NaT', 'nan'):
                            source_row[column_name] = None
                    # Build the column and value lists for a dynamically generated INSERT
                    insert_columns = []
                    insert_values = []
for column_name, source_val in source_row.items():
if source_val is not None:
                            if isinstance(source_val, str) and source_val.startswith('TO_DATE'):
                                # Already wrapped in TO_DATE; pass the expression through as-is
                                insert_values.append(source_val)
                                insert_columns.append(column_name)
elif is_datetime(source_val):
# Format datetime values using the appropriate function for the database type
if db_type_ext == 'ORACLE':
insert_values.append(f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')")
elif db_type_ext in ('MYSQL', 'MARIA'):
# For MariaDB, use STR_TO_DATE
insert_values.append(f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')")
else:
# Enclose other values in single quotes
insert_values.append(f"'{source_val}'")
insert_columns.append(column_name) # Add the column name
elif str(source_val) == 'NaT':
# Replace 'NaT' with NULL without single quotes
insert_values.append('NULL')
insert_columns.append(column_name) # Add the column name
                            elif column_name == 'extended_rule_code':
                                parts = source_val.split('==')
                                if len(parts) == 2:
                                    insert_values.append(f"'{parts[0]}=='{parts[1]}'")
                                else:
                                    insert_values.append(f"'{source_val}'")
                                insert_columns.append(column_name)
elif str(source_val) == 'nan':
insert_values.append('NULL')
insert_columns.append(column_name)
else:
# Enclose other values in single quotes
insert_values.append(f"'{source_val}'")
insert_columns.append(column_name) # Add the column name
else:
insert_values.append('NULL') # Insert a true NULL
insert_columns.append(column_name) # Add the column name
# Construct the INSERT query with column names
insert_query = f"INSERT INTO {schema_ext}.{table_name} ({', '.join(insert_columns)}) VALUES ({', '.join(insert_values)})"
# Execute the INSERT query
                    try:
                        print(insert_query)
                        cursor_target.execute(insert_query)
                        insert_success_count += 1
                    except Exception as e:
                        pl_logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
                        insert_failed_count += 1
                        if table_name not in failed_entities:
                            failed_entities.append(table_name)
else:
# Fetch the corresponding row from the target DataFrame based on the primary key
target_row = target_df[target_df[pk] == pk_value].iloc[0]
if not source_row.equals(target_row):
columns_to_update = []
for column_name, source_val in source_row.items():
target_val = target_row[column_name]
if source_val != target_val:
if is_datetime(source_val):
# Format datetime values using the appropriate function for the database type
if db_type_ext == 'ORACLE':
update_value = f"TO_DATE('{source_val}', 'YYYY-MM-DD HH24:MI:SS')"
elif db_type_ext in ('MYSQL', 'MARIA'):
# For MariaDB and MySQL, use STR_TO_DATE
update_value = f"STR_TO_DATE('{source_val}', '%Y-%m-%d %H:%i:%s')"
else:
# Enclose other values in single quotes
update_value = f"'{source_val}'"
                                elif str(source_val) in ('NaT', 'nan', 'None'):
                                    # Replace pandas NaT/NaN and Python None with a true SQL NULL
                                    update_value = 'NULL'
                                elif column_name == 'extended_rule_code':
                                    parts = source_val.split('==')
                                    if len(parts) == 2:
                                        update_value = f"'{parts[0]}=='{parts[1]}'"
                                    else:
                                        update_value = f"'{source_val}'"
                                else:
                                    # Enclose all other values (strings, numbers) in single quotes
                                    update_value = f"'{source_val}'"
# Add the column name and formatted value to the update statement
columns_to_update.append(f"{column_name} = {update_value}")
# Generate an update query dynamically
if columns_to_update:
update_query = f"UPDATE {schema_ext}.{table_name} SET "
update_query += ", ".join(columns_to_update)
update_query += f" WHERE {eprm_table_col_pk} = '{pk_value}' AND OP_ID='{opId}' AND BU_ID='{buId}'"
                            try:
                                print(update_query)
                                cursor_target.execute(update_query)
                                update_success_count += 1
                            except Exception as e:
                                pl_logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
                                update_failed_count += 1
                                if table_name not in failed_entities:
                                    failed_entities.append(table_name)
print(f"Successful inserts: {insert_sucess_count}")
print(f"Failed inserts: {insert_failed_count}")
print(f"Successful updates: {update_sucess_count}")
print(f"Failed updates: {update_failed_count}")
pl_logging.info(f"-- ++++++++++++++++++++++++++++++++++ BACK UPDATE STATUS FOR UI ++++++++++++++++++ \n")
if len(failed_entities) > 0:
return_flag = False
STATUS = "Error"
STATUS_MESSAGE = str(failed_entities).replace("'", '').replace('"', '')
pl_logging.info("STATUS: %s" % STATUS)
pl_logging.info("STATUS_MESSAGE: %s" % STATUS_MESSAGE)
query_update = f"UPDATE {schema_source}.ppm_replication_status SET status='" + STATUS + "'" \
+ ", status_message='" + STATUS_MESSAGE + "'" \
+ ", error_description=NULL" \
+ ", updated_by='" + replicationJobId + "' /**/" \
if db_type_source == 'ORACLE':
query_update = query_update.replace('/**/', ', updated_date=SYSDATE')
elif db_type_source in ('MARIA', 'MYSQL'):
query_update = query_update.replace('/**/', ', updated_date=NOW()')
pl_logging.info("-- + ppm_replication_status - UPDATE \n")
pl_logging.info(query_update + ";\n")
pl_logging.info(f"-- ++++++++++++++++++++++++++++++++++ FIN ++++++++++++++++++++++++++++++++++++++++ \n")
res = cursor_source.execute(query_update)
pl_logging.info("query_update: %s" % res)
except ETL_PPM_REPLICATION_MASTER_ERROR:
STATUS_MESSAGE = "NO RECORDS PRESENT IN etl_ppm_replication_master TABLE"
pl_logging.error("EXCEPTION:" + STATUS_MESSAGE)
return_flag = False
except Exception as e:
pl_logging.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
return_flag = False
return return_flag
if __name__ == '__main__':
from configparser import ConfigParser as conf_ConfigParser
try:
# python3 ppm_sc_changesonly.py --operation ChangesOnly --replicationTarget SIT --opId HOB --buId DEFAULT --replicationJobId SC_JOB_999
parser = argparse.ArgumentParser(description="PPM Service Catalog Replication Script")
parser.add_argument('--operation', required=True, help="Operation Type")
parser.add_argument('--replicationTarget', required=True, help="Replication Target")
parser.add_argument('--opId', required=True, help="Operation ID")
parser.add_argument('--buId', required=True, help="Business Unit ID")
parser.add_argument('--replicationJobId', required=True, help="Replication Job ID")
args = parser.parse_args()
replicationJobId = args.replicationJobId
json_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/dna.json"
conf_file_path = "/app/scripts/PPM_Release_Management/Service_Catalog_ETL/config/ppm_sc_replication.conf"
log_file = f'/app/scripts/PPM_Release_Management/Service_Catalog_ETL/logs/{replicationJobId}_ppm_sc_changesonly.log'
# Set up logging
CONFIG = conf_ConfigParser()
CONFIG.read(conf_file_path)
logging.basicConfig(filename=log_file
, level=CONFIG.get('CONFIG_LOGGING', 'LOG_LEVEL', raw=True)
, format=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DISP', raw=True)
, datefmt=CONFIG.get('CONFIG_LOG_FORMAT', 'LOG_FORMAT_DATE', raw=True)
)
logging.info('LOGGER initiated')
encoding = CONFIG.get('CONFIG_GENERIC', 'DB_CHARSET', raw=True)
# Read JSON data from file
if not os.path.exists(json_file_path):
logging.error("CREDENTIAL FILE MISSING")
logging.error("CREDENTIAL FILE: %s" % json_file_path)
raise FileNotFoundError("CREDENTIAL FILE MISSING")
with open(json_file_path) as json_file:
json_data = json_load(json_file)
# Connect to PPM_PC database
connection_ppm, cursor_ppm, schema_ppm, db_type_ppm = setDbConnection(logging, json_data, 'PPM_PC', encoding)
# Connect to source database
connection_source, cursor_source, schema_source, db_type_source = setDbConnection(logging, json_data, 'SOURCE',
encoding)
# Connect to target_ext database
connection_target, cursor_target, schema_ext, db_type_ext = setDbConnection(logging, json_data,
args.replicationTarget,
encoding)
if not (connection_ppm and connection_source and connection_target):
raise DB_CONNECTION_ERROR
if process_changes_only(pdb_ppm_sc=connection_ppm
, pdb_source=connection_source
, pdb_target=connection_target
, ps_rep_env=args.replicationTarget
, ps_op_id=args.opId
, ps_bu_id=args.buId
, ps_rep_job_id=args.replicationJobId
, pl_logging=logging):
print("Update successful")
else:
print("Update FAILED")
except DB_CONNECTION_ERROR:
logging.error("EXCEPTION: DB CONNECTION ERROR")
print("EXCEPTION: DB CONNECTION ERROR")
except FileNotFoundError as ferr:
print("Error - {} . Line No - {} ".format(str(ferr), str(sys.exc_info()[-1].tb_lineno)))
except Exception as err:
print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
In the script above, some of the tables listed in etl_ppm_replication_master do not exist in the source/target database, so reading the source and target dataframes fails with the error below:
13:54:51|ERROR|process_changes_only|282|Error - (cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: SELECT * FROM tibtcare_ppm_st.sc_ppm_item_marketing_info WHERE OP_ID='HOB' AND BU_ID='DEFAULT']
(Background on this error at: https://sqlalche.me/e/14/4xp6) . Line No - 123
So I want to handle that case: if a table does not exist, the script should log that the table is missing in the target database and proceed to the next table. The code should not stop.
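One way to get that behavior (a minimal sketch, not a drop-in patch): wrap the two per-table reads at the top of the loop in process_changes_only in try/except blocks that log a warning and continue to the next table. Since the script reads through SQLAlchemy engines, catching sqlalchemy.exc.SQLAlchemyError (the base class under which cx_Oracle's DatabaseError is wrapped) covers the ORA-00942 case; the variable names below match the script, and everything after the reads is unchanged.

from sqlalchemy.exc import SQLAlchemyError

for index, row in primary_df.iterrows():
    table_name = row['eprm_table_name'].lower()
    eprm_table_col_pk = row['eprm_table_col_pk']
    pk = eprm_table_col_pk.lower()
    # Skip this table if it cannot be read from the SOURCE schema
    try:
        source_query = f"SELECT * FROM {schema_source}.{table_name} WHERE OP_ID='{opId}' AND BU_ID='{buId}'"
        source_df = pd_read_sql(source_query, con=pdb_source)
    except SQLAlchemyError as serr:
        pl_logging.warning("Table %s.%s does not exist in source database, proceeding to next table. Detail: %s"
                           % (schema_source, table_name, str(serr)))
        continue
    # Skip this table if it cannot be read from the TARGET schema
    try:
        target_query = f"SELECT * FROM {schema_ext}.{table_name} WHERE OP_ID='{opId}' AND BU_ID='{buId}'"
        target_df = pd_read_sql(target_query, con=pdb_target)
    except SQLAlchemyError as terr:
        pl_logging.warning("Table %s.%s does not exist in target database, proceeding to next table. Detail: %s"
                           % (schema_ext, table_name, str(terr)))
        continue
    # ... the existing insert/update logic for this table continues unchanged ...

If you would rather check before querying, SQLAlchemy 1.4 (which the error link indicates you are on) also offers sqlalchemy.inspect(engine).has_table(table_name, schema=...). Catching the exception, however, keeps the change local to the loop and also skips tables that fail for other reasons, such as missing permissions.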