Untitled
unknown
plain_text
2 years ago
11 kB
9
Indexable
from io import TextIOBase as io_TextIOBase
from json import load as json_load
from os import environ as os_environ
from os import path as os_path
from sys import exc_info as sys_exc_info
from sys import argv as argv_param
def setDbConnection(pdb_credentials):
    """Open a raw DB-API connection and cursor for one credentials entry.

    Args:
        pdb_credentials: Mapping with DB_TYPE, DB_HOST, DB_PORT, DB_USER and
            DB_PASSWORD, plus DB_SID (ORACLE) or DB_SCHEMA (MARIA / MYSQL).
            When ENCRYPT == 'Y', DB_PASSWORD is base64-encoded UTF-8 text.
            The mapping is read only; it is never modified.

    Returns:
        Tuple ``(connection, cursor)`` where ``connection`` comes from the
        SQLAlchemy engine's ``raw_connection()`` and ``cursor`` from it.

    Raises:
        Exception: 'REPLICATION STATUS INVALID DB_TYPE' when DB_TYPE is not
            ORACLE, MARIA or MYSQL.
    """
    from base64 import b64decode as base_b64decode
    from urllib.parse import quote_plus as url_quote_plus

    glbset_charset = 'utf-8'
    dbType = pdb_credentials['DB_TYPE']

    print("")
    print("DbType : %s" % dbType)
    print("Host : %s" % pdb_credentials['DB_HOST'])
    print("Port : %s" % pdb_credentials['DB_PORT'])
    print("User : %s" % pdb_credentials['DB_USER'])

    # Reject unsupported types before touching the driver stack.
    if dbType not in ('ORACLE', 'MARIA', 'MYSQL'):
        raise Exception('REPLICATION STATUS INVALID DB_TYPE')

    # Decode into a local so the caller's dict keeps the stored value; decoding
    # in place would double-decode if the same dict were ever passed again.
    password = pdb_credentials['DB_PASSWORD']
    if pdb_credentials.get('ENCRYPT') == 'Y':
        password = base_b64decode(password).decode('utf-8')

    from sqlalchemy import create_engine as sqlalchemy_create_engine
    from sqlalchemy.engine.url import URL

    # NOTE(review): the `encoding` argument of create_engine() is kept from the
    # original code; newer SQLAlchemy releases deprecate/remove it - confirm
    # against the installed version before upgrading.
    if dbType == 'ORACLE':
        # User and password are URL-escaped so characters such as '@', '/' or
        # ':' cannot break the connection string.
        connectionText = 'oracle://%s:%s@%s:%s/%s' % (
            url_quote_plus(str(pdb_credentials['DB_USER'])),
            url_quote_plus(str(password)),
            pdb_credentials['DB_HOST'],
            pdb_credentials['DB_PORT'],
            pdb_credentials['DB_SID'],
        )
        credentials = sqlalchemy_create_engine(connectionText, encoding=glbset_charset)
    else:
        db_url = {
            'database': pdb_credentials['DB_SCHEMA'],
            'drivername': 'mysql+pymysql',
            'username': pdb_credentials['DB_USER'],
            'password': password,
            'host': pdb_credentials['DB_HOST'],
            'port': pdb_credentials['DB_PORT'],
            'query': {'charset': 'utf8'},
        }
        # Use URL.create() where it exists; fall back to the legacy constructor
        # on SQLAlchemy versions that predate it.
        buildUrl = getattr(URL, 'create', URL)
        credentials = sqlalchemy_create_engine(buildUrl(**db_url), encoding="utf8")

    connection = credentials.raw_connection()
    cursorOrSession = connection.cursor()
    return connection, cursorOrSession
def getPpmMasterData(connection, argv_param, db_schema, pn_releaseId):
    """Load the earlier releases of one catalog from ppm_release_master.

    Args:
        connection: DB connection handed to ``pandas.read_sql``.
        argv_param: Script argument list; indexes 5, 6 and 7 are read as
            catalog id, op id and bu id.
        db_schema: Schema that qualifies the ppm_release_master table.
        pn_releaseId: Release id currently being replicated; it must be
            convertible with ``float()``.

    Returns:
        DataFrame with columns ``release_id`` (float) and ``release_status``,
        limited to status Deployed / InProgress / InTesting / Cancelled,
        excluding ``pn_releaseId`` itself and any larger id, sorted by
        ``release_id`` ascending.

    Raises:
        ValueError: if ``pn_releaseId`` is not numeric.
    """
    from pandas import read_sql as pd_read_sql

    catalogId = argv_param[5]
    opId = argv_param[6]
    buId = argv_param[7]
    currentReleaseId = float(pn_releaseId)

    # NOTE(review): the statement is assembled by string interpolation from
    # command-line values. Bind parameters would be safer, but the placeholder
    # syntax depends on the driver behind `connection`, which this function
    # cannot see - confirm the inputs are trusted.
    query = ("SELECT * FROM %s.ppm_release_master WHERE op_id='%s' AND bu_id='%s' AND catalog_id='%s'" % (db_schema, opId, buId, catalogId))
    df_data = pd_read_sql(query, con=connection)
    # Normalise column-name case, which differs between database back ends.
    df_data.columns = df_data.columns.str.lower()
    print("PPM_Release_Master: %s {Total}" % len(df_data))

    validStatuses = ['Deployed', 'InProgress', 'InTesting', 'Cancelled']
    keepRows = (
        (df_data['op_id'] == opId)
        & (df_data['bu_id'] == buId)
        & (df_data['catalog_id'] == catalogId)
        & df_data['release_status'].isin(validStatuses)
    )
    # Explicit copy so the column assignment below works on an independent frame.
    df_data = df_data.loc[keepRows, ['release_id', 'release_status']].copy()
    print("No. of releases : %s {InValid Releases removed]" % len(df_data))

    # Convert before comparing: pn_releaseId arrives as a string, so comparing
    # it against a numeric column would never match and the current release
    # would slip through the "<=" filter below.
    df_data['release_id'] = df_data['release_id'].astype(float)
    df_data = df_data.loc[df_data['release_id'] != currentReleaseId]
    print("No. of releases : %s {current releaseId removed}" % len(df_data))

    df_data = df_data.loc[df_data['release_id'] <= currentReleaseId]
    print("No. of releases : %s {Temp Compare}" % len(df_data))

    df_data = df_data.sort_values(by=['release_id'], ascending=[True])
    return df_data
def main(argv_param):
    """Compare earlier releases between SOURCE and the target environment.

    Reads DB credentials from ``$PPM_PC_CONFIG/ppm_pc_replication.json``,
    loads the earlier releases from both databases, writes three pipe-separated
    CSV snapshots to ``$PPM_PC_LOG`` and records the outcome in the source
    schema's ``ppm_replication_status`` and ``ppm_release_master`` tables.

    Args:
        argv_param: Script argument list. Index 1 is the release id, 3 the
            target environment name, 4 the replication job id, and 5 / 6 / 7
            the catalog id, op id and bu id.

    Returns:
        True when every earlier source release also exists in the target;
        False when releases are missing or any step raised (the error is
        printed, not propagated).
    """
    srcConnection = None
    tgtConnection = None
    src_cursorOrSession = None
    tgt_cursorOrSession = None
    try:
        releaseId = argv_param[1]
        repEnv = argv_param[3]
        repJobId = argv_param[4]
        catalogId = argv_param[5]
        opId = argv_param[6]
        buId = argv_param[7]

        print("__main__.main(argv_param)")
        print("repEnv : %s" % repEnv)
        print("repJobId : %s" % repJobId)
        print("catalogId : %s" % catalogId)
        print("opId : %s" % opId)
        print("buId : %s" % buId)

        dirPC_conf = os_environ['PPM_PC_CONFIG']
        dirPC_log = os_environ['PPM_PC_LOG']
        fileJson = dirPC_conf + '/ppm_pc_replication.json'
        if not os_path.exists(fileJson):
            raise Exception("CREDENTIAL FILE NOT FOUND")
        # Context manager so the credentials file handle is always released.
        with open(fileJson) as credentialFile:
            db_credentials = json_load(credentialFile)

        # Environment names are matched case-insensitively on both sides.
        credentialsByEnv = {k.upper(): v for k, v in db_credentials.items()}
        repEnvKey = repEnv.upper()
        if "SOURCE" not in credentialsByEnv:
            raise Exception("SOURCE CREDENTIALS MISSING")
        if repEnvKey not in credentialsByEnv:
            raise Exception(repEnv + " CREDENTIALS MISSING")
        dbSrc_credentials = credentialsByEnv["SOURCE"]
        dbTgt_credentials = credentialsByEnv[repEnvKey]

        srcConnection, src_cursorOrSession = setDbConnection(dbSrc_credentials)
        tgtConnection, tgt_cursorOrSession = setDbConnection(dbTgt_credentials)

        csvPrefix = dirPC_log + "/" + repJobId + "_ppmReleaseMaster_"

        print("\nSOURCE")
        df_sourcePpmMaster = getPpmMasterData(srcConnection, argv_param, dbSrc_credentials["DB_SCHEMA"], releaseId)
        df_sourcePpmMaster = df_sourcePpmMaster.rename(columns={"release_id": "release_id_source", "release_status": "release_status_source"})
        df_sourcePpmMaster.to_csv(csvPrefix + "Source.csv", index=False, header=True, sep="|")
        print(df_sourcePpmMaster.dtypes)

        print("\nTARGET")
        df_targetPpmMaster = getPpmMasterData(tgtConnection, argv_param, dbTgt_credentials["DB_SCHEMA"], releaseId)
        df_targetPpmMaster = df_targetPpmMaster.rename(columns={"release_id": "release_id_target", "release_status": "release_status_target"})
        df_targetPpmMaster.to_csv(csvPrefix + "Target.csv", index=False, header=True, sep="|")
        print(df_targetPpmMaster.dtypes)

        print("\nCOMPARE")
        # Left join from source: rows with no target match get a null
        # release_id_target, i.e. the release is missing in the target.
        df_diffSrc = df_sourcePpmMaster.merge(
            df_targetPpmMaster,
            how="left",
            left_on="release_id_source",
            right_on="release_id_target",
        )
        df_diffSrc.to_csv(csvPrefix + "DiffSrc.csv", index=False, header=True, sep="|")
        print("df_diffSrc : %s" % len(df_diffSrc))
        missingReleases = df_diffSrc["release_id_source"].loc[df_diffSrc["release_id_target"].isnull()].to_list()
        print("missingReleases : %s" % len(missingReleases))
        print(missingReleases)

        # NOTE(review): the original chain had further branches comparing the
        # source/target row counts, but they sat behind "== 0" / "> 0" tests
        # and could never run. Effective behaviour is kept; releases that
        # exist only in the target are therefore not detected here.
        if not missingReleases:
            statusMessage = "Releases match between source and target. Proceeding (re)replication."
            statusCode = "InProgress"
        else:
            statusMessage = "Missing releases between source and target."
            statusCode = "Error"

        # Values are passed as bind parameters instead of being concatenated
        # into the SQL text; placeholder syntax and the timestamp expression
        # depend on the source database type.
        if dbSrc_credentials['DB_TYPE'] == 'ORACLE':
            marks = [':%d' % position for position in range(1, 6)]
            nowExpr = 'SYSDATE'
        else:
            marks = ['%s'] * 5
            nowExpr = 'NOW()'
        srcSchema = dbSrc_credentials['DB_SCHEMA']

        query = (
            "UPDATE " + srcSchema + ".ppm_replication_status SET status=" + marks[0]
            + ", status_message=" + marks[1]
            + ", error_description=NULL"
            + ", updated_by=" + marks[2]
            + ", updated_date=" + nowExpr
            + " WHERE replication_job_id=" + marks[3]
            + " AND release_id=" + marks[4]
        )
        queryParams = [statusCode, statusMessage, repJobId, repJobId, str(releaseId)]

        query_ppm = (
            "UPDATE " + srcSchema + ".ppm_release_master SET replication_status=" + marks[0]
            + ", updated_by=" + marks[1]
            + ", updated_on=" + nowExpr
            + " WHERE release_id=" + marks[2]
        )
        queryPpmParams = [statusCode, repJobId, str(releaseId)]

        print("Query: %s" % query)
        print("Query params: %s" % (queryParams,))
        print("Query PPM: %s" % query_ppm)
        print("Query PPM params: %s" % (queryPpmParams,))
        src_cursorOrSession.execute(query, queryParams)
        src_cursorOrSession.execute(query_ppm, queryPpmParams)
        srcConnection.commit()

        # The status rows are committed first so a failed comparison is still
        # recorded before the failure is reported to the caller.
        if statusCode != "InProgress":
            raise Exception(statusMessage)
    except Exception as err:
        print("Error - {} . Line No - {} ".format(str(err), str(sys_exc_info()[-1].tb_lineno)))
        return False
    finally:
        # Best-effort cleanup; a close failure must not mask the real outcome.
        for dbResource in (src_cursorOrSession, tgt_cursorOrSession, srcConnection, tgtConnection):
            if dbResource is None:
                continue
            try:
                dbResource.close()
            except Exception as closeErr:
                print("Warning - could not close DB resource: %s" % closeErr)
    return True
if __name__ == '__main__':
    # Script entry point: validates the argument count, runs main() and writes
    # SUCCESS or FAILED to "$PPM_PC_LOG/<repJobId>_<script>.status".
    statFile = None         # stays None until the status file has been opened
    statusWritten = False   # True once SUCCESS/FAILED reached the status file
    try:
        print("__main__")
        print("argv_param : %s" % argv_param)
        print("argv_param : %s" % len(argv_param))
        if len(argv_param) != 8:
            raise Exception("ARGUMENTS NOT RECEIVED")
        repJobId = argv_param[4]
        print("repJobId : %s" % repJobId)
        statFileTxt = os_environ['PPM_PC_LOG'] + '/' + repJobId + "_" + os_path.basename(__file__).replace('.py', '.status')
        print("statFileTxt: %s" % statFileTxt)
        statFile = open(statFileTxt, "w")
        if main(argv_param):
            print("Source and Target are identical. Proceeding with (re)replication.")
            statFile.write("SUCCESS")
        else:
            statFile.write("FAILED")
        statusWritten = True
    except Exception as err:
        print("Error - {} . Line No - {} ".format(str(err), str(sys_exc_info()[-1].tb_lineno)))
        # The failure may have happened before the status file was opened
        # (bad argument count, missing PPM_PC_LOG); only write when it exists.
        if statFile is not None:
            statFile.write("FAILED")
            statusWritten = True
    finally:
        if statFile is not None:
            statFile.close()
    # Without a status file the only failure signal left is the exit code.
    if not statusWritten:
        raise SystemExit(1)
What does argv_param do? Editor is loading...