Untitled
plain_text
a month ago
11 kB
1
Indexable
Never
from io import TextIOBase as io_TextIOBase from json import load as json_load from os import environ as os_environ from os import path as os_path from sys import exc_info as sys_exc_info from sys import argv as argv_param def setDbConnection(pdb_credentials): from base64 import b64decode as base_b64decode from sqlalchemy import create_engine as sqlalchemy_create_engine from sqlalchemy.engine.url import URL glbset_charset = 'utf-8' if 'ENCRYPT' in pdb_credentials and pdb_credentials['ENCRYPT'] == 'Y': pdb_credentials['DB_PASSWORD'] = base_b64decode(pdb_credentials['DB_PASSWORD']).decode('utf-8') print("") print("DbType : %s" % pdb_credentials['DB_TYPE']) print("Host : %s" % pdb_credentials['DB_HOST']) print("Port : %s" % pdb_credentials['DB_PORT']) print("User : %s" % pdb_credentials['DB_USER']) if pdb_credentials['DB_TYPE'] == 'ORACLE': connectionText = ('oracle://%s:%s@%s:%s/%s' % (pdb_credentials['DB_USER'] ,pdb_credentials['DB_PASSWORD'] ,pdb_credentials['DB_HOST'] ,pdb_credentials['DB_PORT'] ,pdb_credentials['DB_SID'] )) credentials = sqlalchemy_create_engine(connectionText ,encoding=glbset_charset ) connection = credentials.raw_connection() cursorOrSession = connection.cursor() elif pdb_credentials['DB_TYPE'] in ('MARIA','MYSQL'): db_url = {'database': pdb_credentials['DB_SCHEMA'] ,'drivername': 'mysql+pymysql' ,'username': pdb_credentials['DB_USER'] ,'password': pdb_credentials['DB_PASSWORD'] ,'host': pdb_credentials['DB_HOST'] ,'port':pdb_credentials['DB_PORT'] ,'query': {'charset': 'utf8'} } credentials = sqlalchemy_create_engine(URL(**db_url), encoding="utf8") #pymysql.ver.1.0.2 and above pdb_credentials = sqlalchemy_create_engine(URL.create(**db_url), encoding="utf8") connection = credentials.raw_connection() cursorOrSession = connection.cursor() else: raise Exception('REPLICATION STATUS INVALID DB_TYPE') return connection, cursorOrSession def getPpmMasterData(connection,argv_param,db_schema, pn_releaseId): from pandas import read_sql as pd_read_sql catalogId = argv_param[5] opId = argv_param[6] buId = argv_param[7] query=("SELECT * FROM %s.ppm_release_master WHERE op_id='%s' AND bu_id='%s' AND catalog_id='%s'" % (db_schema,opId,buId,catalogId)) df_data=pd_read_sql(query,con=connection) df_data.columns =df_data.columns.str.lower() print("PPM_Release_Master: %s {Total}" % len(df_data)) df_data=df_data.copy().loc[((df_data['op_id']==opId) & (df_data['bu_id']==buId) & (df_data['catalog_id']==catalogId) & ((df_data['release_status']=='Deployed') | (df_data['release_status']=='InProgress') | (df_data['release_status']=='InTesting') | (df_data['release_status']=='Cancelled') ) )] df_data=df_data[['release_id','release_status']] print("No. of releases : %s {InValid Releases removed]" % len(df_data)) df_data = df_data.loc[(df_data['release_id']!=pn_releaseId)] print("No. of releases : %s {current releaseId removed}" % len(df_data)) df_data["release_id"] = df_data["release_id"].astype(float) df_data = df_data.loc[(df_data['release_id']<=float(pn_releaseId))] print("No. of releases : %s {Temp Compare}" % len(df_data)) df_data = df_data.sort_values(by=['release_id'], ascending=[True]) return (df_data) def main(argv_param): try: releaseId = argv_param[1] releaseType = argv_param[2] repEnv = argv_param[3] repJobId = argv_param[4] catalogId = argv_param[5] opId = argv_param[6] buId = argv_param[7] statusMessage = "Releases match between source and target. Proceeding (re)replication." query_ppm = "InProgress" print("__main__.main(argv_param)") print("repEnv : %s" % repEnv) print("repJobId : %s" % repJobId) print("catalogId : %s" % catalogId) print("opId : %s" % opId) print("buId : %s" % buId) dirPC_conf = os_environ['PPM_PC_CONFIG'] dirPC_log = os_environ['PPM_PC_LOG'] fileJson = dirPC_conf + '/ppm_pc_replication.json' if not os_path.exists(fileJson): raise Exception("CREDENTIAL FILE NOT FOUND") db_credentials = json_load(open(fileJson)) dbSrc_credentials = {k.upper(): v for k, v in db_credentials.items() if k.upper()=="SOURCE"} if len(dbSrc_credentials)==0: raise Exception("SOURCE CREDENTIALS MISSING") dbSrc_credentials = dbSrc_credentials['SOURCE'] dbTgt_credentials = {k.upper(): v for k, v in db_credentials.items() if k.upper()==repEnv} if len(dbTgt_credentials)==0: raise Exception(repEnv + " CREDENTIALS MISSING") dbTgt_credentials = dbTgt_credentials[repEnv] srcConnection, src_cursorOrSession = setDbConnection(dbSrc_credentials) tgtConnection, tgt_cursorOrSession = setDbConnection(dbTgt_credentials) print("\nSOURCE") df_sourcePpmMaster = getPpmMasterData(srcConnection,argv_param,dbSrc_credentials["DB_SCHEMA"],releaseId) df_sourcePpmMaster = df_sourcePpmMaster.rename(columns={"release_id": "release_id_source", "release_status": "release_status_source"}) df_sourcePpmMaster.to_csv(dirPC_log+"/"+repJobId+"_ppmReleaseMaster_Source.csv",index=False,header=True,sep="|") print(df_sourcePpmMaster.dtypes) print("\nTARGET") df_targetPpmMaster = getPpmMasterData(tgtConnection,argv_param,dbTgt_credentials["DB_SCHEMA"],releaseId) df_targetPpmMaster = df_targetPpmMaster.rename(columns={"release_id": "release_id_target", "release_status": "release_status_target"}) df_targetPpmMaster.to_csv(dirPC_log+"/"+repJobId+"_ppmReleaseMaster_Target.csv",index=False,header=True,sep="|") print(df_targetPpmMaster.dtypes) print("\nCOMPARE") df_diffSrc = df_sourcePpmMaster.merge(df_targetPpmMaster ,how="left" ,left_on="release_id_source" ,right_on="release_id_target" ) df_diffSrc.to_csv(dirPC_log+"/"+repJobId+"_ppmReleaseMaster_DiffSrc.csv",index=False,header=True,sep="|") print("df_diffSrc : %s" % len(df_diffSrc)) missingReleases = df_diffSrc["release_id_source"].loc[df_diffSrc["release_id_target"].isnull()].to_list() print("missingReleases : %s" % len(missingReleases)) print(missingReleases) if len(missingReleases)==0: statusMessage = "Releases match between source and target. Proceeding (re)replication." statusCode = "InProgress" elif len(missingReleases)>0: statusMessage="Missing releases between source and target." statusCode="Error" elif len(df_sourcePpmMaster)>len(df_targetPpmMaster): statusMessage="Source and Target does not match with previous releases." statusCode="Error" elif len(df_targetPpmMaster)>len(df_sourcePpmMaster): statusMessage="Target/Staging/Lower environments cannot have own releases." statusCode="Error" else: statusMessage="Unpredicted release compare." statusCode="Error" query = "UPDATE "+dbSrc_credentials['DB_SCHEMA']+".ppm_replication_status SET status='"+statusCode+"'" \ + ", status_message='"+statusMessage+"'" \ + ", error_description=NULL"\ + ", updated_by='"+repJobId+"' /**/" \ + " WHERE replication_job_id='"+repJobId+"' AND release_id='"+str(releaseId)+"'" query_ppm = "UPDATE "+dbSrc_credentials['DB_SCHEMA']+".ppm_release_master SET replication_status='"+statusCode+"'"\ + ", updated_by='"+repJobId+"' /**/" \ + " WHERE release_id='"+str(releaseId)+"'" if dbSrc_credentials['DB_TYPE'] == 'ORACLE': query = query.replace('/**/',', updated_date=SYSDATE') query_ppm = query_ppm.replace('/**/',', updated_on=SYSDATE') elif dbSrc_credentials['DB_TYPE'] in ('MARIA','MYSQL'): query = query.replace('/**/',', updated_date=NOW()') query_ppm = query_ppm.replace('/**/',', updated_on=NOW()') print("Query: %s" % query) print("Query PPM: %s" % query_ppm) src_cursorOrSession.execute(query) src_cursorOrSession.execute(query_ppm) srcConnection.commit() if statusCode != "InProgress": raise Exception(statusMessage) except Exception as err: print ("Error - {} . Line No - {} ".format(str(err),str(sys_exc_info()[-1].tb_lineno))) return False return True if __name__ == '__main__': statFileTxt = "" statFile = "" errMessage = "" try: print("__main__") print("argv_param : %s" % argv_param) print("argv_param : %s" % len(argv_param)) if (len(argv_param)!=8): raise Exception ("ARGUMENTS NOT RECEIVED") repJobId = argv_param[4] print("repJobId : %s" % repJobId) statFileTxt = os_environ['PPM_PC_LOG'] + '/' + repJobId + "_" + os_path.basename(__file__).replace('.py', '.status') print("statFileTxt: %s" % statFileTxt) statFile=open(statFileTxt, "w") if main(argv_param): print("Source and Target are identical. Proceeding with (re)replication.") statFile.write("SUCCESS") else: statFile.write("FAILED") except Exception as err: print ("Error - {} . Line No - {} ".format(str(err),str(sys_exc_info()[-1].tb_lineno))) statFile.write("FAILED") if isinstance(statFile,io_TextIOBase): statFile.close() what does argv_param do?