Untitled
unknown
plain_text
2 years ago
15 kB
1
Indexable
Never
def fn_processInteraction(**kwargs):
    """Extract staged Interaction records, enrich them from the source DB and
    load them into the ODS Cassandra store.

    High-level flow:
      1. Read batch/entity parameters from the Airflow UI variable
         ``odsBulkMigration_Interaction`` (JSON with ``entityId``, ``batchId``
         and optional ``opId``/``buId`` overrides).
      2. Open a Cassandra session and a source-DB engine via the project
         connection helpers.
      3. For every entity section listed in the config, pull each configured
         dataset (``stageData`` from Cassandra, the rest from the source DB)
         and combine them into one DataFrame.
      4. Load the combined rows into ``hos_interaction`` and write per-row
         statuses ('odsStaged' / 'odsFailed') back to ``hos_migration_master``.

    Args:
        **kwargs: Airflow task-context keyword arguments (unused here, but
            part of the PythonOperator calling convention).

    Raises:
        AirflowException: wraps any failure (configuration, connection,
            extraction, merge or load) so the Airflow task is marked failed.
    """
    try:
        entity = 'Interaction'  # NOTE(review): currently unused
        # DAG id derived from this file's name; currently unused.
        dagIdVar = os_path.basename(__file__).replace(".py", "")
        cassPath = os_environ.get('CASSANDRA_HOME')
        dfDataSet = pd.DataFrame()
        dfMigMasterFailed = pd.DataFrame()
        failedEntities = {}
        # 'YES' switches to override mode (stageData skipped) -- see dataset loop.
        over_ride = checkDag()
        if not cassPath:
            raise Exception("CASSANDRA_HOME not configured.")

        # ---- Configuration properties -------------------------------------
        configProperties, configMainProperties = getConfiguration(__file__)
        print("configMainProperties")
        print(configMainProperties.sections())
        print("configProperties")
        print(configProperties.sections())
        opId = configMainProperties.get("config", "opId")
        buId = configMainProperties.get("config", "buId")
        dateTimeFormat = configMainProperties.get("config", "dateTimeFormat", raw=True)
        dateFormat = configMainProperties.get("config", "dateFormat", raw=True)  # currently unused
        entityType = json.loads(configProperties.get("config", "entityType"))

        # ---- Runtime parameters from the Airflow UI variable --------------
        # Single Variable.get (the original fetched the variable twice).
        adminVariableValue = Variable.get("odsBulkMigration_Interaction", None)
        if adminVariableValue:
            print('Airflow UI variable value assigned {odsBulkMigration_Interaction: %s}' % adminVariableValue)
        if not adminVariableValue:
            raise Exception("adminVariableValue{odsBulkMigration_Interaction} value not set.")
        adminVariableValue = json.loads(adminVariableValue)
        if 'entityId' in adminVariableValue.keys() and adminVariableValue['entityId'] == 'Interaction':
            entityId = adminVariableValue['entityId']
        else:
            raise Exception("entityId not set or entityId is not Interaction.")
        if 'batchId' in adminVariableValue.keys() and adminVariableValue['batchId'] != '':
            batchId = int(adminVariableValue['batchId'])
            print("batchId: %s [UI defined]" % batchId)
        else:
            raise Exception("batchId not present or batchId value is not valid")
        # opId/buId supplied via the UI variable override the config defaults.
        if 'opId' in adminVariableValue.keys() and adminVariableValue['opId']:
            opId = adminVariableValue['opId']
            print("opId : %s [UI defined]" % opId)
        if 'buId' in adminVariableValue.keys() and adminVariableValue['buId']:
            buId = adminVariableValue['buId']
            print("buId : %s [UI defined]" % buId)

        # ---- Database connections -----------------------------------------
        # Both helpers return a (success, session_or_error) pair.
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra database connection established")

        dbConnectRetVal = setDbEngine()
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: " + dbConnectRetVal[1])
        sourceDbEngine = dbConnectRetVal[1]
        print("Source database connection established")

        print("opId : %s" % opId)
        print("buId : %s" % buId)
        print("cassPath : %s" % cassPath)

        # ---- Extract & combine every configured dataset -------------------
        for indx in entityType:
            if configProperties.has_section(indx):
                print("-------- PROCESSING ENTITY: %s ----------------" % indx)
                if configProperties.has_option(indx, "datasets"):
                    dataset = json.loads(configProperties.get(indx, "datasets"))
                    print("dataset: %s" % dataset)
                    for indxDS in dataset:
                        if over_ride == 'YES' and indxDS == 'stageData':
                            continue  # override mode: skip stageData dataset processing
                        print("--- Processing dataset: %s" % indxDS)
                        query = configProperties.get(indx, indxDS + ".query", raw=True)
                        columnsForComparison = json.loads(configProperties.get(indx, indxDS + ".columnsForComparison"))
                        lkupColumns = json.loads(configProperties.get(indx, indxDS + ".lkupColumns"))
                        joinCondition = configProperties.get(indx, indxDS + ".joinCondition", raw=True)
                        # Optional per-dataset settings. Several are parsed but not
                        # used below; kept so malformed config still fails fast here.
                        primaryKey = (configProperties.get(indx, indxDS + ".primaryKey", raw=True)
                                      if configProperties.has_option(indx, indxDS + ".primaryKey") else None)
                        dateTimeColumns = (json.loads(configProperties.get(indx, indxDS + ".dateTimeColumns"))
                                           if configProperties.has_option(indx, indxDS + ".dateTimeColumns") else [])
                        dateColumns = (json.loads(configProperties.get(indx, indxDS + ".dateColumns"))
                                       if configProperties.has_option(indx, indxDS + ".dateColumns") else [])
                        columnsToRemove = (json.loads(configProperties.get(indx, indxDS + ".columnsToRemove"))
                                           if configProperties.has_option(indx, indxDS + ".columnsToRemove") else [])
                        connDatabase = (configProperties.get(indx, indxDS + ".database", raw=True)
                                        if configProperties.has_option(indx, indxDS + ".database") else None)
                        columnsToInt = (json.loads(configProperties.get(indx, indxDS + ".columnsToInt"))
                                        if configProperties.has_option(indx, indxDS + ".columnsToInt") else [])
                        columnsToText = (json.loads(configProperties.get(indx, indxDS + ".columnsToText"))
                                         if configProperties.has_option(indx, indxDS + ".columnsToText") else [])
                        # Bind query placeholders: batch-scoped queries take the
                        # batch id; op/bu-scoped queries take opId/buId.
                        if 'homm_batch' in query.lower():
                            query = query % (batchId)
                        elif ("op_id='%s'" in query.lower().replace(' ', '')
                              and "bu_id='%s'" in query.lower().replace(' ', '')):
                            query = query % (opId, buId)
                        if indxDS == "stageData":
                            # Staged entities come from Cassandra.
                            dfSrcData = cassDbSession.execute(query, timeout=1000)
                            dfSrcData = dfSrcData._current_rows  # pandas frame from the Cassandra ResultSet
                            dfSrcData.columns = dfSrcData.columns.str.lower()
                            print("dfSrcData", type(dfSrcData))
                            print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
                            if len(dfSrcData) == 0:
                                raise Exception("Entity Interaction not stagged for batch[%s]" % batchId)
                            dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
                            dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status'] == 'staged')]
                            print("homm_entity_status : %s [Filter]" % (len(dfDataSet)))
                            if len(dfDataSet) == 0:
                                raise Exception(
                                    "Entity Interaction not stagged for batch[%s] after filter=staged" % batchId)
                            # Keep the full staged set to derive odsFailed rows later.
                            dfMigMasterFailed = dfSrcData.copy()
                        else:
                            if not (len(lkupColumns) > 0 and len(columnsForComparison) > 0 and joinCondition):
                                raise Exception(
                                    "Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")
                            dfSrcData = pd.read_sql(query, con=sourceDbEngine)
                            dfSrcData.columns = dfSrcData.columns.str.lower()
                            print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
                            if over_ride != 'YES' and indxDS != 'interaction':
                                if len(dfSrcData) == 0:
                                    raise Exception("Entity Interaction failed in traversing datasets.")
                            print("dfDataSet : %s [Before Merge]" % len(dfDataSet))
                            # NOTE(review): the merge only happens in override mode;
                            # confirm non-override runs are really meant to keep
                            # dfDataSet as the stageData extract.
                            if over_ride == 'YES' and indxDS != 'interaction':
                                dfDataSet = dfDataSet.copy().merge(dfSrcData,
                                                                   how=joinCondition,
                                                                   left_on=lkupColumns,
                                                                   right_on=columnsForComparison)
                            if over_ride == 'YES' and indxDS == 'interaction':
                                dfDataSet = dfSrcData.copy()
                            print("dfDataSet : %s [After Merge]" % len(dfDataSet))
                            print("Columns : %s" % (list(dfDataSet)))
                            if len(dfDataSet) == 0:
                                raise Exception("LOST RECORDS POST MERGE")
                            dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
                            if over_ride != 'YES' and indxDS != 'interaction':
                                if len(columnsToRemove) > 0:
                                    print("columnsToRemove : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
                                    dfDataSet.drop(columnsToRemove, axis=1, inplace=True)
                    # endFor==dataset
                else:
                    failedEntities.update({indx: "datasets section not found"})
            else:
                failedEntities.update({indx: "entityInteraction section not found"})
        # endFor==>entityType

        # ---- Data loading and status update -------------------------------
        if len(failedEntities):
            raise Exception("Failed entities: " + str({k: v for k, v in failedEntities.items()}))
        elif len(dfDataSet) == 0:
            raise Exception("NO RECORDS TO PROCESS")
        else:
            print("dfDataSet : type{%s}" % type(list(dfDataSet)))
            print(list(dfDataSet))
            print("")
            print("Total : %s" % len(dfDataSet))
            tableName = 'hos_interaction'
            if not dataPopulate(tableName, dfDataSet, configMainProperties, "_" + str(batchId)):
                raise Exception("ODS data store load failed.")
            print("Cassandra dataload submitted")
            # Mark every loaded interaction as odsStaged in the migration master.
            tableName = 'hos_migration_master'
            dfDataSet['homm_entity_value'] = dfDataSet['hosi_interaction_id']
            dfDataSet['homm_batch'] = batchId
            dfDataSet['homm_entity_id'] = 'interaction'
            dfDataSet['homm_entity_status'] = 'odsStaged'
            dfDataSet['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
            if not dataPopulate(tableName,
                                dfDataSet[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                           'homm_updated_on', 'homm_entity_value']],
                                configMainProperties,
                                "_" + str(batchId)):
                raise Exception("Staging master odsStaged update failed")
            # Anything staged but not loaded becomes odsFailed (anti-join via
            # left merge + indicator, comparing string-cast entity values).
            dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
            dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(
                dfDataSet[['homm_entity_value']],
                how='left',
                on=['homm_entity_value'],
                indicator=True).loc[lambda x: x['_merge'] != 'both']
            print("dfMigMasterFailed : %s" % len(dfMigMasterFailed))
            dfMigMasterFailed['homm_batch'] = batchId
            dfMigMasterFailed['homm_entity_id'] = 'interaction'
            dfMigMasterFailed['homm_entity_status'] = 'odsFailed'
            dfMigMasterFailed['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
            # FIX: include homm_updated_on in the written columns -- the original
            # computed it but dropped it from the odsFailed write, unlike the
            # parallel odsStaged write above.
            if not dataPopulate(tableName,
                                dfMigMasterFailed[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                                   'homm_updated_on', 'homm_entity_value']],
                                configMainProperties,
                                "_" + str(batchId) + "_Failed"):
                raise Exception("Staging master odsFailed update failed")
        # endIf
        print("SUCCESS")
        gc.collect()
    except Exception as exp:
        gc.collect()
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException(str(exp))