from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
import pandas as pd
from datetime import datetime
import json
import gc
from os import environ as os_environ
from os import path as os_path
import sys
import uuid

#-- Shared utility functions
from utils.setCassandraDBConnection import setCassandraDBConnection
from utils.setDbEngine import setDbEngine
from utils.loadDataFrameToCassandra import loadDataFrameToCassandra

def getConfiguration(dagFile):
    from configparser import ConfigParser as conf_ConfigParser
    #print("DAG file          : %s" % dagFile)
    
    # Entity-specific config: /dags/<name>.py -> /config/<name>.config
    configProperties = conf_ConfigParser()
    configFile = dagFile.replace('/dags/', '/config/').replace('.py', '.config')
    if not os_path.exists(configFile):
        raise Exception("Entity config file does not exist: %s" % configFile)
    #print("Entity config File: %s" % configFile)
    configProperties.read(configFile)
    
    # Entity-generic config: DAG file name up to the first "_"
    configMainProperties = conf_ConfigParser()
    configMainFile = dagFile.replace('/dags/', '/config/')
    configMainFile = configMainFile[:configMainFile.find("_")]+".config"
    if not os_path.exists(configMainFile):
        raise Exception("Entity generic config file does not exist: %s" % configMainFile)
    #print("Config File       : %s" % configMainFile)
    configMainProperties.read(configMainFile)
    
    return (configProperties, configMainProperties)
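
# A minimal sketch of the assumed config layout (illustrative values only;
# the real files live under /config/, resolved from the DAG path above;
# "accountData" is a hypothetical dataset name):
#
#   odsBulkMigration.config -- entity-generic:
#     [config]
#     opId = <operator id>
#     buId = <business unit id>
#     dateTimeFormat = %Y-%m-%d %H:%M:%S
#     dateFormat = %Y-%m-%d
#
#   odsBulkMigration_Account.config -- entity-specific:
#     [config]
#     entityType = ["entityAccount"]
#     [entityAccount]
#     datasets = ["stageData", "accountData"]
#     stageData.query = SELECT ... FROM hos_migration_master WHERE homm_batch=%s
#     accountData.query = SELECT ... WHERE op_id='%s' AND bu_id='%s'
#     accountData.lkupColumns = ["homm_entity_value"]
#     accountData.columnsForComparison = ["hosa_account_id"]
#     accountData.joinCondition = inner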


def func_pandas_factory(colnames,rows):
    return pd.DataFrame(rows,columns = colnames)
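
# Row factory in the shape the DataStax cassandra-driver expects
# (session.row_factory): it materialises each page of rows as a pandas
# DataFrame. setCassandraDBConnection(pandas_enabled=True) presumably installs
# an equivalent factory, which is why result sets below expose _current_rows
# as a DataFrame.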

#def fn_processAccount():
#    __file__="/app/server/HOBS-DataPipeline/dags/odsBulkMigration/odsBulkMigration_Account.py"
def fn_processAccount(**kwargs):
    
    try:
        entity = 'Account'
        dagIdVar = os_path.basename(__file__).replace(".py","")
        adminVariableValue = {}
        cassPath = os_environ.get('CASSANDRA_HOME')
        dfDataSet = pd.DataFrame()
        dfMigMasterFailed = pd.DataFrame()
        failedEntities = {}
        
        if not cassPath:
            raise Exception("CASSANDRA_HOME not configured.")

        # Configuration  Properties
        configProperties, configMainProperties = getConfiguration(__file__)
        print("configMainProperties")
        print(configMainProperties.sections())
        print("configProperties")
        print(configProperties.sections())
        
        opId = configMainProperties.get("config","opId")
        buId = configMainProperties.get("config","buId")
        dateTimeFormat = configMainProperties.get("config","dateTimeFormat", raw=True)
        dateFormat = configMainProperties.get("config","dateFormat", raw=True)
        entityType = json.loads(configProperties.get("config","entityType"))
        
        if kwargs['dag_run'].conf:
            adminVariableValue = kwargs['dag_run'].conf
            print('Airflow conf variable value assigned {odsBulkMigration_Account: %s}' % adminVariableValue)
        else:
            adminVariableValue = Variable.get("odsBulkMigration_Account", None)
            if adminVariableValue:
                print('Airflow UI variable value assigned {odsBulkMigration_Account: %s}' % adminVariableValue)
                adminVariableValue = json.loads(adminVariableValue)
        
        if not adminVariableValue:
            raise Exception("adminVariableValue{odsBulkMigration_Account} value not set.")
        
        if 'entityId' in adminVariableValue.keys() and adminVariableValue['entityId']=='Account':
            entityId = adminVariableValue['entityId']
        else:
            raise Exception("entityId not set or entityId is not 'Account'.")
        
        if (kwargs['dag_run'].conf or {}).get('batchId'):
            batchId = int(kwargs['dag_run'].conf.get('batchId'))
            print("batchId: %s [conf defined]" % batchId)
        elif ('batchId' in adminVariableValue.keys() and adminVariableValue['batchId']!=''):
            batchId = int(adminVariableValue['batchId'])
            print("batchId: %s [UI defined]" % batchId)
        else:
            raise Exception("batchId not present or batchId value is not valid")
        
        if ('opId' in adminVariableValue.keys() and adminVariableValue['opId']):
            opId = adminVariableValue['opId']
            print("opId   : %s [UI defined]" % opId)
        
        if ('buId' in adminVariableValue.keys() and adminVariableValue['buId']):
            buId = adminVariableValue['buId']
            print("buId   : %s [UI defined]" % buId)

        
        # Cassandra database connection
        #-- Utility function; returns (ok, session-or-error-message)
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra database connection established")
        
        # Source database connection
        #-- Utility function; returns (ok, engine-or-error-message)
        dbConnectRetVal = setDbEngine()
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: " + dbConnectRetVal[1])
        sourceDbEngine = dbConnectRetVal[1]
        print("Source database connection established")
        
        print("opId                : %s" % opId)
        print("buId                : %s" % buId)
        #print("cassPath            : %s" % cassPath)
        
        for indx in entityType:
            if configProperties.has_section(indx):
                print("-------- PROCESSING ENTITY: %s ----------------" % indx)
                if configProperties.has_option(indx,"datasets"):
                    dataset = json.loads(configProperties.get(indx,"datasets"))
                    print("dataset: %s" % dataset)
                    for indxDS in dataset:
                        print("--- Processing dataset: %s" % indxDS)
                        query=configProperties.get(indx,indxDS+".query",raw=True)
                        columnsForComparison=json.loads(configProperties.get(indx,indxDS+".columnsForComparison"))
                        lkupColumns=json.loads(configProperties.get(indx,indxDS+".lkupColumns"))
                        joinCondition=configProperties.get(indx,indxDS+".joinCondition",raw=True)
                        primaryKey=configProperties.get(indx,indxDS+".primaryKey",raw=True) if configProperties.has_option(indx,indxDS+".primaryKey") else None
                        dateTimeColumns=json.loads(configProperties.get(indx,indxDS+".dateTimeColumns")) if configProperties.has_option(indx,indxDS+".dateTimeColumns") else []
                        dateColumns=json.loads(configProperties.get(indx,indxDS+".dateColumns")) if configProperties.has_option(indx,indxDS+".dateColumns") else []
                        columnsToRemove=json.loads(configProperties.get(indx,indxDS+".columnsToRemove")) if configProperties.has_option(indx,indxDS+".columnsToRemove") else []
                        connDatabase=configProperties.get(indx,indxDS+".database",raw=True) if configProperties.has_option(indx,indxDS+".database") else None
                        columnsToInt=json.loads(configProperties.get(indx,indxDS+".columnsToInt")) if configProperties.has_option(indx,indxDS+".columnsToInt") else []
                        columnsToText=json.loads(configProperties.get(indx,indxDS+".columnsToText")) if configProperties.has_option(indx,indxDS+".columnsToText") else []
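
                        # Queries in the config may carry %s placeholders:
                        # batch-scoped queries (referencing homm_batch) take the
                        # batchId; op_id/bu_id filters take opId and buId.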
                        
                        if 'homm_batch' in query.lower():
                            query=(configProperties.get(indx,indxDS+".query",raw=True) % (batchId))
                        elif "op_id='%s'" in query.lower().replace(' ','') and "bu_id='%s'" in query.lower().replace(' ',''):
                            query=(configProperties.get(indx,indxDS+".query",raw=True) % (opId, buId))
                        
                        print("Query               : %s" % query)
                        print("columnsForComparison: %s" % columnsForComparison)
                        print("lkupColumns         : %s" % lkupColumns)
                        print("joinCondition       : %s" % joinCondition)
                        print("primaryKey          : %s" % primaryKey)
                        print("dateTimeColumns     : %s" % dateTimeColumns)
                        print("dateColumns         : %s" % dateColumns)
                        print("columnsToInt        : %s" % columnsToInt)
                        print("columnsToText       : %s" % columnsToText)
                        print("connDatabase        : %s" % connDatabase)
                        
                        if indxDS=="stageData":
                            dfSrcData = cassDbSession.execute(query, timeout=1000)
                            dfSrcData = dfSrcData._current_rows
                            dfSrcData.columns=dfSrcData.columns.str.lower()
                            print("dfSrcData" , type(dfSrcData))
                            print("Extracted           : %s [%s]" % (len(dfSrcData), indxDS))
                            if len(dfSrcData)==0:
                                raise Exception("Entity account not staged for batch[%s]" % batchId)
                            dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
                            dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status']=='staged')]
                            print("homm_entity_status  : %s [Filter]" % (len(dfDataSet)))
                            if len(dfDataSet)==0:
                                raise Exception("Entity account not staged for batch[%s] after filter=staged" % batchId)
                            dfMigMasterFailed = dfSrcData.copy()
                        else:
                            if not (len(lkupColumns)>0 and len(columnsForComparison)>0 and joinCondition):
                                raise Exception("Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")
                            
                            dfSrcData = pd.read_sql(query, con=sourceDbEngine)
                            dfSrcData.columns=dfSrcData.columns.str.lower()
                            print("Extracted           : %s [%s]" % (len(dfSrcData), indxDS))
                            if len(dfSrcData)==0:
                                raise Exception("Entity account failed in traversing datasets.")
                            
                            if 'hosa_account_extn_attributes' in list(dfSrcData):
                                # Flatten the JSON list of {attributeName, attributeValue}
                                # pairs into a single dict per row. The loop variable must
                                # not reuse 'indx', which still drives the outer entityType
                                # loop and is read again on the next dataset iteration.
                                for rowIndx, row in dfSrcData.copy().iterrows():
                                    try:
                                        extnVal={}
                                        temp= json.loads(row['hosa_account_extn_attributes']) if row['hosa_account_extn_attributes'] else []
                                        for v in temp:
                                            try:
                                                if 'attributeName' in v and 'attributeValue' in v and v['attributeValue']:
                                                    extnVal.update({v['attributeName']:v['attributeValue']})
                                            except Exception as errx:
                                                print("Error - {} . Line No - {} ".format(str(errx),str(sys.exc_info()[-1].tb_lineno)))
                                        if len(extnVal)>0:
                                            dfSrcData.at[rowIndx, 'hosa_account_extn_attributes'] = extnVal
                                    except Exception as err:
                                        print("Error - {} . Line No - {} ".format(str(err),str(sys.exc_info()[-1].tb_lineno)))
                            
                            if primaryKey:
                                # Generate a fresh surrogate-key UUID per row.
                                dfSrcData[primaryKey] = [uuid.uuid4() for _ in range(len(dfSrcData))]
                            
                            # Normalise timestamps/dates to strings, keeping NaT as None
                            # so Cassandra receives nulls rather than the literal 'NaT'.
                            for indxDateTimCol in dateTimeColumns:
                                dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where(dfSrcData[indxDateTimCol].notnull(), None)
                            for indxDateCol in dateColumns:
                                dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where(dfSrcData[indxDateCol].notnull(), None)
                            for indxCharCol in columnsToInt:
                                dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce')
                            for indxMiscelCol in columnsToText:
                                dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str)
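
                            # Join the staged master records to this dataset using
                            # the configured lookup/comparison columns and join type.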
                            
                            print("dfDataSet           : %s [Before Merge]" % len(dfDataSet))
                            dfDataSet = dfDataSet.copy().merge(dfSrcData
                                                              ,how=joinCondition
                                                              ,left_on=lkupColumns
                                                              ,right_on=columnsForComparison
                                                              )
                            
                            print("dfDataSet           : %s [After Merge]" % len(dfDataSet))
                            print("Columns             : %s" % (list(dfDataSet)))
                            if len(dfDataSet)==0:
                                raise Exception("LOST RECORDS POST MERGE")
                            
                            dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
                            if len(columnsToRemove)>0:
                                print("columnsToRemove     : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
                                dfDataSet.drop(columnsToRemove, axis=1, inplace=True)
                            
                    # endFor==dataset
                else:
                    failedEntities.update({indx: "datasets section not found"})
            else:
                failedEntities.update({indx: "entityAccount section not found"})
        # endFor==>entityType
        
        # DataLoading and update Status
        if len(failedEntities):
            raise Exception("Failed entities: "+str({k:v for k,v in failedEntities.items()}))
        elif len(dfDataSet)==0:
            raise Exception("NO RECORDS TO PROCESS")
        else:
            print("dfDataSet : type{%s}" % type(list(dfDataSet)))
            print(list(dfDataSet))
            print("")
            
            print("Total               : %s" % len(dfDataSet))
            dfDataSet_partyNbrNan = dfDataSet.dropna(subset=["hosa_party_nbr","hosa_customer_party_nbr"])
            print("partyNbr Nan dropped: %s" % len(dfDataSet_partyNbrNan))
            dfDataSet = dfDataSet.copy().loc[~((dfDataSet["hosa_party_nbr"].isnull()) | (dfDataSet["hosa_customer_party_nbr"].isnull()))]
            print("Remaing for loading : %s" % len(dfDataSet))
            
            if 'hosa_party_nbr' in list(dfDataSet):
                print("dfDataSet_partyNbrNan: %s" % len(dfDataSet_partyNbrNan))
                dfDataSet["hosa_party_nbr"]=dfDataSet["hosa_party_nbr"].astype(int)
            
            if 'hosa_customer_party_nbr' in list(dfDataSet):
                dfDataSet["hosa_customer_party_nbr"]=dfDataSet["hosa_customer_party_nbr"].astype(int)
            
            col = dfDataSet.columns
            print('Number of columns :',len(col))
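            # loadDataFrameToCassandra is an in-house utility (not shown here);
            # from its call sites it is assumed to bulk-load the DataFrame via
            # a cqlsh/cqlldr-style loader, staging a file named with file_suffix,
            # and to return a falsy value on failure.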
            tableName='hos_account'
            if not loadDataFrameToCassandra(connection_id="cassandrahost"
                                ,table_name=tableName
                                ,df_data=dfDataSet
                                ,load_type_cqlldr=True
                                ,dag_name= kwargs['dag_run'].dag_id
                                ,file_suffix="_"+str(batchId)):
                raise Exception("ODS data store load failed.") 
            
            print("Cassandra dataload submitted")
            
            tableName='hos_migration_master'
            dfDataSet['homm_entity_value']=dfDataSet['hosa_account_id']
            dfDataSet['homm_batch']=batchId
            dfDataSet['homm_entity_id']='Account'
            dfDataSet['homm_entity_status']='odsStaged'
            dfDataSet['homm_updated_on']=datetime.strftime(datetime.utcnow(),dateTimeFormat)
            if not loadDataFrameToCassandra(connection_id="cassandrahost"
                               ,table_name=tableName
                               ,df_data=dfDataSet[['homm_entity_id','homm_entity_status','homm_batch','homm_updated_on','homm_entity_value']]
                               ,load_type_cqlldr=True
                               ,dag_name= kwargs['dag_run'].dag_id
                               ,file_suffix="_"+str(batchId)):
                raise Exception("Staging master odsStaged update failed")
            
            dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
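            # Left-merge with indicator=True acts as an anti-join: staged master
            # rows with no match in the loaded dataset (_merge != 'both') are
            # flagged as odsFailed below.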
            dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(dfDataSet[['homm_entity_value']]
                                                                              ,how='left'
                                                                              ,on=['homm_entity_value']
                                                                              ,indicator = True
                                                                              ).loc[lambda x : x['_merge']!='both']
            print("dfMigMasterFailed   : %s" % len(dfMigMasterFailed))
            dfMigMasterFailed['homm_batch']=batchId
            dfMigMasterFailed['homm_entity_id']='Account'
            dfMigMasterFailed['homm_entity_status']='odsFailed'
            dfMigMasterFailed['homm_updated_on']=datetime.strftime(datetime.utcnow(),dateTimeFormat)
            if not loadDataFrameToCassandra(connection_id="cassandrahost"
                               ,table_name=tableName
                               ,df_data=dfMigMasterFailed[['homm_entity_id','homm_entity_status','homm_batch','homm_updated_on','homm_entity_value']]
                               ,load_type_cqlldr=True
                               ,dag_name= kwargs['dag_run'].dag_id
                               ,file_suffix="_"+str(batchId)+"_Failed"):
                raise Exception("Staging master odsFailed update failed")
            
        # endIf
           
        print("SUCCESS")
        gc.collect()
        
    except Exception as exp:
        gc.collect()
        print("Error - {} . Line No - {} ".format(str(exp),str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException(str(exp))
    



#********************* DAG Tasks********************************************************************************************#

default_args = {'owner': 'airflow'
               ,'depends_on_past': False
               ,'start_date': days_ago(2)
               }

dag = DAG("odsBulkMigration_Account"
         ,catchup=False
         ,default_args=default_args
         ,description="odsBulkMigration_Account"
         ,schedule_interval="@once"
         ,tags=["odsBulkMigration"]
         )

operatorStageData = PythonOperator(task_id='task_odsBulkMigration_Account'
                                  ,python_callable=fn_processAccount
                                  ,dag=dag
                                  ,provide_context=True
                                  )

#********************* DAG Sequence*******************************************************************************************#

operatorStageData


"""

if __name__ == '__main__':
    fn_processAccount()
"""