Untitled
unknown
plain_text
2 years ago
25 kB
1
Indexable
Never
from airflow import DAG from airflow.utils.dates import days_ago from airflow.models import Variable from airflow import AirflowException from airflow.operators.python_operator import PythonOperator import pandas as pd from datetime import datetime import json import gc from os import environ as os_environ from os import path as os_path import sys import uuid # --Invoke utility based function from utils.setCassandraDBConnection import setCassandraDBConnection from utils.setDbEngine import setDbEngine def getConfiguration(__file__): from configparser import ConfigParser as conf_ConfigParser print("DAG __file__ : %s" % __file__) # Entity-Interaction configProperties = conf_ConfigParser() configFile = __file__.replace('/dags/', '/config/').replace('.py', '.config') if not os_path.exists(configFile): print("Entity config file does not exist.") print("Entity config File: %s" % configFile) configProperties.read(configFile) # Entity-Generic configMainProperties = conf_ConfigParser() configMainFile = __file__.replace('/dags/', '/config/') configMainFile = configMainFile[:configMainFile.find("_")] + ".config" if not os_path.exists(configMainFile): print("Entity generic config file does not exist.") print("Config File : %s" % configMainFile) configMainProperties.read(configMainFile) return (configProperties, configMainProperties) def func_pandas_factory(colnames, rows): return pd.DataFrame(rows, columns=colnames) def checkDag(): adminVariableValue = {} overide = "" adminVariableValue = None configProperties, configMainProperties = getConfiguration(__file__) if Variable.get("odsBulkMigration_Interaction", None): adminVariableValue = Variable.get("odsBulkMigration_Interaction", None) print('Airflow UI variable value assigned {odsBulkMigration_Interaction: %s}' % adminVariableValue) if not adminVariableValue: raise Exception("adminVariableValue{odsBulkMigration_Interaction} value not set.") adminVariableValue = json.loads(adminVariableValue) if "overRide" in adminVariableValue: if 'overRide' in adminVariableValue.keys() and adminVariableValue['overRide'] == 'YES' or 'NO' and \ adminVariableValue['overRide'] != "": overide = adminVariableValue['overRide'] elif configProperties.has_option("config", "overRide"): overide = configProperties.get("config", "overRide") elif adminVariableValue['overRide'] == "": overide = 'NO' else: print("overRide flag not set") elif configProperties.has_option("config", "overRide"): overide = configProperties.get("config", "overRide") else: overide = 'NO' return overide def dataPopulate(tableName, dfValue, configMainProperties, fileSuffix=None): from subprocess import Popen, PIPE try: print("Preparing files for data population") fileSuffix = tableName + fileSuffix if fileSuffix else tableName cassPath = os_environ.get('CASSANDRA_HOME') cassCsvFile = os_environ.get('AIRFLOW_HOME') + '/logs/odsBulkMigration_Interaction/' + fileSuffix + '.csv' cassCqlldr = os_environ.get('AIRFLOW_HOME') + '/logs/odsBulkMigration_Interaction/' + fileSuffix + '.cqlldr' cassClog = os_environ.get('AIRFLOW_HOME') + '/logs/odsBulkMigration_Interaction/' + fileSuffix + '.clog' cassCErr = os_environ.get('AIRFLOW_HOME') + '/logs/odsBulkMigration_Interaction/' + fileSuffix + '.cerr' print("cassCsvFile: %s" % cassCsvFile) print("cassCqlldr : %s" % cassCqlldr) print("cassClog : %s" % cassClog) print("cassCErr : %s" % cassCErr) dfValue.to_csv(sep='|', header=True, index=False, path_or_buf=cassCsvFile) cassCols = list(dfValue) cassCols = ','.join(cassCols) cassCqlldr_W = open(cassCqlldr, "w") cassCqlldr_W.write("copy hobs_ods_staging." + tableName + " (" + cassCols + ") FROM '" + cassCsvFile + "'" + " WITH DELIMITER='|' AND ERRFILE='" + cassCErr + "'" + " AND HEADER=True " + " AND MAXATTEMPTS=1" + " AND MINBATCHSIZE=1 AND MAXBATCHSIZE=1 AND PAGESIZE=10 " + ";") cassCqlldr_W.close() print("Cassandra loader process") shellCommand = cassPath + '/bin/cqlsh ' + configMainProperties.get('cassandra', 'host', raw=True) + ' ' \ + configMainProperties.get('cassandra', 'port', raw=True) \ + ' -u ' + configMainProperties.get('cassandra', 'username', raw=True) \ + ' -p ' + configMainProperties.get('cassandra', 'password', raw=True) \ + ' -k ' + configMainProperties.get('cassandra', 'keyspace', raw=True) \ + ' -f ' + cassCqlldr + ' > ' + cassClog shellProcess = Popen(shellCommand, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True) shellOutput, shellErr = shellProcess.communicate() shellReturnCode = shellProcess.returncode shellPid = str(shellProcess.pid) print("cassandra dataload processId: %s" % shellPid) print("shellProcess.returncode : %s" % shellReturnCode) print("shellOutput : %s" % shellOutput) print("shellErr : %s" % shellErr) if shellReturnCode == 0: return True else: raise Exception("ODS data store load failed") gc.collect() return True except Exception as expCDB: gc.collect() print("Error - {} . Line No - {} ".format(str(expCDB), str(sys.exc_info()[-1].tb_lineno))) return False # def fn_processInteraction(): # __file__="/app/server/HOBS-DataPipeline/dags/odsBulkMigration/odsBulkMigration_Interaction.py" def fn_processInteraction(**kwargs): try: entity = 'Interaction' dagIdVar = os_path.basename(__file__).replace(".py", "") adminVariableValue = {} cassPath = os_environ.get('CASSANDRA_HOME') dfDataSet = pd.DataFrame() dfMigMasterFailed = pd.DataFrame() failedEntities = {} over_ride = checkDag() if not cassPath: raise Exception("CASSANDRA_HOME not configured.") # Configuration Properties configProperties, configMainProperties = getConfiguration(__file__) print("configMainProperties") print(configMainProperties.sections()) print("configProperties") print(configProperties.sections()) opId = configMainProperties.get("config", "opId") buId = configMainProperties.get("config", "buId") dateTimeFormat = configMainProperties.get("config", "dateTimeFormat", raw=True) dateFormat = configMainProperties.get("config", "dateFormat", raw=True) entityType = json.loads(configProperties.get("config", "entityType")) adminVariableValue = None dfDataSet = pd.DataFrame() dfMigMasterFailed = pd.DataFrame() failedEntities = {} cassPath = os_environ.get('CASSANDRA_HOME') if Variable.get("odsBulkMigration_Interaction", None): adminVariableValue = Variable.get("odsBulkMigration_Interaction", None) print('Airflow UI variable value assigned {odsBulkMigration_Interaction: %s}' % adminVariableValue) if not adminVariableValue: raise Exception("adminVariableValue{odsBulkMigration_Interaction} value not set.") adminVariableValue = json.loads(adminVariableValue) if 'entityId' in adminVariableValue.keys() and adminVariableValue['entityId'] == 'Interaction': entityId = adminVariableValue['entityId'] else: raise Exception("entityId not set or entityId is not Interaction.") if ('batchId' in adminVariableValue.keys() and adminVariableValue['batchId'] != ''): batchId = int(adminVariableValue['batchId']) print("batchId: %s [UI defined]" % batchId) else: raise Exception("batchId not present or batchId value is not valid") if not cassPath: raise Exception("CASSANDRA_HOME not configured.") if ('opId' in adminVariableValue.keys() and adminVariableValue['opId']): opId = adminVariableValue['opId'] print("opId : %s [UI defined]" % opId) if ('buId' in adminVariableValue.keys() and adminVariableValue['buId']): buId = adminVariableValue['buId'] print("buId : %s [UI defined]" % buId) # Cassandra database connection ##dbConnectRetVal = setCassandraDBConnection(configMainProperties) # --Invoke utility based function dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True) if not dbConnectRetVal[0]: raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1]) cassDbSession = dbConnectRetVal[1] print("Cassandra database connection established") # Source database connection # dbConnectRetVal = setSourceConnection(configMainProperties) # --Invoke utility based function dbConnectRetVal = setDbEngine() if not dbConnectRetVal[0]: raise Exception("Source database connection failed: " + dbConnectRetVal[1]) sourceDbEngine = dbConnectRetVal[1] print("Source database connection established") print("opId : %s" % opId) print("buId : %s" % buId) print("cassPath : %s" % cassPath) for indx in entityType: if configProperties.has_section(indx): print("-------- PROCESSING ENTITY: %s ----------------" % indx) if configProperties.has_option(indx, "datasets"): dataset = json.loads(configProperties.get(indx, "datasets")) print("dataset: %s" % dataset) if configProperties.has_option(indx, "datasets"): dataset = json.loads(configProperties.get(indx, "datasets")) print("dataset: %s" % dataset) for indxDS in dataset: if over_ride == 'YES' and indxDS == 'stageData': continue # skip stageData dataset processing print("--- Processing dataset: %s" % indxDS) query = configProperties.get(indx, indxDS + ".query", raw=True) columnsForComparison = json.loads(configProperties.get(indx, indxDS + ".columnsForComparison")) lkupColumns = json.loads(configProperties.get(indx, indxDS + ".lkupColumns")) joinCondition = configProperties.get(indx, indxDS + ".joinCondition", raw=True) primaryKey = configProperties.get(indx, indxDS + ".primaryKey", raw=True) if configProperties.has_option(indx, indxDS + ".primaryKey") else None dateTimeColumns = json.loads( configProperties.get(indx, indxDS + ".dateTimeColumns")) if configProperties.has_option( indx, indxDS + ".dateTimeColumns") else [] dateColumns = json.loads( configProperties.get(indx, indxDS + ".dateColumns")) if configProperties.has_option(indx, indxDS + ".dateColumns") else [] columnsToRemove = json.loads( configProperties.get(indx, indxDS + ".columnsToRemove")) if configProperties.has_option( indx, indxDS + ".columnsToRemove") else [] connDatabase = configProperties.get(indx, indxDS + ".database", raw=True) if configProperties.has_option(indx, indxDS + ".database") else None columnsToInt = json.loads( configProperties.get(indx, indxDS + ".columnsToInt")) if configProperties.has_option(indx, indxDS + ".columnsToInt") else [] columnsToText = json.loads( configProperties.get(indx, indxDS + ".columnsToText")) if configProperties.has_option(indx, indxDS + ".columnsToText") else [] if 'homm_batch' in query.lower(): query = (configProperties.get(indx, indxDS + ".query", raw=True) % (batchId)) elif "op_id='%s'" in query.lower().replace(' ', '') and "bu_id='%s'" in query.lower().replace( ' ', ''): query = (configProperties.get(indx, indxDS + ".query", raw=True) % (opId, buId)) print("Query : %s" % query) print("columnsForComparison: %s" % columnsForComparison) print("lkupColumns : %s" % lkupColumns) print("joinCondition : %s" % joinCondition) print("primaryKey : %s" % primaryKey) print("dateTimeColumns : %s" % dateTimeColumns) print("dateColumns : %s" % dateColumns) print("columnsToInt : %s" % columnsToInt) print("columnsToText : %s" % columnsToText) print("connDatabase : %s" % connDatabase) if indxDS == "stageData": dfSrcData = cassDbSession.execute(query, timeout=1000) dfSrcData = dfSrcData._current_rows dfSrcData.columns = dfSrcData.columns.str.lower() print("dfSrcData", type(dfSrcData)) print("Extracted : %s [%s]" % (len(dfSrcData), indxDS)) if len(dfSrcData) == 0: raise Exception("Entity Interaction not stagged for batch[%s]" % batchId) dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str) dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status'] == 'staged')] print("homm_entity_status : %s [Filter]" % (len(dfDataSet))) if len(dfDataSet) == 0: raise Exception( "Entity Interaction not stagged for batch[%s] after filter=staged" % batchId) dfMigMasterFailed = dfSrcData.copy() else: if not (len(lkupColumns) > 0 and len(columnsForComparison) > 0 and joinCondition): raise Exception( "Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly") dfSrcData = pd.read_sql(query, con=sourceDbEngine) dfSrcData.columns = dfSrcData.columns.str.lower() print("Extracted : %s [%s]" % (len(dfSrcData), indxDS)) if len(dfSrcData) == 0: raise Exception("Entity Interaction failed in traversing datasets.") if 'hosi_account_extn_attributes' in list(dfSrcData): for indx, row in dfSrcData.copy().iterrows(): try: extnVal = {} temp = json.loads(row['hosi_account_extn_attributes']) if row[ 'hosi_account_extn_attributes'] else [] for v in temp: try: if 'attributeName' in v and 'attributeValue' in v and v[ 'attributeValue']: extnVal.update({v['attributeName']: v['attributeValue']}) except Exception as errx: print("Error - {} . Line No - {} ".format(str(errx), str( sys.exc_info()[-1].tb_lineno))) if len(extnVal) > 0: dfSrcData.at[indx, 'hosi_account_extn_attributes'] = extnVal except Exception as err: print("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno))) if primaryKey: dfSrcData[primaryKey] = uuid.uuid4() dfSrcData[primaryKey] = dfSrcData[primaryKey].apply(lambda _: uuid.uuid4()) for indxDateTimCol in dateTimeColumns: # dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat) dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where( dfSrcData[indxDateTimCol].notnull(), None) for indxDateCol in dateColumns: dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where( dfSrcData[indxDateCol].notnull(), None) for indxCharCol in columnsToInt: dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce') for indxMiscelCol in columnsToText: dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str) print("dfDataSet : %s [Before Merge]" % len(dfDataSet)) dfDataSet = dfDataSet.copy().merge(dfSrcData , how=joinCondition , left_on=lkupColumns , right_on=columnsForComparison ) print("dfDataSet : %s [After Merge]" % len(dfDataSet)) print("Columns : %s" % (list(dfDataSet))) if len(dfDataSet) == 0: raise Exception("LOST RECORDS POST MERGE") dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True) if indxDS != 'interaction': if len(columnsToRemove) > 0: print("columnsToRemove : %s {%s}" % (columnsToRemove, type(columnsToRemove))) dfDataSet.drop(columnsToRemove, axis=1, inplace=True) # endFor==dataset else: failedEntities.update({indx: "datasets section not found"}) else: failedEntities.update({indx: "entityInteraction section not found"}) # endFor==>entityType # DataLoading and update Status if len(failedEntities): raise Exception("Failed entities: " + str({k: v for k, v in failedEntities.items()})) elif len(dfDataSet) == 0: raise Exception("NO RECORDS TO PROCESS") else: print("dfDataSet : type{%s}" % type(list(dfDataSet))) print(list(dfDataSet)) print("") print("Total : %s" % len(dfDataSet)) tableName = 'hos_interaction' if not dataPopulate(tableName , dfDataSet , configMainProperties , "_" + str(batchId)): raise Exception("ODS data store load failed.") print("Cassandra dataload submitted") tableName = 'hos_migration_master' dfDataSet['homm_entity_value'] = dfDataSet['hosi_interaction_id'] dfDataSet['homm_batch'] = batchId dfDataSet['homm_entity_id'] = 'interaction' dfDataSet['homm_entity_status'] = 'odsStaged' dfDataSet['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat) if not dataPopulate(tableName , dfDataSet[['homm_entity_id', 'homm_entity_status', 'homm_batch', 'homm_updated_on', 'homm_entity_value']] , configMainProperties , "_" + str(batchId)): raise Exception("Staging master odsStaged update failed") dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str) dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(dfDataSet[['homm_entity_value']] , how='left' , on=['homm_entity_value'] , indicator=True ).loc[lambda x: x['_merge'] != 'both'] print("dfMigMasterFailed : %s" % len(dfMigMasterFailed)) dfMigMasterFailed['homm_batch'] = batchId dfMigMasterFailed['homm_entity_id'] = 'interaction' dfMigMasterFailed['homm_entity_status'] = 'odsFailed' dfMigMasterFailed['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat) if not dataPopulate(tableName , dfMigMasterFailed[['homm_entity_id', 'homm_entity_status', 'homm_batch', 'homm_entity_value']] , configMainProperties , "_" + str(batchId) + "_Failed"): raise Exception("Staging master odsFailed update failed") # endIf print("SUCCESS") gc.collect() except Exception as exp: gc.collect() print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno))) raise AirflowException(str(exp)) # ********************* DAG Tasks********************************************************************************************# default_args = {'owner': 'airflow' , 'depends_on_past': False , 'start_date': days_ago(2) } dag = DAG("odsBulkMigration_Interaction" , catchup=False , default_args=default_args , description="odsBulkMigration_Interaction" , schedule_interval="@once" , tags=["odsBulkMigration"] ) operatorStageData = PythonOperator(task_id='task_odsBulkMigration_Interaction' , python_callable=fn_processInteraction , dag=dag , provide_context=True ) # ********************* DAG Sequence*******************************************************************************************# operatorStageData """ if __name__ == '__main__': fn_processInteraction() """