from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
import pandas as pd
from datetime import datetime
import json
import gc
from os import environ as os_environ
from os import path as os_path
import sys
import uuid
import time
#-- Invoke utility-based functions
from utils.setCassandraDBConnection import setCassandraDBConnection
from utils.setDbEngine import setDbEngine
from utils.loadDataFrameToCassandra import loadDataFrameToCassandra
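# Config resolution (see getConfiguration below): the entity config is the DAG
# file path with /dags/ -> /config/ and .py -> .config; the entity-generic
# config is that path truncated at the first "_". For the path in the
# local-debug note below, that gives
#   .../config/odsBulkMigration/odsBulkMigration_Account.config
#   .../config/odsBulkMigration/odsBulkMigration.config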
def getConfiguration(dagFilePath):
    from configparser import ConfigParser as conf_ConfigParser
    #print("DAG file : %s" % dagFilePath)
    # Entity-specific config (e.g. Account)
    configProperties = conf_ConfigParser()
    configFile = dagFilePath.replace('/dags/', '/config/').replace('.py', '.config')
    if not os_path.exists(configFile):
        raise Exception("Entity config file does not exist: %s" % configFile)
    configProperties.read(configFile)
    # Entity-generic config: DAG file path truncated at the first "_"
    configMainProperties = conf_ConfigParser()
    configMainFile = dagFilePath.replace('/dags/', '/config/')
    configMainFile = configMainFile[:configMainFile.find("_")] + ".config"
    if not os_path.exists(configMainFile):
        raise Exception("Entity generic config file does not exist: %s" % configMainFile)
    configMainProperties.read(configMainFile)
    return (configProperties, configMainProperties)
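# Row factory for the Cassandra driver: when installed on the session, execute()
# returns rows already materialised as a pandas DataFrame. Presumably wired up
# by setCassandraDBConnection(pandas_enabled=True) below.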
def func_pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)
#def fn_processAccount():
# __file__="/app/server/HOBS-DataPipeline/dags/odsBulkMigration/odsBulkMigration_Account.py"
def fn_processAccount(**kwargs):
try:
entity = 'Account'
dagIdVar = os_path.basename(__file__).replace(".py","")
adminVariableValue = {}
cassPath = os_environ.get('CASSANDRA_HOME')
dfDataSet = pd.DataFrame()
dfMigMasterFailed = pd.DataFrame()
failedEntities = {}
if not cassPath:
raise Exception("CASSANDRA_HOME not configured.")
# Configuration Properties
configProperties, configMainProperties = getConfiguration(__file__)
print("configMainProperties")
print(configMainProperties.sections())
print("configProperties")
print(configProperties.sections())
opId = configMainProperties.get("config","opId")
buId = configMainProperties.get("config","buId")
dateTimeFormat = configMainProperties.get("config","dateTimeFormat", raw=True)
dateFormat = configMainProperties.get("config","dateFormat", raw=True)
entityType = json.loads(configProperties.get("config","entityType"))
        # dag_run.conf (trigger payload) takes precedence over the Airflow UI Variable
        if kwargs['dag_run'].conf:
            adminVariableValue = kwargs['dag_run'].conf
            print('Airflow conf variable value assigned {odsBulkMigration_Account: %s}' % adminVariableValue)
        elif Variable.get("odsBulkMigration_Account", None):
            adminVariableValue = Variable.get("odsBulkMigration_Account", None)
            print('Airflow UI variable value assigned {odsBulkMigration_Account: %s}' % adminVariableValue)
            # UI Variables arrive as JSON text; trigger conf is already a dict
            adminVariableValue = json.loads(adminVariableValue)
        if not adminVariableValue:
            raise Exception("adminVariableValue{odsBulkMigration_Account} value not set.")
        if 'entityId' in adminVariableValue.keys() and adminVariableValue['entityId'] == 'Account':
            entityId = adminVariableValue['entityId']
        else:
            raise Exception("entityId not set or entityId is not 'Account'.")
        # batchId: trigger conf wins over the UI variable; it must parse as an integer
        confBatchId = (kwargs['dag_run'].conf or {}).get('batchId')
        if confBatchId:
            batchId = int(confBatchId)
            print("batchId: %s [conf defined]" % batchId)
        elif 'batchId' in adminVariableValue.keys() and adminVariableValue['batchId'] != '':
            batchId = int(adminVariableValue['batchId'])
            print("batchId: %s [UI defined]" % batchId)
        else:
            raise Exception("batchId not present or batchId value is not valid")
if ('opId' in adminVariableValue.keys() and adminVariableValue['opId']):
opId = adminVariableValue['opId']
print("opId : %s [UI defined]" % opId)
if ('buId' in adminVariableValue.keys() and adminVariableValue['buId']):
buId = adminVariableValue['buId']
print("buId : %s [UI defined]" % buId)
# Cassandra database connection
##dbConnectRetVal = setCassandraDBConnection(configMainProperties)
#-- Invoke utility-based function
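        # pandas_enabled=True is assumed to install the pandas row factory
        # (func_pandas_factory above) on the returned session, so query results
        # can be read back as DataFrames via ResultSet._current_rows.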
dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
if not dbConnectRetVal[0]:
raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
cassDbSession = dbConnectRetVal[1]
print("Cassandra database connection established")
# Source database connection
#dbConnectRetVal = setSourceConnection(configMainProperties)
#-- Invoke utility-based function
dbConnectRetVal = setDbEngine()
if not dbConnectRetVal[0]:
raise Exception("Source database connection failed: " + dbConnectRetVal[1])
sourceDbEngine = dbConnectRetVal[1]
print("Source database connection established")
print("opId : %s" % opId)
print("buId : %s" % buId)
#print("cassPath : %s" % cassPath)
for indx in entityType:
if configProperties.has_section(indx):
print("-------- PROCESSING ENTITY: %s ----------------" % indx)
if configProperties.has_option(indx,"datasets"):
dataset = json.loads(configProperties.get(indx,"datasets"))
print("dataset: %s" % dataset)
for indxDS in dataset:
print("--- Processing dataset: %s" % indxDS)
query=configProperties.get(indx,indxDS+".query",raw=True)
columnsForComparison=json.loads(configProperties.get(indx,indxDS+".columnsForComparison"))
lkupColumns=json.loads(configProperties.get(indx,indxDS+".lkupColumns"))
joinCondition=configProperties.get(indx,indxDS+".joinCondition",raw=True)
primaryKey=configProperties.get(indx,indxDS+".primaryKey",raw=True) if configProperties.has_option(indx,indxDS+".primaryKey") else None
dateTimeColumns=json.loads(configProperties.get(indx,indxDS+".dateTimeColumns")) if configProperties.has_option(indx,indxDS+".dateTimeColumns") else []
dateColumns=json.loads(configProperties.get(indx,indxDS+".dateColumns")) if configProperties.has_option(indx,indxDS+".dateColumns") else []
columnsToRemove=json.loads(configProperties.get(indx,indxDS+".columnsToRemove")) if configProperties.has_option(indx,indxDS+".columnsToRemove") else []
connDatabase=configProperties.get(indx,indxDS+".database",raw=True) if configProperties.has_option(indx,indxDS+".database") else None
columnsToInt=json.loads(configProperties.get(indx,indxDS+".columnsToInt")) if configProperties.has_option(indx,indxDS+".columnsToInt") else []
columnsToText=json.loads(configProperties.get(indx,indxDS+".columnsToText")) if configProperties.has_option(indx,indxDS+".columnsToText") else []
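                        # Query templating: batch-scoped queries (referencing homm_batch)
                        # get batchId substituted; queries carrying both op_id='%s' and
                        # bu_id='%s' placeholders get (opId, buId) instead.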
if 'homm_batch' in query.lower():
query=(configProperties.get(indx,indxDS+".query",raw=True) % (batchId))
elif "op_id='%s'" in query.lower().replace(' ','') and "bu_id='%s'" in query.lower().replace(' ',''):
query=(configProperties.get(indx,indxDS+".query",raw=True) % (opId, buId))
print("Query : %s" % query)
print("columnsForComparison: %s" % columnsForComparison)
print("lkupColumns : %s" % lkupColumns)
print("joinCondition : %s" % joinCondition)
print("primaryKey : %s" % primaryKey)
print("dateTimeColumns : %s" % dateTimeColumns)
print("dateColumns : %s" % dateColumns)
print("columnsToInt : %s" % columnsToInt)
print("columnsToText : %s" % columnsToText)
print("connDatabase : %s" % connDatabase)
if indxDS=="stageData":
dfSrcData = cassDbSession.execute(query, timeout=1000)
dfSrcData = dfSrcData._current_rows
dfSrcData.columns=dfSrcData.columns.str.lower()
print("dfSrcData" , type(dfSrcData))
print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
if len(dfSrcData)==0:
raise Exception("Entity account not stagged for batch[%s]" % batchId)
dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status']=='staged')]
print("homm_entity_status : %s [Filter]" % (len(dfDataSet)))
if len(dfDataSet)==0:
raise Exception("Entity account not stagged for batch[%s] after filter=staged" % batchId)
dfMigMasterFailed = dfSrcData.copy()
else:
if not (len(lkupColumns)>0 and len(columnsForComparison)>0 and joinCondition):
raise Exception("Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")
dfSrcData = pd.read_sql(query, con=sourceDbEngine)
dfSrcData.columns=dfSrcData.columns.str.lower()
print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
if len(dfSrcData)==0:
raise Exception("Entity account failed in traversing datasets.")
                            if 'hosa_account_extn_attributes' in list(dfSrcData):
                                # NOTE: the loop variable must not be named indx, or it would
                                # clobber the outer entityType loop variable
                                for rowIdx, row in dfSrcData.copy().iterrows():
                                    try:
                                        extnVal = {}
                                        temp = json.loads(row['hosa_account_extn_attributes']) if row['hosa_account_extn_attributes'] else []
                                        for v in temp:
                                            try:
                                                if 'attributeName' in v and 'attributeValue' in v and v['attributeValue']:
                                                    extnVal.update({v['attributeName']: v['attributeValue']})
                                            except Exception as errx:
                                                print("Error - {} . Line No - {} ".format(str(errx), str(sys.exc_info()[-1].tb_lineno)))
                                        if len(extnVal) > 0:
                                            dfSrcData.at[rowIdx, 'hosa_account_extn_attributes'] = extnVal
                                    except Exception as err:
                                        print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
                            if primaryKey:
                                # one fresh UUID per row (a scalar assignment would give every row the same id)
                                dfSrcData[primaryKey] = [uuid.uuid4() for _ in range(len(dfSrcData))]
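                            # Normalise datetime/date columns to the configured string formats,
                            # mapping nulls to None (presumably so the loader is not handed the
                            # literal string 'NaT'); then coerce the configured int/text columns.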
for indxDateTimCol in dateTimeColumns:
#dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat)
dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where(dfSrcData[indxDateTimCol].notnull(), None)
for indxDateCol in dateColumns:
dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where(dfSrcData[indxDateCol].notnull(), None)
for indxCharCol in columnsToInt:
dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce')
for indxMiscelCol in columnsToText:
dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str)
print("dfDataSet : %s [Before Merge]" % len(dfDataSet))
dfDataSet = dfDataSet.copy().merge(dfSrcData
,how=joinCondition
,left_on=lkupColumns
,right_on=columnsForComparison
)
print("dfDataSet : %s [After Merge]" % len(dfDataSet))
print("Columns : %s" % (list(dfDataSet)))
if len(dfDataSet)==0:
raise Exception("LOST RECORDS POST MERGE")
dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
if len(columnsToRemove)>0:
print("columnsToRemove : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
dfDataSet.drop(columnsToRemove, axis=1, inplace=True)
# endFor==dataset
else:
failedEntities.update({indx: "datasets section not found"})
else:
failedEntities.update({indx: "entityAccount section not found"})
# endFor==>entityType
        # Data loading and status updates
        if len(failedEntities):
            raise Exception("Failed entities: " + str(failedEntities))
elif len(dfDataSet)==0:
raise Exception("NO RECORDS TO PROCESS")
else:
print("dfDataSet : type{%s}" % type(list(dfDataSet)))
print(list(dfDataSet))
print("")
print("Total : %s" % len(dfDataSet))
            # keep only rows with both party numbers present; the rest would fail the int casts below
            dfDataSet_partyNbrPresent = dfDataSet.dropna(subset=["hosa_party_nbr", "hosa_customer_party_nbr"])
            print("Rows with partyNbr present: %s" % len(dfDataSet_partyNbrPresent))
            dfDataSet = dfDataSet.copy().loc[~((dfDataSet["hosa_party_nbr"].isnull()) | (dfDataSet["hosa_customer_party_nbr"].isnull()))]
            print("Remaining for loading : %s" % len(dfDataSet))
            if 'hosa_party_nbr' in list(dfDataSet):
                dfDataSet["hosa_party_nbr"] = dfDataSet["hosa_party_nbr"].astype(int)
            if 'hosa_customer_party_nbr' in list(dfDataSet):
                dfDataSet["hosa_customer_party_nbr"] = dfDataSet["hosa_customer_party_nbr"].astype(int)
col = dfDataSet.columns
print('Number of columns :',len(col))
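            # Two loads follow: the merged rows into hos_account, then status rows
            # back into hos_migration_master (odsStaged for loaded keys, odsFailed
            # for staged keys that did not survive the merges).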
tableName='hos_account'
if not loadDataFrameToCassandra(connection_id="cassandrahost"
,table_name=tableName
,df_data=dfDataSet
,load_type_cqlldr=True
,dag_name= kwargs['dag_run'].dag_id
,file_suffix="_"+str(batchId)):
raise Exception("ODS data store load failed.")
print("Cassandra dataload submitted")
tableName='hos_migration_master'
dfDataSet['homm_entity_value']=dfDataSet['hosa_account_id']
dfDataSet['homm_batch']=batchId
dfDataSet['homm_entity_id']='Account'
dfDataSet['homm_entity_status']='odsStaged'
dfDataSet['homm_updated_on']=datetime.strftime(datetime.utcnow(),dateTimeFormat)
if not loadDataFrameToCassandra(connection_id="cassandrahost"
,table_name=tableName
,df_data=dfDataSet[['homm_entity_id','homm_entity_status','homm_batch','homm_updated_on','homm_entity_value']]
,load_type_cqlldr=True
,dag_name= kwargs['dag_run'].dag_id
,file_suffix="_"+str(batchId)):
raise Exception("Staging master odsStaged update failed")
dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
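            # Anti-join: a left merge with indicator=True marks staged keys that are
            # absent from the loaded set as 'left_only'; those are flagged odsFailed.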
dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(dfDataSet[['homm_entity_value']]
,how='left'
,on=['homm_entity_value']
,indicator = True
).loc[lambda x : x['_merge']!='both']
print("dfMigMasterFailed : %s" % len(dfMigMasterFailed))
dfMigMasterFailed['homm_batch']=batchId
dfMigMasterFailed['homm_entity_id']='Account'
dfMigMasterFailed['homm_entity_status']='odsFailed'
dfMigMasterFailed['homm_updated_on']=datetime.strftime(datetime.utcnow(),dateTimeFormat)
if not loadDataFrameToCassandra(connection_id="cassandrahost"
,table_name=tableName
,df_data=dfMigMasterFailed[['homm_entity_id','homm_entity_status','homm_batch','homm_updated_on','homm_entity_value']]
,load_type_cqlldr=True
,dag_name= kwargs['dag_run'].dag_id
,file_suffix="_"+str(batchId)+"_Failed"):
raise Exception("Staging master odsFailed update failed")
# endIf
print("SUCCESS")
gc.collect()
except Exception as exp:
gc.collect()
print("Error - {} . Line No - {} ".format(str(exp),str(sys.exc_info()[-1].tb_lineno)))
raise AirflowException(str(exp))
#********************* DAG Tasks********************************************************************************************#
default_args = {'owner': 'airflow'
,'depends_on_past': False
,'start_date': days_ago(2)
}
dag = DAG("odsBulkMigration_Account"
,catchup=False
,default_args=default_args
,description="odsBulkMigration_Account"
,schedule_interval="@once"
,tags=["odsBulkMigration"]
)
operatorStageData = PythonOperator(task_id='task_odsBulkMigration_Account'
,python_callable=fn_processAccount
,dag=dag
,provide_context=True
)
#********************* DAG Sequence*******************************************************************************************#
operatorStageData
"""
if __name__ == '__main__':
fn_processAccount()
"""