# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 2 years ago
# 3.9 kB
# 2
# Indexable
import json
import re
from base64 import b64decode as base_b64decode
from base64 import b64encode as base_b64encode

import mysql.connector
import pandas as pd

# Run parameters.
# releaseId / opId / buId filter release_entity_inst_map in the count queries.
releaseId = '1.0'
# releaseType selects which top-level section of the JSON config is read.
releaseType = 'TESTING'
# replicationTarget, catalogId and replicationJobId are not referenced below.
replicationTarget = ''
catalogId = ''
opId = 'HOB'
buId = 'DEFAULT'
replicationJobId = ''
json_file_path = "/app/scripts/PPM_Release_Management/Product_Catalog_ETL/config/ppm_reply.json"

# Splits a join-condition list on commas or on AND as a whole word, in any
# letter case, so identifiers that merely contain "AND" (e.g. BRAND_ID) are
# left intact.
_JOIN_SEPARATOR = re.compile(r"\bAND\b|,", re.IGNORECASE)


def reim_select_columns(join_cols_reim):
    """Return the comma-separated column list selected from the map table.

    ``join_cols_reim`` is a list of ``lhs = rhs`` conditions separated by
    ``AND`` or commas. For each condition the right-hand side is taken and
    any qualifier before its last ``.`` is dropped.

    Raises:
        ValueError: if the value is not a non-blank string, or if any
            condition has no ``=``.
    """
    if not isinstance(join_cols_reim, str) or not join_cols_reim.strip():
        raise ValueError("join condition is empty")

    selected = []
    for condition in _JOIN_SEPARATOR.split(join_cols_reim):
        if not condition.strip():
            # Blank fragment, e.g. from a trailing separator.
            continue
        sides = condition.split('=')
        if len(sides) < 2:
            raise ValueError(f"join condition without '=': {condition.strip()!r}")
        selected.append(sides[1].split('.')[-1].strip())

    if not selected:
        raise ValueError("join condition is empty")
    return ', '.join(selected)


def build_count_query(table_name, join_cols_entity, join_cols_reim,
                      release_id, op_id, bu_id):
    """Build the COUNT(*) query for one replication master row.

    The query counts rows of ``table_name`` whose ``join_cols_entity`` tuple
    appears in release_entity_inst_map for the given release, operator and
    business unit.

    Raises:
        ValueError: propagated from ``reim_select_columns``.
    """
    select_columns = reim_select_columns(join_cols_reim)
    # Table and column identifiers cannot be bound as query parameters, so
    # the statement is assembled as text.
    return (
        f"SELECT COUNT(*) FROM tibtcare_ppm_st2.{table_name} "
        f"WHERE ({join_cols_entity}) IN "
        f"(SELECT {select_columns} FROM tibtcare_ppm_st2.release_entity_inst_map "
        f"WHERE release_id='{release_id}' AND op_id='{op_id}' AND bu_id='{bu_id}')"
    )


# Initialised up front so the finally clause can tell what was actually opened.
cnx = None
cursor = None

try:
    with open(json_file_path) as json_file:
        json_data = json.load(json_file)

    # Connection settings live under the section named by releaseType.
    db_config = json_data.get(releaseType, {})
    encrypt = db_config.get('ENCRYPT')
    host = db_config.get('DB_HOST')
    print(host)
    port = db_config.get('DB_PORT')
    user = db_config.get('DB_USER')

    password = db_config.get('DB_PASSWORD')
    if encrypt == 'Y':
        # The stored password is base64-encoded when ENCRYPT is 'Y'.
        password = base_b64decode(password).decode('utf-8')

    cnx = mysql.connector.connect(user=user, password=password, host=host, port=port)
    cursor = cnx.cursor()

    primary_query = "SELECT * FROM tibtcare_ppm_st2.etl_ppm_replication_master"
    cursor.execute(primary_query)

    rows = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]
    df = pd.DataFrame(rows, columns=columns)

    # Only enabled rows of the 'PC' catalog are processed.
    filtered_df = df[df['eprm_catalog'].isin(['PC']) & (df['eprm_enabled_flg'] == 'Y')]

    for _, row in filtered_df.iterrows():
        eprm_table_name = row['eprm_table_name']

        try:
            secondary_query = build_count_query(
                eprm_table_name,
                row['eprm_join_cols_entity'],
                row['eprm_join_cols_reim'],
                releaseId,
                opId,
                buId,
            )
        except ValueError as err:
            # A malformed row must not abort the remaining tables.
            print(f"Skipping {eprm_table_name}: {err}")
            continue

        try:
            cursor.execute(secondary_query)
            result = cursor.fetchone()
            print(f"Count for {eprm_table_name}: {result[0]}")
        except mysql.connector.Error as err:
            print(f"Error occurred while executing the query: {err}")

except FileNotFoundError:
    print(f"File {json_file_path} not found.")
except json.JSONDecodeError:
    print("Error occurred while parsing JSON file.")
except mysql.connector.Error as err:
    print(f"Error occurred while connecting to the MySQL database: {err}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    # Release database resources on every exit path, not only on success.
    if cursor is not None:
        cursor.close()
    if cnx is not None:
        cnx.close()