Untitled
unknown
plain_text
a year ago
7.2 kB
3
Indexable
import pandas as pd import mysql.connector import json from base64 import b64decode as base_b64decode from base64 import b64encode as base_b64encode import logging import datetime releaseId = '275.2' releaseType = 'INPROGRESS' replicationTarget = 'TESTING' catalogId = '' opId = 'HOB' buId = 'DEFAULT' replicationJobId = 'REP_999_240' json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_reply.json" sql_log_file = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/ppm_reply.sql" logging.basicConfig(filename='/app/scripts/PPM_Release_Management/Product_Catalog_ETL/logs/ppm_reply.log', level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') query_count = 0 def write_sql(query_info): with open(sql_log_file, "a") as log_file: log_file.write(query_info) log_file.write('\n') try: with open(json_file_path) as json_file: json_data = json.load(json_file) # For PPM_PC encrypt = json_data.get('PPM_PC', {}).get('ENCRYPT') host = json_data.get('PPM_PC', {}).get('DB_HOST') port = json_data.get('PPM_PC', {}).get('DB_PORT') user = json_data.get('PPM_PC', {}).get('DB_USER') schema = json_data.get(replicationTarget, {}).get('DB_SCHEMA') logging.info("DATABASE - HOST FOR PPM_PC - " + host) logging.info("DATABASE - PORT FOR PPM_PC - " + port) logging.info("DATABASE - USER - PPM_PC" + user) if encrypt == 'Y': password = base_b64decode(json_data.get('PPM_PC', {}).get('DB_PASSWORD')).decode('utf-8') else: password = json_data.get('PPM_PC', {}).get('DB_PASSWORD') cnx = mysql.connector.connect(user=user, password=password, host=host, port=port) cursor = cnx.cursor() logging.info(f"connected to database server PPM_PC: {host}:{port}") primary_query = f"SELECT * FROM {schema}.etl_ppm_replication_master" cursor.execute(primary_query) logging.info(f"executed primary query - {primary_query}") query_info = f"-- ++++++++++++++++++++|PRIMARY_QUERY\n" query_info += f"-- #Query:\n{primary_query};\n" write_sql(query_info) rows = cursor.fetchall() columns = [desc[0] for desc in cursor.description] df = pd.DataFrame(rows, columns=columns) # replicationTarget encrypt_tar = json_data.get(replicationTarget, {}).get('ENCRYPT') host_tar = json_data.get(replicationTarget, {}).get('DB_HOST') port_tar = json_data.get(replicationTarget, {}).get('DB_PORT') user_tar = json_data.get(replicationTarget, {}).get('DB_USER') logging.info("DATABASE - HOST FOR REPLICATION_TARGET - " + host_tar) logging.info("DATABASE - PORT FOR REPLICATION_TARGET - " + port_tar) logging.info("DATABASE - USER - REPLICATION_TARGET -" + user_tar) if encrypt_tar == 'Y': password = base_b64decode(json_data.get(replicationTarget, {}).get('DB_PASSWORD')).decode('utf-8') logging.info("password encryption is enabled") else: password = json_data.get(replicationTarget, {}).get('DB_PASSWORD') logging.info("password encryption is not enabled") cnx_tar = mysql.connector.connect(user=user_tar, password=password, host=host_tar, port=port_tar) cursor_tar = cnx_tar.cursor() logging.info(f"connected to database server REPLICATION_TARGET: {host_tar}:{port_tar}") replaced_string = "" if releaseType == 'DEPLOYED': logging.info(f"processing - {releaseType}") order = ['MASTER-CHILD', 'AUDIT-CHILD', 'MASTER', 'AUDIT', 'RELEASE'] logging.info("order of execution - 'MASTER-CHILD', 'AUDIT-CHILD', 'MASTER', 'AUDIT', 'RELEASE'") filtered_df = df.loc[ df['eprm_catalog'].isin(['PC', 'RELEASE']) & (df['eprm_enabled_flg'].isin(['Y', 'N'])) & df[ 'eprm_table_type'].isin( order)].copy() filtered_df['eprm_table_type'] = pd.Categorical(filtered_df['eprm_table_type'], categories=order, ordered=True) filtered_df = filtered_df.sort_values('eprm_table_type') df = df.sort_values('eprm_seq_nbr', ascending=False) for _, row in filtered_df.iterrows(): eprm_table_name = row['eprm_table_name'] eprm_join_cols_entity = row['eprm_join_cols_entity'] eprm_join_cols_reim = row['eprm_join_cols_reim'] eprm_table_alias = row['eprm_table_alias'] eprm_table_type = row['eprm_table_type'] eprm_parent_table_name = row['eprm_parent_table_name'] if eprm_table_type == 'AUDIT': eprm_table_col_pk = row['eprm_table_col_pk'] query = f"SELECT COUNT(*) FROM {schema}.{eprm_table_name} WHERE (" + eprm_table_col_pk + f") IN (SELECT entity_ref_nbr FROM {schema}.release_entity_inst_map WHERE release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "')" try: logging.info(f"processing {eprm_table_type}") cursor_tar.execute(query) result = cursor_tar.fetchone() query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n" query_info += f"-- #Query: Result:{result[0]}\n" query_count += 1 query_info += query + ";\n" write_sql(query_info) print(f"Count for {eprm_table_name}: {result[0]} (audit)") logging.info(f"Count for {eprm_table_name}: {result[0]} (audit)") except mysql.connector.Error as err: print(f"Error occurred while executing the query: {err}") logging.info(f"Error occurred while executing the query: {err}") if eprm_table_type == 'RELEASE': eprm_table_name = row['eprm_table_name'] eprm_seq_nbr = row['eprm_seq_nbr'] query = f"SELECT COUNT(*) FROM {schema}.{eprm_table_name} where release_id='" + releaseId + "' AND op_id='" + opId + "' AND bu_id='" + buId + "'" try: logging.info(f"processing {eprm_table_type}") cursor_tar.execute(query) result = cursor_tar.fetchone() query_info = f"-- ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ STATUS|{replicationTarget}| TABLE| {eprm_table_name}\n" query_info += f"-- #Query: Result:{result[0]}\n" query_count += 1 query_info += query + ";\n" write_sql(query_info) print(f"Count for {eprm_table_name}: {result[0]} (release){eprm_seq_nbr}") logging.info(f"Count for {eprm_table_name}: {result[0]} (release)") except mysql.connector.Error as err: print(f"Error occurred while executing the query: {err}") logging.info(f"Error occurred while executing the query: {err}")