############################################################################################################################
"""
AUTHOR      - M.Veerendra Kumar
EMP-ID      - 2009927
SCRIPT      - PPM delete script
DESCRIPTION - The below script deletes PPM table records dynamically upon passing the values
"""
############################################################################################################################
import pandas as pd
import mysql.connector
import json
from base64 import b64decode as base_b64decode
import logging
import datetime
import os
from sqlalchemy import create_engine as sqlalchemy_create_engine
from pandas import read_sql as pd_read_sql
import sys

releaseId = '275.2'
releaseType = 'TESTING'
replicationTarget = 'SOURCE'
catalogId = ''
opId = 'HOB'
buId = 'DEFAULT'
replicationJobId = 'REP_990_234'

json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_reply.json"
sql_log_file = f"/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}ppm_reply.sql"
log_file = f'/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/{replicationJobId}ppm_reply.log'
# sql_log_file = os.get_path("PPM_PC_LOG") + "/" + replicationJobId + "_ppm_pc_replication_delete.sql"
# json_file_path = os.get_path("PPM_PC_CONFIG") + "/ppm_pc_replication.json"
# log_file = os.get_path("PPM_PC_LOG") + "/" + replicationJobId + "_ppm_pc_replication_delete.log"

release_type = releaseType.casefold()

# Set up logging
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

query_count = 0

# Open SQL log file for writing
sql_log_file = open(sql_log_file, "w")


# Function to write SQL query to the log file
def write_sql(query_info):
    sql_log_file.write(query_info)
    sql_log_file.write('\n')


try:
    # Function to establish a database connection
    def connect_to_database(json_data, replicationTarget):
        try:
            encrypt = json_data.get(replicationTarget, {}).get('ENCRYPT')
            host = json_data.get(replicationTarget, {}).get('DB_HOST')
            port = json_data.get(replicationTarget, {}).get('DB_PORT')
            user = json_data.get(replicationTarget, {}).get('DB_USER')
            db_type = json_data.get(replicationTarget, {}).get('DB_TYPE')
            schema = json_data.get(replicationTarget, {}).get('DB_SCHEMA')
            if encrypt == 'Y':
                # Password is base64-encoded; use the imported alias base_b64decode
                password = base_b64decode(json_data.get(replicationTarget, {}).get('DB_PASSWORD')).decode('utf-8')
            else:
                password = json_data.get(replicationTarget, {}).get('DB_PASSWORD')

            if db_type == 'MYSQL':
                cnx = mysql.connector.connect(user=user, password=password, host=host, port=port)
                cursor = cnx.cursor()
                logging.info(f"Connected to MySQL database server {replicationTarget}: {host}:{port}")
            elif db_type == 'ORACLE':
                import oracledb
                oracle_mode = oracledb.is_thin_mode()
                print("Oracle mode: %s" % oracle_mode)
                if oracle_mode:
                    oracledb.init_oracle_client()
                    print("Enabled python-oracledb Thick mode")
                else:
                    print("Default python-oracledb Thick mode")
                cnx_text = ('oracle://%s:%s@%s:%s/?service_name=%s' % (user, password, host, port, schema))
                # Use the imported alias sqlalchemy_create_engine
                cnx = sqlalchemy_create_engine(cnx_text, encoding="utf8").raw_connection()
                cursor = cnx.cursor()
            return cnx, cursor, schema
        except Exception as dbexp:
            print("Error - {} . Line No - {} ".format(str(dbexp), str(sys.exc_info()[-1].tb_lineno)))

    try:
        # Read JSON data from file
        with open(json_file_path) as json_file:
            json_data = json.load(json_file)
    except FileNotFoundError:
        print("File not found: " + json_file_path)

    try:
        # Connect to PPM_PC database
        conn_ppm, cursor_ppm, schema_ppm = connect_to_database(json_data, 'PPM_PC')

        # Fetch data from the etl_ppm_replication_master table
        primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master"
        df = pd_read_sql(primary_query, con=conn_ppm)
        columns = df.columns.tolist()
        rows = df.values.tolist()

        # replicationTarget
        connection_tar, cursor_tar, schema_tar = connect_to_database(json_data, replicationTarget)
        # cursor_tar = connection_tar.cursor()
    except Exception as e:
        print("An error occurred:", str(e))

    replaced_string = ""
    try:
        # Deleting records if releaseType is deployed or cancelled for MASTER-CHILD, AUDIT-CHILD, MASTER, AUDIT, RELEASE
        if release_type == 'deployed' or release_type == 'cancelled':
            logging.info(f"processing - {releaseType}")
            order = ['MASTER-CHILD', 'AUDIT-CHILD', 'MASTER', 'AUDIT', 'RELEASE']
            logging.info("order of execution - 'MASTER-CHILD', 'AUDIT-CHILD', 'MASTER', 'AUDIT', 'RELEASE'")
            filtered_df = df.loc[
                df['eprm_catalog'].isin(['PC', 'RELEASE'])
                & (df['eprm_enabled_flg'].isin(['Y']))
                & df['eprm_table_type'].isin(order)].copy()
            # filtered_df.sort_values(by='eprm_seq_nbr', ascending=True, inplace=True)
            filtered_df['eprm_table_type'] = pd.Categorical(filtered_df['eprm_table_type'], categories=order, ordered=True)
            filtered_df.sort_values(by=['eprm_table_type', 'eprm_seq_nbr'], ascending=[True, True], inplace=True)

            for _, row in filtered_df.iterrows():
                eprm_table_name = row['eprm_table_name']
                eprm_join_cols_entity = row['eprm_join_cols_entity']
                eprm_join_cols_reim = row['eprm_join_cols_reim']
                eprm_table_alias = row['eprm_table_alias']
                eprm_table_type = row['eprm_table_type']
                eprm_parent_table_name = row['eprm_parent_table_name']
                eprm_seq_nbr = row['eprm_seq_nbr']
                # df = df.sort_values('eprm_seq_nbr', ascending=True)

                if eprm_table_type == 'AUDIT':
                    eprm_table_col_pk = row['eprm_table_col_pk']
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE (" + eprm_table_col_pk + f") IN (SELECT entity_ref_nbr FROM {schema_tar}.release_entity_inst_map WHERE release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "')"
                    try:
                        logging.info(f"processing {eprm_table_type}")
                        cursor_tar.execute(query)
                        result = cursor_tar.fetchone()
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{result[0]}\n"
                        query_count += 1
                        query_info += query + ";\n"
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {result[0]} (audit){eprm_seq_nbr}")
                        logging.info(f"Count for {eprm_table_name}: {result[0]} (audit)")
                    except mysql.connector.Error as err:
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        print(f"Error occurred while executing the query:{query}: {err}")
                        logging.info(f"Error occurred while executing the query:{query}: {err}")

                if eprm_table_type == 'RELEASE':
                    eprm_table_name = row['eprm_table_name']
                    eprm_seq_nbr = row['eprm_seq_nbr']
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"
                    try:
                        logging.info(f"processing {eprm_table_type}")
                        cursor_tar.execute(query)
                        result = cursor_tar.fetchone()
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{result[0]}\n"
                        query_count += 1
                        query_info += query + ";\n"
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {result[0]} (release){eprm_seq_nbr}")
                        logging.info(f"Count for {eprm_table_name}: {result[0]} (release)")
                    except mysql.connector.Error as err:
                        print(f"Error occurred while executing the query:{query}: {err}")
                        logging.info(f"Error occurred while executing the query:{query}: {err}")

                elif eprm_table_type == 'MASTER':
                    # Derive the SELECT column list for release_entity_inst_map from the join metadata
                    clause_removed_reim_join = eprm_join_cols_reim.replace(" AND", "").replace("=", "")
                    remove_string = [eprm_table_alias + "." + v for v in eprm_join_cols_entity.split(",")]
                    if eprm_table_alias + ".version" not in remove_string and eprm_table_alias + ".version" in eprm_join_cols_reim:
                        remove_string = remove_string + [eprm_table_alias + ".version"]
                        eprm_join_cols_entity = eprm_join_cols_entity + ", version"
                    reim_select_cols = clause_removed_reim_join
                    for v in remove_string:
                        reim_select_cols = reim_select_cols.replace(v, "").replace("reim.", "")
                    reim_select_cols = reim_select_cols.replace(" ", ",")
                    if eprm_table_alias + ".version" in remove_string and "reim.version" not in eprm_join_cols_reim:
                        reim_select_cols = reim_select_cols + ",version"
                    # Strip table aliases (anything before the dot) from the column names
                    split_parts = reim_select_cols.split(',')
                    replaced_parts = []
                    for part in split_parts:
                        dot_index = part.find('.')
                        if dot_index != -1:
                            replaced_part = part[dot_index + 1:]
                            replaced_parts.append(replaced_part)
                        else:
                            replaced_parts.append(part)
                    replaced_string = ','.join(replaced_parts)
                    secondary_query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE (" + eprm_join_cols_entity + ") IN (SELECT " + replaced_string + f" FROM {schema_tar}.release_entity_inst_map WHERE release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "')"
                    try:
                        logging.info(f"processing {eprm_table_type}")
                        cursor_tar.execute(secondary_query)
                        result = cursor_tar.fetchone()
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{result[0]}\n"
                        query_info += secondary_query + ";\n"
                        query_count += 1
                        write_sql(query_info)
                        logging.info(f"Count for {eprm_table_name}: {result[0]} (master)")
                        print(f"Count for {eprm_table_name}: {result[0]} (master) {eprm_seq_nbr}")
                    except mysql.connector.Error as err:
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        print(f"Error occurred while executing the query:{secondary_query}: {err}")
                        logging.info(f"Error occurred while executing the query:{secondary_query}: {err}")

                elif eprm_table_type == 'MASTER-CHILD':
                    # Look up the parent MASTER table metadata for this child table
                    qry = f"select * from {schema_tar}.etl_ppm_replication_master WHERE eprm_table_name=" + "'" + eprm_parent_table_name + "'"
                    cursor_tar.execute(qry)
                    rows = cursor_tar.fetchall()
                    columns = [desc[0] for desc in cursor_tar.description]
                    df_child = pd.DataFrame(rows, columns=columns)
                    filtered_df_child = df_child[
                        df_child['eprm_catalog'].isin(['PC'])
                        & (df_child['eprm_enabled_flg'] == 'Y')
                        & df_child['eprm_table_type'].isin(['MASTER'])]
                    for _, row in filtered_df_child.iterrows():
                        eprm_table_name_child = row['eprm_table_name']
                        eprm_join_cols_entity_child = row['eprm_join_cols_entity']
                        eprm_join_cols_reim_child = row['eprm_join_cols_reim']
                        eprm_table_type_child = row['eprm_table_type']
                        if eprm_table_type_child == 'MASTER':
                            split_conditions_reim = eprm_join_cols_reim.split('AND')
                            where_clause_reim = []
                            for condition in split_conditions_reim:
                                condition = condition.split('=')[1]
                                condition = condition.split('.')[1]
                                where_clause_reim.append(condition.strip())
                            where_clause_reim = ','.join(where_clause_reim)
                            values_child = eprm_join_cols_entity_child.split('=')
                            column_names_child = [value.split('.')[-1].strip() for value in values_child]
                            where_clause_child = ', '.join(column_names_child)
                            split_conditions = eprm_join_cols_reim_child.split(' AND ')
                            result = [condition.split('=')[0].split('.')[-1] for condition in split_conditions]
                            where_clause = ', '.join(result)
                            final_query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE ({where_clause_reim}) IN (select {where_clause_reim} from {schema_tar}.{eprm_table_name_child} where ({where_clause_child}) IN (select " + where_clause + f" FROM {schema_tar}.RELEASE_ENTITY_INST_MAP WHERE release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'))"
                            try:
                                cursor_tar.execute(final_query)
                                logging.info(f"processing - {eprm_table_type}")
                                result = cursor_tar.fetchone()
                                query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                                query_info += f"-- #Query: Result:{result[0]}\n"
                                query_count += 1
                                query_info += final_query + ";\n"
                                write_sql(query_info)
                                logging.info(f"Count for {eprm_table_name}: {result[0]}")
                                print(f"Count for {eprm_table_name}: {result[0]}" + " ---------------------MASTER-AUDIT------------------")
                            except mysql.connector.Error as err:
                                print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                                print(f"Error occurred while executing the query:{final_query}: {err}")
                                logging.info(f"Error occurred while executing the query:{final_query}: {err}")

                elif eprm_table_type == 'AUDIT-CHILD':
                    # Look up the parent AUDIT table metadata for this child table
                    qry = f"select * from {schema_tar}.etl_ppm_replication_master WHERE eprm_table_name=" + "'" + eprm_parent_table_name + "'"
                    cursor_tar.execute(qry)
                    rows = cursor_tar.fetchall()
                    columns = [desc[0] for desc in cursor_tar.description]
                    df_child = pd.DataFrame(rows, columns=columns)
                    filtered_df_child = df_child[
                        df_child['eprm_catalog'].isin(['PC'])
                        & (df_child['eprm_enabled_flg'] == 'Y')
                        & df_child['eprm_table_type'].isin(['AUDIT'])]
                    for _, row in filtered_df_child.iterrows():
                        eprm_table_name_child = row['eprm_table_name']
                        eprm_join_cols_entity_child = row['eprm_join_cols_entity']
                        eprm_join_cols_reim_child = row['eprm_join_cols_reim']
                        eprm_table_type_child = row['eprm_table_type']
                        if eprm_table_type_child == 'AUDIT':
                            split_conditions_reim = eprm_join_cols_reim.split('AND')
                            where_clause_reim = []
                            for condition in split_conditions_reim:
                                condition = condition.split('=')[1]
                                condition = condition.split('.')[1]
                                where_clause_reim.append(condition.strip())
                            where_clause_reim = ','.join(where_clause_reim)
                            values_child = eprm_join_cols_entity_child.split('=')
                            column_names_child = [value.split('.')[-1].strip() for value in values_child]
                            where_clause_child = ', '.join(column_names_child)
                            split_conditions = eprm_join_cols_reim_child.split(' AND ')
                            result = [condition.split('=')[0].split('.')[-1] for condition in split_conditions]
                            where_clause = ', '.join(result)
                            final_query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE ({where_clause_reim}) IN (select {where_clause_reim} from {schema_tar}.{eprm_table_name_child} where ({where_clause_child}) IN (select " + where_clause + f" FROM {schema_tar}.RELEASE_ENTITY_INST_MAP WHERE release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'))"
                            try:
                                logging.info(f"processing - {eprm_table_type}")
                                cursor_tar.execute(final_query)
                                result = cursor_tar.fetchone()
                                query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                                query_info += f"-- #Query: Result:{result[0]}\n"
                                query_count += 1
                                query_info += final_query + ";\n"
                                write_sql(query_info)
                                print(f"Count for {eprm_table_name}: {result[0]}" + " ---------------------AUDIT-CHILD------------------")
                                logging.info(f"Count for {eprm_table_name}: {result[0]}")
                            except mysql.connector.Error as err:
                                print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                                print(f"Error occurred while executing the query:{final_query}: {err}")
                                logging.info(f"Error occurred while executing the query:{final_query}: {err}")

        elif release_type == 'testing' or release_type == 'inprogress':
            logging.info(f"{releaseType}")
            filtered_df = df[
                df['eprm_catalog'].isin(['PC', 'RELEASE'])
                & (df['eprm_enabled_flg'].isin(['Y']))
                & df['eprm_table_type'].isin(['AUDIT', 'RELEASE'])]
            filtered_df = filtered_df.sort_values('eprm_table_type')
            df = df.sort_values('eprm_seq_nbr', ascending=False)
            for _, row in filtered_df.iterrows():
                eprm_table_name = row['eprm_table_name']
                eprm_table_col_pk = row['eprm_table_col_pk']
                eprm_table_type = row['eprm_table_type']
                eprm_seq_nbr = row['eprm_seq_nbr']
                if eprm_table_type == 'RELEASE':
                    eprm_table_name = row['eprm_table_name']
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"
                    try:
                        logging.info(f"processing {eprm_table_type}")
                        cursor_tar.execute(query)
                        result = cursor_tar.fetchone()
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{result[0]}\n"
                        query_count += 1
                        query_info += query + ";\n"
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {result[0]} (release){eprm_seq_nbr}")
                        logging.info(f"Count for {eprm_table_name}: {result[0]} (release)")
                    except mysql.connector.Error as err:
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        print(f"Error occurred while executing the query:{query}: {err}")
                        logging.info(f"Error occurred while executing the query:{query}: {err}")
                if eprm_table_type == 'AUDIT':
                    query = f"SELECT COUNT(*) FROM {schema_tar}.{eprm_table_name} WHERE (" + eprm_table_col_pk + f") IN (SELECT entity_ref_nbr FROM {schema_tar}.release_entity_inst_map WHERE release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "')"
                    try:
                        logging.info(f"processing {eprm_table_type}")
                        cursor_tar.execute(query)
                        result = cursor_tar.fetchone()
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{result[0]}\n"
                        query_info += query + ";\n"
                        query_count += 1
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {result[0]}" + " inprogress-audit")
                        logging.info(f"Count for {eprm_table_name}: {result[0]}" + " inprogress-audit")
                    except mysql.connector.Error as err:
                        print(f"Error occurred while executing the query:{query}: {err}")
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        logging.info(f"Error occurred while executing the query:{query}: {err}")
        else:
            print("Release Type is not defined")
            logging.info("Release Type is not defined")

        # PC_EXT
        replicationTarget_EXT = replicationTarget + '_EXT'
        connection_ext, cursor_ext, schema_ext = connect_to_database(json_data, replicationTarget_EXT)
        cursor_ext = connection_ext.cursor()
        filtered_df = df[df['eprm_catalog'].isin(['PC_EXT']) & (df['eprm_enabled_flg'].isin(['Y']))]
        if len(filtered_df) > 0:
            for _, row in filtered_df.iterrows():
                eprm_table_name = row['eprm_table_name']
                if eprm_table_name != 'PKG_PRD_FED_EXT_ATTRS':
                    query = f"SELECT COUNT(*) FROM {schema_ext}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"
                    try:
                        result = pd_read_sql(query, con=connection_ext)
                        count = result.iloc[0, 0]
                        query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                        query_info += f"-- #Query: Result:{count}\n"
                        query_count += 1
                        query_info += query + ";\n"
                        write_sql(query_info)
                        print(f"Count for {eprm_table_name}: {count} PC_EXT")
                        logging.info(f"Count for {eprm_table_name}: {count}")
                        if query_count > 0:
                            try:
                                query_del = f"SELECT COUNT(*) FROM {schema_ext}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'"
                                cursor_ext.execute(query_del)
                                result = cursor_ext.fetchone()
                                query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n"
                                query_info += f"-- #Query: Result:{result[0]}\n"
                                query_count += 1
                                query_info += query + ";\n"
                                write_sql(query_info)
                                print(f"Count for {eprm_table_name}: {result[0]}" + "PC_EXT for deletion")
                            except mysql.connector.Error as err:
                                print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                                print(f"Error occurred while executing the query:{query}: {err}")
                                logging.info(f"Error occurred while executing the query:{query}: {err}")
                    except mysql.connector.Error as err:
                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                        print(f"Error occurred while executing the query :{query}: {err}")
                        logging.info(f"Error occurred while executing the query:{query}: {err}")

        logging.info("COMPLETED")
    except Exception as e:
        print("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        print("An error occurred while constructing dataframe:", str(e))

except mysql.connector.Error as err:
    print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
    print(f"An error occurred: {err}")
    logging.info(f"An error occurred: {err}")
finally:
    if cursor_tar:
        cursor_tar.close()
    if connection_tar:
        connection_tar.close()
    if cursor_ppm:
        cursor_ppm.close()
    if conn_ppm:
        conn_ppm.close()
    if json_file:
        json_file.close()
    if sql_log_file:
        sql_log_file.close()


In the above code, can you modify one thing? At the end of the program (that is, after line number 428), check for all the conditions and add the status and status message. I will give one example below that you can check:

if len(missingReleases) == 0:
    statusMessage = "Releases match between source and target. Proceeding (re)replication."
    statusCode = "InProgress"
elif len(missingReleases) > 0:
    statusMessage = "Missing releases between source and target."
    statusCode = "Error"
elif len(df_sourcePpmMaster) > len(df_targetPpmMaster):
    statusMessage = "Source and Target does not match with previous releases."
    statusCode = "Error"
elif len(df_targetPpmMaster) > len(df_sourcePpmMaster):
    statusMessage = "Target/Staging/Lower environments cannot have own releases."
    statusCode = "Error"
else:
    statusMessage = "Unpredicted release compare."
    statusCode = "Error"
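
One possible way to do this, as a minimal sketch: the block below could be appended after the final finally: block of the script (it runs at module level, so logging is already configured). The example's branches are reordered so the DataFrame-length comparisons are reachable before the success case is declared. Note that missingReleases, df_sourcePpmMaster and df_targetPpmMaster are not defined anywhere in the script above; the sketch assumes they are produced by an earlier source-vs-target release comparison step, and persisting statusCode/statusMessage to a status table (if required) is not shown.

# Minimal sketch, appended after the finally: block.
# Assumption: missingReleases, df_sourcePpmMaster and df_targetPpmMaster were built
# earlier by a source-vs-target release comparison (they do not exist in the script above).
statusCode = "Error"
if len(missingReleases) > 0:
    statusMessage = "Missing releases between source and target."
elif len(df_sourcePpmMaster) > len(df_targetPpmMaster):
    statusMessage = "Source and Target does not match with previous releases."
elif len(df_targetPpmMaster) > len(df_sourcePpmMaster):
    statusMessage = "Target/Staging/Lower environments cannot have own releases."
elif len(missingReleases) == 0:
    statusMessage = "Releases match between source and target. Proceeding (re)replication."
    statusCode = "InProgress"
else:
    # Catch-all kept from the example; with the checks above it should not normally be reached.
    statusMessage = "Unpredicted release compare."

print(f"STATUS: {statusCode} | {statusMessage}")
logging.info(f"STATUS: {statusCode} | {statusMessage}")

Defaulting statusCode to "Error" means any unexpected combination still surfaces as a failure rather than a silent pass.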