Untitled
unknown
plain_text
a year ago
21 kB
2
Indexable
Never
from airflow import DAG from airflow.utils.dates import days_ago from airflow.models import Variable from airflow import AirflowException from airflow.operators.python_operator import PythonOperator import pandas as pd from datetime import datetime import json import gc from os import environ as os_environ from os import path as os_path import sys import uuid import time #--Invoke utility based function from utils.setCassandraDBConnection import setCassandraDBConnection from utils.setDbEngine import setDbEngine from utils.loadDataFrameToCassandra import loadDataFrameToCassandra def getConfiguration(__file__): from configparser import ConfigParser as conf_ConfigParser #print("DAG __file__ : %s" % __file__) #Entity-Account configProperties = conf_ConfigParser() configFile = __file__.replace('/dags/', '/config/').replace('.py', '.config') if not os_path.exists(configFile): print("Entity config file does not exist.") #print("Entity config File: %s" % configFile) configProperties.read(configFile) #Entity-Generic configMainProperties = conf_ConfigParser() configMainFile = __file__.replace('/dags/', '/config/') configMainFile = configMainFile[:configMainFile.find("_")]+".config" if not os_path.exists(configMainFile): print("Entity generic config file does not exist.") #print("Config File : %s" % configMainFile) configMainProperties.read(configMainFile) return (configProperties, configMainProperties) def func_pandas_factory(colnames,rows): return pd.DataFrame(rows,columns = colnames) #def fn_processAccount(): # __file__="/app/server/HOBS-DataPipeline/dags/odsBulkMigration/odsBulkMigration_Account.py" def fn_processAccount(**kwargs): try: entity = 'Account' dagIdVar = os_path.basename(__file__).replace(".py","") adminVariableValue = {} cassPath = os_environ.get('CASSANDRA_HOME') dfDataSet = pd.DataFrame() dfMigMasterFailed = pd.DataFrame() failedEntities = {} if not cassPath: raise Exception("CASSANDRA_HOME not configured.") # Configuration Properties configProperties, configMainProperties = getConfiguration(__file__) print("configMainProperties") print(configMainProperties.sections()) print("configProperties") print(configProperties.sections()) opId = configMainProperties.get("config","opId") buId = configMainProperties.get("config","buId") dateTimeFormat = configMainProperties.get("config","dateTimeFormat", raw=True) dateFormat = configMainProperties.get("config","dateFormat", raw=True) entityType = json.loads(configProperties.get("config","entityType")) adminVariableValue = None dfDataSet = pd.DataFrame() dfMigMasterFailed = pd.DataFrame() failedEntities = {} cassPath = os_environ.get('CASSANDRA_HOME') if kwargs['dag_run'].conf: adminVariableValue = kwargs['dag_run'].conf print ('Airflow conf variable value assigned {odsBulkMigration_Account: %s}' % adminVariableValue) elif Variable.get("odsBulkMigration_Account",None): adminVariableValue = Variable.get("odsBulkMigration_Account",None) print ('Airflow UI variable value assigned {odsBulkMigration_Account: %s}' % adminVariableValue) adminVariableValue = json.loads(adminVariableValue) if not adminVariableValue: raise Exception("adminVariableValue{odsBulkMigration_Account} value not set.") if 'entityId' in adminVariableValue.keys() and adminVariableValue['entityId']=='Account': entityId = adminVariableValue['entityId'] else: raise Exception("entityId not set or entityId is not account.") if kwargs['dag_run'].conf.get('batchId') : batchId = int(kwargs['dag_run'].conf.get('batchId')) print("batchId: %s [conf defined]" % batchId) elif ('batchId' in adminVariableValue.keys() and adminVariableValue['batchId']!=''): batchId = int(adminVariableValue['batchId']) print("batchId: %s [UI defined]" % batchId) else: raise Exception("batchId not present or batchId value is not valid") if not cassPath: raise Exception("CASSANDRA_HOME not configured.") if ('opId' in adminVariableValue.keys() and adminVariableValue['opId']): opId = adminVariableValue['opId'] print("opId : %s [UI defined]" % opId) if ('buId' in adminVariableValue.keys() and adminVariableValue['buId']): buId = adminVariableValue['buId'] print("buId : %s [UI defined]" % buId) # Cassandra database connection ##dbConnectRetVal = setCassandraDBConnection(configMainProperties) #--Invoke utility based function dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True) if not dbConnectRetVal[0]: raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1]) cassDbSession = dbConnectRetVal[1] print("Cassandra database connection established") # Source database connection #dbConnectRetVal = setSourceConnection(configMainProperties) #--Invoke utility based function dbConnectRetVal = setDbEngine() if not dbConnectRetVal[0]: raise Exception("Source database connection failed: " + dbConnectRetVal[1]) sourceDbEngine = dbConnectRetVal[1] print("Source database connection established") print("opId : %s" % opId) print("buId : %s" % buId) #print("cassPath : %s" % cassPath) for indx in entityType: if configProperties.has_section(indx): print("-------- PROCESSING ENTITY: %s ----------------" % indx) if configProperties.has_option(indx,"datasets"): dataset = json.loads(configProperties.get(indx,"datasets")) print("dataset: %s" % dataset) for indxDS in dataset: print("--- Processing dataset: %s" % indxDS) query=configProperties.get(indx,indxDS+".query",raw=True) columnsForComparison=json.loads(configProperties.get(indx,indxDS+".columnsForComparison")) lkupColumns=json.loads(configProperties.get(indx,indxDS+".lkupColumns")) joinCondition=configProperties.get(indx,indxDS+".joinCondition",raw=True) primaryKey=configProperties.get(indx,indxDS+".primaryKey",raw=True) if configProperties.has_option(indx,indxDS+".primaryKey") else None dateTimeColumns=json.loads(configProperties.get(indx,indxDS+".dateTimeColumns")) if configProperties.has_option(indx,indxDS+".dateTimeColumns") else [] dateColumns=json.loads(configProperties.get(indx,indxDS+".dateColumns")) if configProperties.has_option(indx,indxDS+".dateColumns") else [] columnsToRemove=json.loads(configProperties.get(indx,indxDS+".columnsToRemove")) if configProperties.has_option(indx,indxDS+".columnsToRemove") else [] connDatabase=configProperties.get(indx,indxDS+".database",raw=True) if configProperties.has_option(indx,indxDS+".database") else None columnsToInt=json.loads(configProperties.get(indx,indxDS+".columnsToInt")) if configProperties.has_option(indx,indxDS+".columnsToInt") else [] columnsToText=json.loads(configProperties.get(indx,indxDS+".columnsToText")) if configProperties.has_option(indx,indxDS+".columnsToText") else [] if 'homm_batch' in query.lower(): query=(configProperties.get(indx,indxDS+".query",raw=True) % (batchId)) elif "op_id='%s'" in query.lower().replace(' ','') and "bu_id='%s'" in query.lower().replace(' ',''): query=(configProperties.get(indx,indxDS+".query",raw=True) % (opId, buId)) print("Query : %s" % query) print("columnsForComparison: %s" % columnsForComparison) print("lkupColumns : %s" % lkupColumns) print("joinCondition : %s" % joinCondition) print("primaryKey : %s" % primaryKey) print("dateTimeColumns : %s" % dateTimeColumns) print("dateColumns : %s" % dateColumns) print("columnsToInt : %s" % columnsToInt) print("columnsToText : %s" % columnsToText) print("connDatabase : %s" % connDatabase) if indxDS=="stageData": dfSrcData = cassDbSession.execute(query, timeout=1000) dfSrcData = dfSrcData._current_rows dfSrcData.columns=dfSrcData.columns.str.lower() print("dfSrcData" , type(dfSrcData)) print("Extracted : %s [%s]" % (len(dfSrcData), indxDS)) if len(dfSrcData)==0: raise Exception("Entity account not stagged for batch[%s]" % batchId) dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str) dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status']=='staged')] print("homm_entity_status : %s [Filter]" % (len(dfDataSet))) if len(dfDataSet)==0: raise Exception("Entity account not stagged for batch[%s] after filter=staged" % batchId) dfMigMasterFailed = dfSrcData.copy() else: if not (len(lkupColumns)>0 and len(columnsForComparison)>0 and joinCondition): raise Exception("Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly") dfSrcData = pd.read_sql(query, con=sourceDbEngine) dfSrcData.columns=dfSrcData.columns.str.lower() print("Extracted : %s [%s]" % (len(dfSrcData), indxDS)) if len(dfSrcData)==0: raise Exception("Entity account failed in traversing datasets.") if 'hosa_account_extn_attributes' in list(dfSrcData): for indx, row in dfSrcData.copy().iterrows(): try: extnVal={} temp= json.loads(row['hosa_account_extn_attributes']) if row['hosa_account_extn_attributes'] else [] for v in temp: try: if 'attributeName' in v and 'attributeValue' in v and v['attributeValue']: extnVal.update({v['attributeName']:v['attributeValue']}) except Exception as errx: print("Error - {} . Line No - {} ".format(str(errx),str(sys.exc_info()[-1].tb_lineno))) if len(extnVal)>0: dfSrcData.at[indx, 'hosa_account_extn_attributes'] = extnVal except Exception as err: print("Error - {} . Line No - {} ".format(str(err),str(sys.exc_info()[-1].tb_lineno))) if primaryKey: dfSrcData[primaryKey]=uuid.uuid4() dfSrcData[primaryKey]=dfSrcData[primaryKey].apply(lambda _: uuid.uuid4()) for indxDateTimCol in dateTimeColumns: #dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat) dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where(dfSrcData[indxDateTimCol].notnull(), None) for indxDateCol in dateColumns: dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where(dfSrcData[indxDateCol].notnull(), None) for indxCharCol in columnsToInt: dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce') for indxMiscelCol in columnsToText: dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str) print("dfDataSet : %s [Before Merge]" % len(dfDataSet)) dfDataSet = dfDataSet.copy().merge(dfSrcData ,how=joinCondition ,left_on=lkupColumns ,right_on=columnsForComparison ) print("dfDataSet : %s [After Merge]" % len(dfDataSet)) print("Columns : %s" % (list(dfDataSet))) if len(dfDataSet)==0: raise Exception("LOST RECORDS POST MERGE") dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True) if len(columnsToRemove)>0: print("columnsToRemove : %s {%s}" % (columnsToRemove, type(columnsToRemove))) dfDataSet.drop(columnsToRemove, axis=1, inplace=True) # endFor==dataset else: failedEntities.update({indx: "datasets section not found"}) else: failedEntities.update({indx: "entityAccount section not found"}) # endFor==>entityType # DataLoading and update Status if len(failedEntities): raise Exception("Failed entities: "+str({k:v for k,v in failedEntities.items()})) elif len(dfDataSet)==0: raise Exception("NO RECORDS TO PROCESS") else: print("dfDataSet : type{%s}" % type(list(dfDataSet))) print(list(dfDataSet)) print("") print("Total : %s" % len(dfDataSet)) dfDataSet_partyNbrNan = dfDataSet.dropna(subset=["hosa_party_nbr","hosa_customer_party_nbr"]) print("partyNbr Nan dropped: %s" % len(dfDataSet_partyNbrNan)) dfDataSet = dfDataSet.copy().loc[~((dfDataSet["hosa_party_nbr"].isnull()) | (dfDataSet["hosa_customer_party_nbr"].isnull()))] print("Remaing for loading : %s" % len(dfDataSet)) if 'hosa_party_nbr' in list(dfDataSet): print("dfDataSet_partyNbrNan: %s" % len(dfDataSet_partyNbrNan)) dfDataSet["hosa_party_nbr"]=dfDataSet["hosa_party_nbr"].astype(int) if 'hosa_customer_party_nbr' in list(dfDataSet): dfDataSet["hosa_customer_party_nbr"]=dfDataSet["hosa_customer_party_nbr"].astype(int) col = dfDataSet.columns print('Number of columns :',len(col)) tableName='hos_account' if not loadDataFrameToCassandra(connection_id="cassandrahost" ,table_name=tableName ,df_data=dfDataSet ,load_type_cqlldr=True ,dag_name= kwargs['dag_run'].dag_id ,file_suffix="_"+str(batchId)): raise Exception("ODS data store load failed.") print("Cassandra dataload submitted") tableName='hos_migration_master' dfDataSet['homm_entity_value']=dfDataSet['hosa_account_id'] dfDataSet['homm_batch']=batchId dfDataSet['homm_entity_id']='Account' dfDataSet['homm_entity_status']='odsStaged' dfDataSet['homm_updated_on']=datetime.strftime(datetime.utcnow(),dateTimeFormat) if not loadDataFrameToCassandra(connection_id="cassandrahost" ,table_name=tableName ,df_data=dfDataSet[['homm_entity_id','homm_entity_status','homm_batch','homm_updated_on','homm_entity_value']] ,load_type_cqlldr=True ,dag_name= kwargs['dag_run'].dag_id ,file_suffix="_"+str(batchId)): raise Exception("Staging master odsStaged update failed") dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str) dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(dfDataSet[['homm_entity_value']] ,how='left' ,on=['homm_entity_value'] ,indicator = True ).loc[lambda x : x['_merge']!='both'] print("dfMigMasterFailed : %s" % len(dfMigMasterFailed)) dfMigMasterFailed['homm_batch']=batchId dfMigMasterFailed['homm_entity_id']='Account' dfMigMasterFailed['homm_entity_status']='odsFailed' dfMigMasterFailed['homm_updated_on']=datetime.strftime(datetime.utcnow(),dateTimeFormat) if not loadDataFrameToCassandra(connection_id="cassandrahost" ,table_name=tableName ,df_data=dfMigMasterFailed[['homm_entity_id','homm_entity_status','homm_batch','homm_updated_on','homm_entity_value']] ,load_type_cqlldr=True ,dag_name= kwargs['dag_run'].dag_id ,file_suffix="_"+str(batchId)+"_Failed"): raise Exception("Staging master odsFailed update failed") # endIf print("SUCCESS") gc.collect() except Exception as exp: gc.collect() print("Error - {} . Line No - {} ".format(str(exp),str(sys.exc_info()[-1].tb_lineno))) raise AirflowException(str(exp)) #********************* DAG Tasks********************************************************************************************# default_args = {'owner': 'airflow' ,'depends_on_past': False ,'start_date': days_ago(2) } dag = DAG("odsBulkMigration_Account" ,catchup=False ,default_args=default_args ,description="odsBulkMigration_Account" ,schedule_interval="@once" ,tags=["odsBulkMigration"] ) operatorStageData = PythonOperator(task_id='task_odsBulkMigration_Account' ,python_callable=fn_processAccount ,dag=dag ,provide_context=True ) #********************* DAG Sequence*******************************************************************************************# operatorStageData """ if __name__ == '__main__': fn_processAccount() """