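# Airflow task fragment: stage 'interaction' entities from a Cassandra migration
# master into the ODS. It reads staged rows from Cassandra, enriches them via
# config-driven SQL datasets, bulk-loads the result into hos_interaction, and
# updates hos_migration_master with odsStaged/odsFailed statuses.
#
# Imports this snippet relies on. The enclosing DAG module is assumed to define
# over_ride, opId, buId, batchId, cassPath, entityType, configProperties,
# failedEntities, dateTimeFormat, dateFormat, kwargs and the helpers
# setCassandraDBConnection, setDbEngine and loadDataFrameToCassandra.
import json
import sys
import uuid
from datetime import datetime

import pandas as pd
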
if over_ride == 'NO':
    dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
    if not dbConnectRetVal[0]:
        raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
    cassDbSession = dbConnectRetVal[1]
    print("Cassandra database connection established")
print(over_ride)

# Source database connection
# dbConnectRetVal = setSourceConnection(configMainProperties)
# --Invoke utility based function
dbConnectRetVal = setDbEngine()
if not dbConnectRetVal[0]:
    raise Exception("Source database connection failed: " + dbConnectRetVal[1])
sourceDbEngine = dbConnectRetVal[1]
print("Source database connection established")

print("opId : %s" % opId)
print("buId : %s" % buId)
print("cassPath : %s" % cassPath)

for indx in entityType:
    if configProperties.has_section(indx):
        print("-------- PROCESSING ENTITY: %s ----------------" % indx)
        if configProperties.has_option(indx, "datasets"):
            dataset = json.loads(configProperties.get(indx, "datasets"))
            print("dataset: %s" % dataset)
            for indxDS in dataset:
                print("--- Processing dataset: %s" % indxDS)
                # Per-dataset settings; optional keys fall back to None / [].
                query = configProperties.get(indx, indxDS + ".query", raw=True)
                columnsForComparison = json.loads(configProperties.get(indx, indxDS + ".columnsForComparison"))
                lkupColumns = json.loads(configProperties.get(indx, indxDS + ".lkupColumns"))
                joinCondition = configProperties.get(indx, indxDS + ".joinCondition", raw=True)
                primaryKey = (configProperties.get(indx, indxDS + ".primaryKey", raw=True)
                              if configProperties.has_option(indx, indxDS + ".primaryKey") else None)
                dateTimeColumns = (json.loads(configProperties.get(indx, indxDS + ".dateTimeColumns"))
                                   if configProperties.has_option(indx, indxDS + ".dateTimeColumns") else [])
                dateColumns = (json.loads(configProperties.get(indx, indxDS + ".dateColumns"))
                               if configProperties.has_option(indx, indxDS + ".dateColumns") else [])
                columnsToRemove = (json.loads(configProperties.get(indx, indxDS + ".columnsToRemove"))
                                   if configProperties.has_option(indx, indxDS + ".columnsToRemove") else [])
                connDatabase = (configProperties.get(indx, indxDS + ".database", raw=True)
                                if configProperties.has_option(indx, indxDS + ".database") else None)
                columnsToInt = (json.loads(configProperties.get(indx, indxDS + ".columnsToInt"))
                                if configProperties.has_option(indx, indxDS + ".columnsToInt") else [])
                columnsToText = (json.loads(configProperties.get(indx, indxDS + ".columnsToText"))
                                 if configProperties.has_option(indx, indxDS + ".columnsToText") else [])

                # Substitute runtime values into the query template.
                if 'homm_batch' in query.lower():
                    query = configProperties.get(indx, indxDS + ".query", raw=True) % (batchId)
                elif ("op_id='%s'" in query.lower().replace(' ', '')
                        and "bu_id='%s'" in query.lower().replace(' ', '')):
                    query = configProperties.get(indx, indxDS + ".query", raw=True) % (opId, buId)

                print("Query : %s" % query)
                print("columnsForComparison: %s" % columnsForComparison)
                print("lkupColumns : %s" % lkupColumns)
                print("joinCondition : %s" % joinCondition)
                print("primaryKey : %s" % primaryKey)
                print("dateTimeColumns : %s" % dateTimeColumns)
                print("dateColumns : %s" % dateColumns)
                print("columnsToInt : %s" % columnsToInt)
                print("columnsToText : %s" % columnsToText)
                print("connDatabase : %s" % connDatabase)

                if indxDS == "stageData":
                    # Staged rows come from Cassandra; with the pandas-enabled
                    # session the ResultSet exposes a DataFrame via _current_rows.
                    dfSrcData = cassDbSession.execute(query, timeout=1000)
                    dfSrcData = dfSrcData._current_rows
                    dfSrcData.columns = dfSrcData.columns.str.lower()
                    print("dfSrcData", type(dfSrcData))
                    print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
                    if len(dfSrcData) == 0:
                        raise Exception("Entity Interaction not staged for batch[%s]" % batchId)
                    # Keep only rows still in 'staged' status; the full extract is
                    # retained in dfMigMasterFailed for the later anti-join.
                    dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
                    dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status'] == 'staged')]
                    print("homm_entity_status : %s [Filter]" % (len(dfDataSet)))
                    if len(dfDataSet) == 0:
                        raise Exception(
                            "Entity Interaction not staged for batch[%s] after filter=staged" % batchId)
                    dfMigMasterFailed = dfSrcData.copy()
                else:
                    if not (len(lkupColumns) > 0 and len(columnsForComparison) > 0 and joinCondition):
                        raise Exception(
                            "Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")
                    dfSrcData = pd.read_sql(query, con=sourceDbEngine)
                    dfSrcData.columns = dfSrcData.columns.str.lower()
                    print("Extracted : %s [%s]" % (len(dfSrcData), indxDS))
                    if len(dfSrcData) == 0:
                        raise Exception("Entity Interaction failed in traversing datasets.")

                    # Flatten the JSON attribute list into a {name: value} dict.
                    if 'hosi_account_extn_attributes' in list(dfSrcData):
                        # rowIdx, not indx: the outer entity loop variable must not be clobbered.
                        for rowIdx, row in dfSrcData.copy().iterrows():
                            try:
                                extnVal = {}
                                temp = (json.loads(row['hosi_account_extn_attributes'])
                                        if row['hosi_account_extn_attributes'] else [])
                                for v in temp:
                                    try:
                                        if 'attributeName' in v and 'attributeValue' in v and v['attributeValue']:
                                            extnVal.update({v['attributeName']: v['attributeValue']})
                                    except Exception as errx:
                                        print("Error - {} . Line No - {} ".format(
                                            str(errx), str(sys.exc_info()[-1].tb_lineno)))
                                if len(extnVal) > 0:
                                    dfSrcData.at[rowIdx, 'hosi_account_extn_attributes'] = extnVal
                            except Exception as err:
                                print("Error - {} . Line No - {} ".format(
                                    str(err), str(sys.exc_info()[-1].tb_lineno)))

                    if primaryKey:
                        # A distinct UUID per row; a scalar uuid.uuid4() assignment
                        # would give every row the same value.
                        dfSrcData[primaryKey] = [uuid.uuid4() for _ in range(len(dfSrcData))]

                    # Normalise column types before the merge; NaT dates become None.
                    for indxDateTimCol in dateTimeColumns:
                        dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where(
                            dfSrcData[indxDateTimCol].notnull(), None)
                    for indxDateCol in dateColumns:
                        dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where(
                            dfSrcData[indxDateCol].notnull(), None)
                    for indxCharCol in columnsToInt:
                        dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce')
                    for indxMiscelCol in columnsToText:
                        dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str)

                    print("dfDataSet : %s [Before Merge]" % len(dfDataSet))
                    dfDataSet = dfDataSet.copy().merge(dfSrcData,
                                                       how=joinCondition,
                                                       left_on=lkupColumns,
                                                       right_on=columnsForComparison)
                    print("dfDataSet : %s [After Merge]" % len(dfDataSet))
                    print("Columns : %s" % (list(dfDataSet)))
                    if len(dfDataSet) == 0:
                        raise Exception("LOST RECORDS POST MERGE")
                    dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
                    if len(columnsToRemove) > 0:
                        print("columnsToRemove : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
                        dfDataSet.drop(columnsToRemove, axis=1, inplace=True)
            # endFor==dataset
        else:
            failedEntities.update({indx: "datasets section not found"})
    else:
        failedEntities.update({indx: "entityInteraction section not found"})
# endFor==>entityType

# Data loading and status update
if len(failedEntities):
    raise Exception("Failed entities: " + str({k: v for k, v in failedEntities.items()}))
elif len(dfDataSet) == 0:
    raise Exception("NO RECORDS TO PROCESS")
else:
    print("dfDataSet : type{%s}" % type(list(dfDataSet)))
    print(list(dfDataSet))
    print("")
    print("Total : %s" % len(dfDataSet))

    # Load the merged interactions into the ODS table.
    tableName = 'hos_interaction'
    if not loadDataFrameToCassandra(connection_id="cassandrahost",
                                    table_name=tableName,
                                    df_data=dfDataSet,
                                    load_type_cqlldr=True,
                                    dag_name=kwargs['dag_run'].dag_id,
                                    file_suffix="_" + str(batchId)):
        raise Exception("ODS data store load failed.")
    print("Cassandra dataload submitted")

    # Mark the loaded rows as odsStaged in the migration master table.
    tableName = 'hos_migration_master'
    dfDataSet['homm_entity_value'] = dfDataSet['hosi_interaction_id']
    dfDataSet['homm_batch'] = batchId
    dfDataSet['homm_entity_id'] = 'interaction'
    dfDataSet['homm_entity_status'] = 'odsStaged'
    dfDataSet['homm_updated_on'] = datetime.utcnow().strftime(dateTimeFormat)
    if not loadDataFrameToCassandra(connection_id="cassandrahost",
                                    table_name=tableName,
                                    df_data=dfDataSet[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                                       'homm_updated_on', 'homm_entity_value']],
                                    load_type_cqlldr=True,
                                    dag_name=kwargs['dag_run'].dag_id,
                                    file_suffix="_" + str(batchId)):
        raise Exception("Staging master odsStaged update failed")

    # Anti-join: staged rows that did not survive the merge are flagged odsFailed.
    dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
    dfMigMasterFailed = (dfMigMasterFailed[['homm_entity_value']]
                         .merge(dfDataSet[['homm_entity_value']],
                                how='left',
                                on=['homm_entity_value'],
                                indicator=True)
                         .loc[lambda x: x['_merge'] != 'both'])
    print("dfMigMasterFailed : %s" % len(dfMigMasterFailed))
    dfMigMasterFailed['homm_batch'] = batchId
    dfMigMasterFailed['homm_entity_id'] = 'interaction'
    dfMigMasterFailed['homm_entity_status'] = 'odsFailed'
    dfMigMasterFailed['homm_updated_on'] = datetime.utcnow().strftime(dateTimeFormat)
    if not loadDataFrameToCassandra(connection_id="cassandrahost",
                                    table_name=tableName,
                                    df_data=dfMigMasterFailed[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                                               'homm_updated_on', 'homm_entity_value']],
                                    load_type_cqlldr=True,
                                    dag_name='odsBulkMigration_Interaction',
                                    file_suffix="_" + str(batchId) + "_Failed"):
        raise Exception("Staging master odsFailed update failed")
# endIf
print("SUCCESS")
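For reference, a minimal sketch of the properties file this loop expects. The
section name, the second dataset name ("sourceData") and all values below are
illustrative assumptions; only the option keys, the "stageData" dataset name and
the placeholder-substitution rules come from the code above:

    [interaction]
    datasets = ["stageData", "sourceData"]

    stageData.query = SELECT * FROM hos_migration_master WHERE homm_batch='%s'
    stageData.columnsForComparison = []
    stageData.lkupColumns = []
    stageData.joinCondition = inner

    sourceData.query = SELECT * FROM some_source_table WHERE op_id='%s' AND bu_id='%s'
    sourceData.columnsForComparison = ["source_key_col"]
    sourceData.lkupColumns = ["homm_entity_value"]
    sourceData.joinCondition = inner
    sourceData.primaryKey = hosi_interaction_id
    sourceData.dateTimeColumns = ["created_on"]
    sourceData.columnsToRemove = ["source_key_col"]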