Untitled

mail@pastecode.io avatarunknown
plain_text
a month ago
11 kB
1
Indexable
Never
from io import TextIOBase as io_TextIOBase
from json import load as json_load
from os import environ as os_environ
from os import path as os_path
from sys import exc_info as sys_exc_info
from sys import argv as argv_param


def setDbConnection(pdb_credentials):
    from base64 import b64decode as base_b64decode
    from sqlalchemy import create_engine as sqlalchemy_create_engine
    from sqlalchemy.engine.url import URL
    glbset_charset = 'utf-8'
    
    if 'ENCRYPT' in pdb_credentials and pdb_credentials['ENCRYPT'] == 'Y':
        pdb_credentials['DB_PASSWORD'] = base_b64decode(pdb_credentials['DB_PASSWORD']).decode('utf-8')
    print("")
    print("DbType     : %s" % pdb_credentials['DB_TYPE'])
    print("Host       : %s" % pdb_credentials['DB_HOST'])
    print("Port       : %s" % pdb_credentials['DB_PORT'])
    print("User       : %s" % pdb_credentials['DB_USER'])
    
    if pdb_credentials['DB_TYPE'] == 'ORACLE':
        connectionText = ('oracle://%s:%s@%s:%s/%s' % (pdb_credentials['DB_USER']
                                                      ,pdb_credentials['DB_PASSWORD']
                                                      ,pdb_credentials['DB_HOST']
                                                      ,pdb_credentials['DB_PORT']
                                                      ,pdb_credentials['DB_SID']
                                                      ))
        credentials = sqlalchemy_create_engine(connectionText
                                             ,encoding=glbset_charset
                                             )
        connection = credentials.raw_connection()
        cursorOrSession = connection.cursor()
        
    elif pdb_credentials['DB_TYPE'] in ('MARIA','MYSQL'):
        db_url = {'database': pdb_credentials['DB_SCHEMA']
                 ,'drivername': 'mysql+pymysql'
                 ,'username': pdb_credentials['DB_USER']
                 ,'password': pdb_credentials['DB_PASSWORD']
                 ,'host': pdb_credentials['DB_HOST']
                 ,'port':pdb_credentials['DB_PORT']
                 ,'query': {'charset': 'utf8'}
                 }
        credentials = sqlalchemy_create_engine(URL(**db_url), encoding="utf8")
        #pymysql.ver.1.0.2 and above pdb_credentials = sqlalchemy_create_engine(URL.create(**db_url), encoding="utf8")
        connection = credentials.raw_connection()
        cursorOrSession = connection.cursor()
    else:
        raise Exception('REPLICATION STATUS INVALID DB_TYPE')
    
    return connection, cursorOrSession


def getPpmMasterData(connection,argv_param,db_schema, pn_releaseId):
    from pandas import read_sql as pd_read_sql
    catalogId = argv_param[5]
    opId = argv_param[6]
    buId = argv_param[7]
    
    query=("SELECT * FROM %s.ppm_release_master WHERE op_id='%s' AND bu_id='%s' AND catalog_id='%s'" % (db_schema,opId,buId,catalogId))
    df_data=pd_read_sql(query,con=connection)
    df_data.columns =df_data.columns.str.lower()
    print("PPM_Release_Master: %s {Total}" % len(df_data))
    df_data=df_data.copy().loc[((df_data['op_id']==opId)
                                 & (df_data['bu_id']==buId)
                                 & (df_data['catalog_id']==catalogId)
                                 & ((df_data['release_status']=='Deployed') 
                                     | (df_data['release_status']=='InProgress')
                                     | (df_data['release_status']=='InTesting')
                                     | (df_data['release_status']=='Cancelled')
                                    )
                               )]
    df_data=df_data[['release_id','release_status']]
    print("No. of releases   : %s {InValid Releases removed]" % len(df_data))
    df_data = df_data.loc[(df_data['release_id']!=pn_releaseId)]
    print("No. of releases   : %s {current releaseId removed}" % len(df_data))
    df_data["release_id"] = df_data["release_id"].astype(float)
    df_data = df_data.loc[(df_data['release_id']<=float(pn_releaseId))]
    print("No. of releases   : %s {Temp Compare}" % len(df_data))
    df_data = df_data.sort_values(by=['release_id'], ascending=[True])
    return (df_data)


def main(argv_param):
    try:
        releaseId = argv_param[1]
        releaseType = argv_param[2]
        repEnv = argv_param[3]
        repJobId = argv_param[4]
        catalogId = argv_param[5]
        opId = argv_param[6]
        buId = argv_param[7]
        
        statusMessage = "Releases match between source and target. Proceeding (re)replication."
        query_ppm = "InProgress"
        
        print("__main__.main(argv_param)")
        print("repEnv     : %s" % repEnv)
        print("repJobId   : %s" % repJobId)
        print("catalogId  : %s" % catalogId)
        print("opId       : %s" % opId)
        print("buId       : %s" % buId)
        
        dirPC_conf = os_environ['PPM_PC_CONFIG']
        dirPC_log = os_environ['PPM_PC_LOG']
        fileJson = dirPC_conf + '/ppm_pc_replication.json'
        if not os_path.exists(fileJson):
            raise Exception("CREDENTIAL FILE NOT FOUND")
        
        db_credentials = json_load(open(fileJson))
        dbSrc_credentials = {k.upper(): v for k, v in db_credentials.items() if k.upper()=="SOURCE"}
        if len(dbSrc_credentials)==0:
            raise Exception("SOURCE CREDENTIALS MISSING")
        dbSrc_credentials = dbSrc_credentials['SOURCE']
        
        dbTgt_credentials = {k.upper(): v for k, v in db_credentials.items() if k.upper()==repEnv}
        if len(dbTgt_credentials)==0:
            raise Exception(repEnv + " CREDENTIALS MISSING")
        dbTgt_credentials = dbTgt_credentials[repEnv]
        
        srcConnection, src_cursorOrSession = setDbConnection(dbSrc_credentials)
        tgtConnection, tgt_cursorOrSession = setDbConnection(dbTgt_credentials)
        
        print("\nSOURCE")
        df_sourcePpmMaster = getPpmMasterData(srcConnection,argv_param,dbSrc_credentials["DB_SCHEMA"],releaseId)
        df_sourcePpmMaster = df_sourcePpmMaster.rename(columns={"release_id": "release_id_source", "release_status": "release_status_source"})
        df_sourcePpmMaster.to_csv(dirPC_log+"/"+repJobId+"_ppmReleaseMaster_Source.csv",index=False,header=True,sep="|")
        print(df_sourcePpmMaster.dtypes)
        
        print("\nTARGET")
        df_targetPpmMaster = getPpmMasterData(tgtConnection,argv_param,dbTgt_credentials["DB_SCHEMA"],releaseId)
        df_targetPpmMaster = df_targetPpmMaster.rename(columns={"release_id": "release_id_target", "release_status": "release_status_target"})
        df_targetPpmMaster.to_csv(dirPC_log+"/"+repJobId+"_ppmReleaseMaster_Target.csv",index=False,header=True,sep="|")
        print(df_targetPpmMaster.dtypes)
        
        print("\nCOMPARE")
        df_diffSrc = df_sourcePpmMaster.merge(df_targetPpmMaster
                                             ,how="left"
                                             ,left_on="release_id_source"
                                             ,right_on="release_id_target"
                                             )
        df_diffSrc.to_csv(dirPC_log+"/"+repJobId+"_ppmReleaseMaster_DiffSrc.csv",index=False,header=True,sep="|")
        print("df_diffSrc        : %s" % len(df_diffSrc))
        missingReleases = df_diffSrc["release_id_source"].loc[df_diffSrc["release_id_target"].isnull()].to_list()
        print("missingReleases   : %s" % len(missingReleases))
        print(missingReleases)
        
        if len(missingReleases)==0:
            statusMessage = "Releases match between source and target. Proceeding (re)replication."
            statusCode = "InProgress"
        elif len(missingReleases)>0:
            statusMessage="Missing releases between source and target."
            statusCode="Error"
        elif len(df_sourcePpmMaster)>len(df_targetPpmMaster):
            statusMessage="Source and Target does not match with previous releases."
            statusCode="Error"
        elif len(df_targetPpmMaster)>len(df_sourcePpmMaster):
            statusMessage="Target/Staging/Lower environments cannot have own releases."
            statusCode="Error"
        else:
            statusMessage="Unpredicted release compare."
            statusCode="Error"
        
        
        query = "UPDATE "+dbSrc_credentials['DB_SCHEMA']+".ppm_replication_status SET status='"+statusCode+"'" \
                           + ", status_message='"+statusMessage+"'" \
                           + ", error_description=NULL"\
                           + ", updated_by='"+repJobId+"' /**/" \
                           + " WHERE replication_job_id='"+repJobId+"' AND release_id='"+str(releaseId)+"'"
        
        query_ppm = "UPDATE "+dbSrc_credentials['DB_SCHEMA']+".ppm_release_master SET replication_status='"+statusCode+"'"\
                           + ", updated_by='"+repJobId+"' /**/" \
                           + " WHERE release_id='"+str(releaseId)+"'"
        
        if dbSrc_credentials['DB_TYPE'] == 'ORACLE':
            query = query.replace('/**/',', updated_date=SYSDATE')
            query_ppm = query_ppm.replace('/**/',', updated_on=SYSDATE')
        elif dbSrc_credentials['DB_TYPE'] in ('MARIA','MYSQL'):
            query = query.replace('/**/',', updated_date=NOW()')
            query_ppm = query_ppm.replace('/**/',', updated_on=NOW()')
        print("Query: %s" % query)
        print("Query PPM: %s" % query_ppm)
        src_cursorOrSession.execute(query)
        src_cursorOrSession.execute(query_ppm)
        srcConnection.commit()
        
        if statusCode != "InProgress":
            raise Exception(statusMessage)
    except Exception as err:
        print ("Error - {} . Line No - {} ".format(str(err),str(sys_exc_info()[-1].tb_lineno)))
        return False
    return True


if __name__ == '__main__':
    statFileTxt = ""
    statFile = ""
    errMessage = ""
    try:
        print("__main__")
        print("argv_param : %s" % argv_param)
        print("argv_param : %s" % len(argv_param))
        
        if (len(argv_param)!=8):
            raise Exception ("ARGUMENTS NOT RECEIVED")
        
        repJobId = argv_param[4]
        print("repJobId   : %s" % repJobId)
        statFileTxt = os_environ['PPM_PC_LOG'] + '/' + repJobId + "_" + os_path.basename(__file__).replace('.py', '.status')
        
        print("statFileTxt: %s" % statFileTxt)
        statFile=open(statFileTxt, "w")
        
        if main(argv_param):
            print("Source and Target are identical. Proceeding with (re)replication.")
            statFile.write("SUCCESS")
        else:
            statFile.write("FAILED")
    except Exception as err:
        print ("Error - {} . Line No - {} ".format(str(err),str(sys_exc_info()[-1].tb_lineno)))
        statFile.write("FAILED")
    
    if isinstance(statFile,io_TextIOBase):
        statFile.close()



what does argv_param do?