import json
import re
from base64 import b64decode as base_b64decode
from base64 import b64encode as base_b64encode

import mysql.connector
import pandas as pd
# Run parameters. releaseType selects the section of the JSON config that is
# read; releaseId, opId and buId filter release_entity_inst_map. The remaining
# values are not used by the code in this script.
releaseId = '1.0'
releaseType = 'TESTING'
replicationTarget = ''
catalogId = ''
opId = 'HOB'
buId = 'DEFAULT'
replicationJobId = ''
json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_reply.json"

# Separators between join conditions: the SQL keyword AND (only when it is a
# whitespace-delimited word, so names such as BRAND_ID stay intact) or a comma.
_JOIN_SEPARATOR = re.compile(r'\s+AND\s+|,', re.IGNORECASE)


def split_join_conditions(join_expr):
    """Split ``"a = x.b AND c = x.d"`` into ``["a = x.b", "c = x.d"]``.

    Conditions may be separated by the keyword ``AND`` (any case) or by
    commas. Empty fragments are dropped.
    """
    return [part.strip() for part in _JOIN_SEPARATOR.split(join_expr) if part.strip()]


def reim_column_names(join_expr):
    """Return the bare column names on the right-hand side of each condition.

    For ``"id = reim.entity_id AND code = reim.code"`` this returns
    ``["entity_id", "code"]``: the text after the first ``=`` with any
    ``qualifier.`` prefix removed.

    Raises:
        ValueError: if ``join_expr`` is not a string, contains no conditions,
            or a condition has no ``=`` / no right-hand column.
    """
    if not isinstance(join_expr, str):
        raise ValueError(f"join expression must be a string, got {join_expr!r}")
    names = []
    for condition in split_join_conditions(join_expr):
        sides = condition.split('=')
        if len(sides) < 2:
            raise ValueError(f"malformed join condition: {condition!r}")
        column = sides[1].split('.')[-1].strip()
        if not column:
            raise ValueError(f"malformed join condition: {condition!r}")
        names.append(column)
    if not names:
        raise ValueError("join expression contains no conditions")
    return names


def build_count_query(table_name, join_cols_entity, join_cols_reim):
    """Build the per-table COUNT(*) statement.

    The statement counts rows of ``table_name`` whose ``join_cols_entity``
    tuple appears in release_entity_inst_map. The release, operator and
    business-unit values are left as ``%s`` placeholders so the driver binds
    them; identifiers cannot be bound and are interpolated as before.

    Raises:
        ValueError: propagated from reim_column_names for a malformed
            ``join_cols_reim``.
    """
    select_columns = ', '.join(reim_column_names(join_cols_reim))
    return (
        f"SELECT COUNT(*) FROM tibtcare_ppm_st2.{table_name} "
        f"WHERE ({join_cols_entity}) IN "
        f"(SELECT {select_columns} FROM tibtcare_ppm_st2.release_entity_inst_map "
        "WHERE release_id=%s AND op_id=%s AND bu_id=%s)"
    )


def read_release_config(config_path, release_type):
    """Load the JSON config and return the section for ``release_type``.

    Returns an empty dict when the section is absent.
    """
    with open(config_path) as json_file:
        json_data = json.load(json_file)
    return json_data.get(release_type, {})


def resolve_password(section):
    """Return DB_PASSWORD, base64-decoding it when ENCRYPT is ``'Y'``.

    NOTE(review): base64 is an encoding, not encryption; the config key is
    merely named ENCRYPT.
    """
    password = section.get('DB_PASSWORD')
    if section.get('ENCRYPT') == 'Y':
        password = base_b64decode(password).decode('utf-8')
    return password


def main():
    """Print a row count for every enabled 'PC' replication table.

    Reads connection settings from ``json_file_path``, loads
    etl_ppm_replication_master, keeps rows with eprm_catalog == 'PC' and
    eprm_enabled_flg == 'Y', and runs one COUNT(*) query per row. A row whose
    join expression is malformed, or whose query fails, is reported and
    skipped so the remaining tables are still counted. The cursor and
    connection are always closed, including on failure.
    """
    cnx = None
    cursor = None
    try:
        section = read_release_config(json_file_path, releaseType)
        host = section.get('DB_HOST')
        print(host)
        port = section.get('DB_PORT')
        user = section.get('DB_USER')
        password = resolve_password(section)

        cnx = mysql.connector.connect(user=user, password=password, host=host, port=port)
        cursor = cnx.cursor()

        cursor.execute("SELECT * FROM tibtcare_ppm_st2.etl_ppm_replication_master")
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df = pd.DataFrame(rows, columns=columns)
        filtered_df = df[df['eprm_catalog'].isin(['PC']) & (df['eprm_enabled_flg'] == 'Y')]

        for _, row in filtered_df.iterrows():
            eprm_table_name = row['eprm_table_name']
            # eprm_table_alias is intentionally not read: the alias-qualified
            # column list the script used to build never reached the SQL that
            # was executed.
            try:
                secondary_query = build_count_query(
                    eprm_table_name,
                    row['eprm_join_cols_entity'],
                    row['eprm_join_cols_reim'],
                )
            except ValueError as err:
                print(f"Skipping {eprm_table_name}: {err}")
                continue
            try:
                cursor.execute(secondary_query, (releaseId, opId, buId))
                result = cursor.fetchone()
                print(f"Count for {eprm_table_name}: {result[0]}")
            except mysql.connector.Error as err:
                print(f"Error occurred while executing the query: {err}")
    except FileNotFoundError:
        print(f"File {json_file_path} not found.")
    except json.JSONDecodeError:
        print("Error occurred while parsing JSON file.")
    except mysql.connector.Error as err:
        print(f"Error occurred while connecting to the MySQL database: {err}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        # Close each resource independently so a failure closing the cursor
        # does not leave the connection open.
        for resource in (cursor, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print(f"An unexpected error occurred: {e}")


# Executed on load, exactly as the original flat script was.
main()