def fn_processInteraction(**kwargs):
    """Bulk-migrate the 'Interaction' entity for one batch into the ODS store.

    Flow:
      1. Read entity configuration (``getConfiguration``) and the Airflow UI
         variable ``odsBulkMigration_Interaction`` (must carry ``entityId`` ==
         'Interaction' and a non-empty ``batchId``; ``opId``/``buId`` may
         override the config defaults).
      2. Connect to Cassandra (staged rows) and the source RDBMS.
      3. For each configured entity/dataset: extract, then merge onto the
         running ``dfDataSet`` per the dataset's lookup/join configuration.
      4. Load the result into ``hos_interaction`` and write per-row status
         ('odsStaged' / 'odsFailed') into ``hos_migration_master`` via
         ``dataPopulate``.

    Args:
        **kwargs: Airflow context; not used directly.

    Raises:
        AirflowException: wraps any failure (configuration, connection,
            empty extraction, lost records post-merge, or load failure).
    """
    try:
        cassPath = os_environ.get('CASSANDRA_HOME')
        dfDataSet = pd.DataFrame()
        dfMigMasterFailed = pd.DataFrame()
        failedEntities = {}
        over_ride = checkDag()
        if not cassPath:
            raise Exception("CASSANDRA_HOME not configured.")
        # Configuration Properties
        configProperties, configMainProperties = getConfiguration(__file__)
        print("configMainProperties")
        print(configMainProperties.sections())
        print("configProperties")
        print(configProperties.sections())
        opId = configMainProperties.get("config", "opId")
        buId = configMainProperties.get("config", "buId")
        # raw=True: format strings contain '%' placeholders that configparser
        # interpolation would otherwise mangle.
        dateTimeFormat = configMainProperties.get("config", "dateTimeFormat", raw=True)
        dateFormat = configMainProperties.get("config", "dateFormat", raw=True)
        entityType = json.loads(configProperties.get("config", "entityType"))
        # Runtime parameters come from the Airflow UI variable.
        adminVariableValue = None
        if Variable.get("odsBulkMigration_Interaction", None):
            adminVariableValue = Variable.get("odsBulkMigration_Interaction", None)
            print('Airflow UI variable value assigned {odsBulkMigration_Interaction: %s}' % adminVariableValue)
        if not adminVariableValue:
            raise Exception("adminVariableValue{odsBulkMigration_Interaction} value not set.")
        adminVariableValue = json.loads(adminVariableValue)
        if 'entityId' in adminVariableValue.keys() and adminVariableValue['entityId'] == 'Interaction':
            entityId = adminVariableValue['entityId']
        else:
            raise Exception("entityId not set or entityId is not Interaction.")
        if ('batchId' in adminVariableValue.keys() and adminVariableValue['batchId'] != ''):
            batchId = int(adminVariableValue['batchId'])
            print("batchId: %s [UI defined]" % batchId)
        else:
            raise Exception("batchId not present or batchId value is not valid")
        # UI-supplied opId/buId override the config-file defaults.
        if ('opId' in adminVariableValue.keys() and adminVariableValue['opId']):
            opId = adminVariableValue['opId']
            print("opId : %s [UI defined]" % opId)
        if ('buId' in adminVariableValue.keys() and adminVariableValue['buId']):
            buId = adminVariableValue['buId']
            print("buId : %s [UI defined]" % buId)
        # Cassandra database connection (utility-based helper).
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra database connection established")
        # Source database connection (utility-based helper).
        dbConnectRetVal = setDbEngine()
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: " + dbConnectRetVal[1])
        sourceDbEngine = dbConnectRetVal[1]
        print("Source database connection established")
        print("opId : %s" % opId)
        print("buId : %s" % buId)
        print("cassPath : %s" % cassPath)
        for indx in entityType:
            if configProperties.has_section(indx):
                print("-------- PROCESSING ENTITY: %s ----------------" % indx)
                if configProperties.has_option(indx, "datasets"):
                    dataset = json.loads(configProperties.get(indx, "datasets"))
                    print("dataset: %s" % dataset)
                    for indxDS in dataset:
                        if over_ride == 'YES' and indxDS == 'stageData':
                            continue  # skip stageData dataset processing
                        print("--- Processing dataset: %s" % indxDS)
                        query = configProperties.get(indx, indxDS + ".query", raw=True)
                        columnsForComparison = json.loads(configProperties.get(indx, indxDS + ".columnsForComparison"))
                        lkupColumns = json.loads(configProperties.get(indx, indxDS + ".lkupColumns"))
                        joinCondition = configProperties.get(indx, indxDS + ".joinCondition", raw=True)
                        columnsToRemove = json.loads(
                            configProperties.get(indx, indxDS + ".columnsToRemove")) if configProperties.has_option(
                            indx, indxDS + ".columnsToRemove") else []
                        # NOTE(review): the optional primaryKey / dateTimeColumns /
                        # dateColumns / database / columnsToInt / columnsToText
                        # options were read here but never used downstream; the
                        # dead reads were dropped.
                        # Substitute query placeholders: batch-scoped queries take
                        # batchId, op/bu-scoped queries take (opId, buId).
                        if 'homm_batch' in query.lower():
                            query = query % (batchId)
                        elif "op_id='%s'" in query.lower().replace(' ', '') and "bu_id='%s'" in query.lower().replace(
                                ' ', ''):
                            query = query % (opId, buId)
                        if indxDS == "stageData":
                            # Staged rows for this batch come from Cassandra.
                            dfSrcData = cassDbSession.execute(query, timeout=1000)
                            dfSrcData = dfSrcData._current_rows
                            dfSrcData.columns = dfSrcData.columns.str.lower()
                            print("dfSrcData", type(dfSrcData))
                            print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
                            if len(dfSrcData) == 0:
                                raise Exception("Entity Interaction not stagged for batch[%s]" % batchId)
                            dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
                            # Only rows still in 'staged' state are processed further.
                            dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status'] == 'staged')]
                            print("homm_entity_status : %s [Filter]" % (len(dfDataSet)))
                            if len(dfDataSet) == 0:
                                raise Exception(
                                    "Entity Interaction not stagged for batch[%s] after filter=staged" % batchId)
                            # Keep the full staged set to derive failures later.
                            dfMigMasterFailed = dfSrcData.copy()
                        else:
                            if not (len(lkupColumns) > 0 and len(columnsForComparison) > 0 and joinCondition):
                                raise Exception(
                                    "Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")
                            dfSrcData = pd.read_sql(query, con=sourceDbEngine)
                            dfSrcData.columns = dfSrcData.columns.str.lower()
                            print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
                            if over_ride != 'YES' and indxDS != 'interaction':
                                if len(dfSrcData) == 0:
                                    raise Exception("Entity Interaction failed in traversing datasets.")
                            print("dfDataSet : %s [Before Merge]" % len(dfDataSet))
                            # NOTE(review): both merge branches below require
                            # over_ride == 'YES'; when over_ride != 'YES' the
                            # extracted dfSrcData is never merged in — confirm
                            # this is intended.
                            if over_ride == 'YES' and indxDS != 'interaction':
                                dfDataSet = dfDataSet.copy().merge(dfSrcData
                                                                   , how=joinCondition
                                                                   , left_on=lkupColumns
                                                                   , right_on=columnsForComparison
                                                                   )
                            if over_ride == 'YES' and indxDS == 'interaction':
                                dfDataSet = dfSrcData.copy()
                            print("dfDataSet : %s [After Merge]" % len(dfDataSet))
                            print("Columns : %s" % (list(dfDataSet)))
                            if len(dfDataSet) == 0:
                                raise Exception("LOST RECORDS POST MERGE")
                            dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
                            if over_ride != 'YES' and indxDS != 'interaction':
                                if len(columnsToRemove) > 0:
                                    print("columnsToRemove : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
                                    dfDataSet.drop(columnsToRemove, axis=1, inplace=True)
                    # endFor==dataset
                else:
                    failedEntities.update({indx: "datasets section not found"})
            else:
                failedEntities.update({indx: "entityInteraction section not found"})
        # endFor==>entityType
        # DataLoading and update Status
        if len(failedEntities):
            raise Exception("Failed entities: " + str(failedEntities))
        elif len(dfDataSet) == 0:
            raise Exception("NO RECORDS TO PROCESS")
        else:
            print("dfDataSet : type{%s}" % type(list(dfDataSet)))
            print(list(dfDataSet))
            print("")
            print("Total : %s" % len(dfDataSet))
            tableName = 'hos_interaction'
            if not dataPopulate(tableName
                                , dfDataSet
                                , configMainProperties
                                , "_" + str(batchId)):
                raise Exception("ODS data store load failed.")
            print("Cassandra dataload submitted")
            # Mark successfully loaded rows as 'odsStaged' in the migration master.
            tableName = 'hos_migration_master'
            dfDataSet['homm_entity_value'] = dfDataSet['hosi_interaction_id']
            dfDataSet['homm_batch'] = batchId
            dfDataSet['homm_entity_id'] = 'interaction'
            dfDataSet['homm_entity_status'] = 'odsStaged'
            dfDataSet['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
            if not dataPopulate(tableName
                                , dfDataSet[['homm_entity_id', 'homm_entity_status', 'homm_batch', 'homm_updated_on',
                                             'homm_entity_value']]
                                , configMainProperties
                                , "_" + str(batchId)):
                raise Exception("Staging master odsStaged update failed")
            dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
            # Anti-join: staged rows that did not survive the merges are failures.
            dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(dfDataSet[['homm_entity_value']]
                                                                               , how='left'
                                                                               , on=['homm_entity_value']
                                                                               , indicator=True
                                                                               ).loc[lambda x: x['_merge'] != 'both']
            print("dfMigMasterFailed : %s" % len(dfMigMasterFailed))
            dfMigMasterFailed['homm_batch'] = batchId
            dfMigMasterFailed['homm_entity_id'] = 'interaction'
            dfMigMasterFailed['homm_entity_status'] = 'odsFailed'
            dfMigMasterFailed['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
            # BUGFIX: include homm_updated_on in the odsFailed load so the
            # timestamp set above is persisted, matching the odsStaged load.
            if not dataPopulate(tableName
                                , dfMigMasterFailed[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                                     'homm_updated_on', 'homm_entity_value']]
                                , configMainProperties
                                , "_" + str(batchId) + "_Failed"):
                raise Exception("Staging master odsFailed update failed")
        # endIf
        print("SUCCESS")
        gc.collect()
    except Exception as exp:
        gc.collect()
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException(str(exp))