# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 2 years ago
# 25 kB
# 1
# Indexable
# Never
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator
import pandas as pd
from datetime import datetime
import json
import gc
from os import environ as os_environ
from os import path as os_path
import sys
import uuid

# --Invoke utility based function
from utils.setCassandraDBConnection import setCassandraDBConnection
from utils.setDbEngine import setDbEngine


def getConfiguration(__file__):
    """Load the entity-specific and the generic configuration of a DAG file.

    Both config paths are derived from the DAG file path by replacing the
    ``/dags/`` directory with ``/config/``:

    * entity config  : ``.py`` is replaced by ``.config``
      (``<prefix>_<entity>.py`` -> ``<prefix>_<entity>.config``)
    * generic config : the *file name* is cut at its first underscore
      (``<prefix>_<entity>.py`` -> ``<prefix>.config``)

    A missing file is only reported on stdout; ``ConfigParser.read`` skips
    files it cannot open, so the matching parser is returned empty.

    :param __file__: path of the DAG python file. The parameter deliberately
        shadows the module global; the name is kept for backward compatibility.
    :return: tuple ``(configProperties, configMainProperties)``.
    """
    from configparser import ConfigParser as conf_ConfigParser
    print("DAG __file__      : %s" % __file__)

    configPath = __file__.replace('/dags/', '/config/')

    # Entity-Interaction
    configProperties = conf_ConfigParser()
    configFile = configPath.replace('.py', '.config')
    if not os_path.exists(configFile):
        print("Entity config file does not exist.")
    print("Entity config File: %s" % configFile)
    configProperties.read(configFile)

    # Entity-Generic
    configMainProperties = conf_ConfigParser()
    # Look for the underscore in the file name only: an underscore in a parent
    # directory must not truncate the path.
    nameStart = len(configPath) - len(os_path.basename(configPath))
    cutAt = configPath.find("_", nameStart)
    if cutAt == -1:
        # No underscore in the file name: use the name without its extension
        # instead of silently dropping the last character (slice with -1).
        configMainStem = os_path.splitext(configPath)[0]
    else:
        configMainStem = configPath[:cutAt]
    configMainFile = configMainStem + ".config"
    if not os_path.exists(configMainFile):
        print("Entity generic config file does not exist.")
    print("Config File       : %s" % configMainFile)
    configMainProperties.read(configMainFile)

    return (configProperties, configMainProperties)


def func_pandas_factory(colnames, rows):
    """Build a DataFrame from a sequence of rows and their column names.

    :param colnames: column labels, one per field of a row.
    :param rows: iterable of row records.
    :return: ``pandas.DataFrame`` holding ``rows`` under ``colnames``.
    """
    frame = pd.DataFrame(data=rows, columns=colnames)
    return frame


def checkDag():
    """Resolve the ``overRide`` flag for this DAG.

    Resolution order:

    1. ``overRide`` of the Airflow variable ``odsBulkMigration_Interaction``
       when it is exactly ``'YES'`` or ``'NO'``;
    2. option ``overRide`` of section ``config`` in the entity config file;
    3. ``'NO'`` when the variable has no ``overRide`` key or an empty one.

    Any other variable value without a config fallback is reported on stdout
    and an empty string is returned.

    :return: the resolved flag (``'YES'``, ``'NO'``, the configured value, or
        ``''`` when it could not be resolved).
    :raises Exception: when the Airflow variable is missing or empty.
    """
    overide = ""
    configProperties, _ = getConfiguration(__file__)

    adminVariableValue = Variable.get("odsBulkMigration_Interaction", None)
    if not adminVariableValue:
        raise Exception("adminVariableValue{odsBulkMigration_Interaction} value not set.")
    print('Airflow UI variable value assigned {odsBulkMigration_Interaction: %s}' % adminVariableValue)
    adminVariableValue = json.loads(adminVariableValue)

    if "overRide" in adminVariableValue:
        uiValue = adminVariableValue['overRide']
        # The former test ``x == 'YES' or 'NO' and x != ""`` parsed as
        # ``x == 'YES' or x != ""`` and therefore accepted any non-empty value.
        if uiValue in ('YES', 'NO'):
            overide = uiValue
        elif configProperties.has_option("config", "overRide"):
            overide = configProperties.get("config", "overRide")
        elif uiValue == "":
            overide = 'NO'
        else:
            print("overRide flag not set")
    elif configProperties.has_option("config", "overRide"):
        overide = configProperties.get("config", "overRide")
    else:
        overide = 'NO'
    return overide


def dataPopulate(tableName, dfValue, configMainProperties, fileSuffix=None):
    """Load a DataFrame into Cassandra with ``cqlsh`` and a ``COPY ... FROM`` file.

    The frame is written as a ``|``-delimited CSV under
    ``$AIRFLOW_HOME/logs/odsBulkMigration_Interaction/``, a one-statement
    loader script (``.cqlldr``) is generated next to it, and
    ``$CASSANDRA_HOME/bin/cqlsh`` is run on that script. cqlsh's stdout goes to
    the ``.clog`` file; rejected rows go to the ``.cerr`` file named in the
    statement.

    :param tableName: target table inside keyspace ``hobs_ods_staging``.
    :param dfValue: DataFrame to load; its column names become the COPY columns.
    :param configMainProperties: parser providing section ``cassandra`` with
        ``host``, ``port``, ``username``, ``password`` and ``keyspace``.
    :param fileSuffix: optional suffix appended to ``tableName`` to name the
        working files.
    :return: ``True`` when cqlsh exits with code 0, ``False`` on any failure
        (the error is printed, never raised).
    """
    from os import makedirs
    from subprocess import Popen, PIPE
    try:
        print("Preparing files for data population")
        fileSuffix = tableName + fileSuffix if fileSuffix else tableName
        cassPath = os_environ.get('CASSANDRA_HOME')
        airflowHome = os_environ.get('AIRFLOW_HOME')
        # Fail with a clear message instead of a "NoneType + str" TypeError.
        if not cassPath:
            raise Exception("CASSANDRA_HOME not configured.")
        if not airflowHome:
            raise Exception("AIRFLOW_HOME not configured.")

        workDir = airflowHome + '/logs/odsBulkMigration_Interaction/'
        if not os_path.isdir(workDir):
            makedirs(workDir)
        cassCsvFile = workDir + fileSuffix + '.csv'
        cassCqlldr = workDir + fileSuffix + '.cqlldr'
        cassClog = workDir + fileSuffix + '.clog'
        cassCErr = workDir + fileSuffix + '.cerr'
        print("cassCsvFile: %s" % cassCsvFile)
        print("cassCqlldr : %s" % cassCqlldr)
        print("cassClog   : %s" % cassClog)
        print("cassCErr   : %s" % cassCErr)
        dfValue.to_csv(sep='|', header=True, index=False, path_or_buf=cassCsvFile)
        cassCols = ','.join(list(dfValue))

        # Context manager: the loader file is closed even if the write fails.
        with open(cassCqlldr, "w") as cassCqlldr_W:
            cassCqlldr_W.write("copy hobs_ods_staging." + tableName + " (" + cassCols + ") FROM '" + cassCsvFile + "'"
                               + " WITH DELIMITER='|' AND ERRFILE='" + cassCErr + "'"
                               + " AND HEADER=True "
                               + " AND MAXATTEMPTS=1"
                               + " AND MINBATCHSIZE=1 AND MAXBATCHSIZE=1 AND PAGESIZE=10 "
                               + ";")

        print("Cassandra loader process")
        # Argument list without a shell: credentials or paths containing spaces
        # or shell metacharacters are passed to cqlsh verbatim.
        # NOTE: the password is still a command-line argument and therefore
        # visible in the process list.
        cassHost = configMainProperties.get('cassandra', 'host', raw=True)
        cassPort = configMainProperties.get('cassandra', 'port', raw=True)
        shellCommand = [cassPath + '/bin/cqlsh']
        # Empty host/port values are dropped, as the shell would have done.
        shellCommand += [value for value in (cassHost, cassPort) if value]
        shellCommand += ['-u', configMainProperties.get('cassandra', 'username', raw=True),
                         '-p', configMainProperties.get('cassandra', 'password', raw=True),
                         '-k', configMainProperties.get('cassandra', 'keyspace', raw=True),
                         '-f', cassCqlldr]

        # stdout is written straight to the .clog file (formerly "> file"),
        # so shellOutput is None below.
        with open(cassClog, "wb") as cassClog_W:
            shellProcess = Popen(shellCommand, stdin=PIPE, stdout=cassClog_W, stderr=PIPE)
            shellOutput, shellErr = shellProcess.communicate()
        shellReturnCode = shellProcess.returncode
        shellPid = str(shellProcess.pid)
        print("cassandra dataload processId: %s" % shellPid)
        print("shellProcess.returncode     : %s" % shellReturnCode)
        print("shellOutput                 : %s" % shellOutput)
        print("shellErr                    : %s" % shellErr)

        if shellReturnCode != 0:
            raise Exception("ODS data store load failed")
        gc.collect()
        return True
    except Exception as expCDB:
        gc.collect()
        print("Error - {} . Line No - {} ".format(str(expCDB), str(sys.exc_info()[-1].tb_lineno)))
        return False


# def fn_processInteraction():
#    __file__="/app/server/HOBS-DataPipeline/dags/odsBulkMigration/odsBulkMigration_Interaction.py"
def _getDatasetOption(configProperties, section, option, default=None, asJson=False):
    """Return an optional config option, or ``default`` when it is not configured.

    JSON options are read with interpolation and decoded; plain options are
    read raw.
    """
    if not configProperties.has_option(section, option):
        return default
    if asJson:
        return json.loads(configProperties.get(section, option))
    return configProperties.get(section, option, raw=True)


def fn_processInteraction(**kwargs):
    """Airflow task: migrate one staged batch of Interaction entities to the ODS.

    Steps, as implemented below:

    1. Resolve the ``overRide`` flag, check ``CASSANDRA_HOME`` and read the
       entity/generic configuration and the Airflow variable
       ``odsBulkMigration_Interaction`` (``entityId``, ``batchId`` and the
       optional ``opId``/``buId``).
    2. For every configured entity section, run its datasets in order:
       ``stageData`` is read from Cassandra and filtered on
       ``homm_entity_status == 'staged'``; every other dataset is read from the
       source database, normalised and merged into the running data set.
    3. Load the merged data into ``hos_interaction`` and write ``odsStaged`` /
       ``odsFailed`` rows to ``hos_migration_master`` through ``dataPopulate``.

    :param kwargs: Airflow task context; accepted but not used.
    :raises AirflowException: on any failure, carrying the original message.
    """
    try:
        # checkDag() also fails early when the Airflow variable is not set.
        over_ride = checkDag()
        cassPath = os_environ.get('CASSANDRA_HOME')
        if not cassPath:
            raise Exception("CASSANDRA_HOME not configured.")

        # Configuration  Properties
        configProperties, configMainProperties = getConfiguration(__file__)
        print("configMainProperties")
        print(configMainProperties.sections())
        print("configProperties")
        print(configProperties.sections())

        opId = configMainProperties.get("config", "opId")
        buId = configMainProperties.get("config", "buId")
        dateTimeFormat = configMainProperties.get("config", "dateTimeFormat", raw=True)
        dateFormat = configMainProperties.get("config", "dateFormat", raw=True)
        entityType = json.loads(configProperties.get("config", "entityType"))

        dfDataSet = pd.DataFrame()
        dfMigMasterFailed = pd.DataFrame()
        failedEntities = {}

        adminVariableValue = Variable.get("odsBulkMigration_Interaction", None)
        if not adminVariableValue:
            raise Exception("adminVariableValue{odsBulkMigration_Interaction} value not set.")
        print('Airflow UI variable value assigned {odsBulkMigration_Interaction: %s}' % adminVariableValue)
        adminVariableValue = json.loads(adminVariableValue)

        if not ('entityId' in adminVariableValue and adminVariableValue['entityId'] == 'Interaction'):
            raise Exception("entityId not set or entityId is not Interaction.")

        if 'batchId' in adminVariableValue and adminVariableValue['batchId'] != '':
            batchId = int(adminVariableValue['batchId'])
            print("batchId: %s [UI defined]" % batchId)
        else:
            raise Exception("batchId not present or batchId value is not valid")

        # opId / buId from the UI variable take precedence over the config file.
        if adminVariableValue.get('opId'):
            opId = adminVariableValue['opId']
            print("opId   : %s [UI defined]" % opId)

        if adminVariableValue.get('buId'):
            buId = adminVariableValue['buId']
            print("buId   : %s [UI defined]" % buId)

        # Cassandra database connection
        # --Invoke utility based function
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra database connection established")

        # Source database connection
        # --Invoke utility based function
        dbConnectRetVal = setDbEngine()
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: " + dbConnectRetVal[1])
        sourceDbEngine = dbConnectRetVal[1]
        print("Source database connection established")

        print("opId                : %s" % opId)
        print("buId                : %s" % buId)
        print("cassPath            : %s" % cassPath)

        for indx in entityType:
            if not configProperties.has_section(indx):
                failedEntities.update({indx: "entityInteraction section not found"})
                continue
            print("-------- PROCESSING ENTITY: %s ----------------" % indx)
            if not configProperties.has_option(indx, "datasets"):
                failedEntities.update({indx: "datasets section not found"})
                continue

            dataset = json.loads(configProperties.get(indx, "datasets"))
            print("dataset: %s" % dataset)
            for indxDS in dataset:
                if over_ride == 'YES' and indxDS == 'stageData':
                    continue  # skip stageData dataset processing
                print("--- Processing dataset: %s" % indxDS)
                query = configProperties.get(indx, indxDS + ".query", raw=True)
                columnsForComparison = json.loads(configProperties.get(indx, indxDS + ".columnsForComparison"))
                lkupColumns = json.loads(configProperties.get(indx, indxDS + ".lkupColumns"))
                joinCondition = configProperties.get(indx, indxDS + ".joinCondition", raw=True)
                primaryKey = _getDatasetOption(configProperties, indx, indxDS + ".primaryKey")
                dateTimeColumns = _getDatasetOption(configProperties, indx, indxDS + ".dateTimeColumns",
                                                    default=[], asJson=True)
                dateColumns = _getDatasetOption(configProperties, indx, indxDS + ".dateColumns",
                                                default=[], asJson=True)
                columnsToRemove = _getDatasetOption(configProperties, indx, indxDS + ".columnsToRemove",
                                                    default=[], asJson=True)
                connDatabase = _getDatasetOption(configProperties, indx, indxDS + ".database")
                columnsToInt = _getDatasetOption(configProperties, indx, indxDS + ".columnsToInt",
                                                 default=[], asJson=True)
                columnsToText = _getDatasetOption(configProperties, indx, indxDS + ".columnsToText",
                                                  default=[], asJson=True)

                # The query template is filled by plain string formatting;
                # batchId is an int, opId/buId come from config or the UI variable.
                compactQuery = query.lower().replace(' ', '')
                if 'homm_batch' in query.lower():
                    query = query % (batchId,)
                elif "op_id='%s'" in compactQuery and "bu_id='%s'" in compactQuery:
                    query = query % (opId, buId)

                print("Query               : %s" % query)
                print("columnsForComparison: %s" % columnsForComparison)
                print("lkupColumns         : %s" % lkupColumns)
                print("joinCondition       : %s" % joinCondition)
                print("primaryKey          : %s" % primaryKey)
                print("dateTimeColumns     : %s" % dateTimeColumns)
                print("dateColumns         : %s" % dateColumns)
                print("columnsToInt        : %s" % columnsToInt)
                print("columnsToText       : %s" % columnsToText)
                print("connDatabase        : %s" % connDatabase)

                if indxDS == "stageData":
                    dfSrcData = cassDbSession.execute(query, timeout=1000)
                    # presumably a DataFrame because the session was opened with
                    # pandas_enabled=True; verify against setCassandraDBConnection
                    dfSrcData = dfSrcData._current_rows
                    dfSrcData.columns = dfSrcData.columns.str.lower()
                    print("dfSrcData", type(dfSrcData))
                    print("Extracted           : %s [%s]" % (len(dfSrcData), indxDS))
                    if len(dfSrcData) == 0:
                        raise Exception("Entity Interaction not stagged for batch[%s]" % batchId)
                    dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
                    dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status'] == 'staged')]
                    print("homm_entity_status  : %s [Filter]" % (len(dfDataSet)))
                    if len(dfDataSet) == 0:
                        raise Exception(
                            "Entity Interaction not stagged for batch[%s] after filter=staged" % batchId)
                    # Keep every staged-batch row to derive the failed ones later.
                    dfMigMasterFailed = dfSrcData.copy()
                    continue

                if not (len(lkupColumns) > 0 and len(columnsForComparison) > 0 and joinCondition):
                    raise Exception(
                        "Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")

                dfSrcData = pd.read_sql(query, con=sourceDbEngine)
                dfSrcData.columns = dfSrcData.columns.str.lower()
                print("Extracted           : %s [%s]" % (len(dfSrcData), indxDS))
                if len(dfSrcData) == 0:
                    raise Exception("Entity Interaction failed in traversing datasets.")

                if 'hosi_account_extn_attributes' in list(dfSrcData):
                    # Flatten the JSON list of {attributeName, attributeValue}
                    # into a single {name: value} mapping per row.
                    # rowIdx must not reuse the name "indx": that is the entity
                    # section of the enclosing loop and is needed afterwards.
                    for rowIdx, row in dfSrcData.copy().iterrows():
                        try:
                            extnVal = {}
                            temp = json.loads(row['hosi_account_extn_attributes']) if row[
                                'hosi_account_extn_attributes'] else []
                            for v in temp:
                                try:
                                    if 'attributeName' in v and 'attributeValue' in v and v['attributeValue']:
                                        extnVal.update({v['attributeName']: v['attributeValue']})
                                except Exception as errx:
                                    print("Error - {} . Line No - {} ".format(str(errx), str(
                                        sys.exc_info()[-1].tb_lineno)))
                            if len(extnVal) > 0:
                                dfSrcData.at[rowIdx, 'hosi_account_extn_attributes'] = extnVal
                        except Exception as err:
                            print("Error - {} . Line No - {} ".format(str(err),
                                                                      str(sys.exc_info()[-1].tb_lineno)))

                if primaryKey:
                    # One fresh UUID per row.
                    dfSrcData[primaryKey] = [uuid.uuid4() for _ in range(len(dfSrcData))]

                for indxDateTimCol in dateTimeColumns:
                    dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where(
                        dfSrcData[indxDateTimCol].notnull(), None)
                for indxDateCol in dateColumns:
                    dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where(
                        dfSrcData[indxDateCol].notnull(), None)
                for indxCharCol in columnsToInt:
                    dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce')
                for indxMiscelCol in columnsToText:
                    dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str)

                print("dfDataSet           : %s [Before Merge]" % len(dfDataSet))

                dfDataSet = dfDataSet.copy().merge(dfSrcData,
                                                   how=joinCondition,
                                                   left_on=lkupColumns,
                                                   right_on=columnsForComparison)

                print("dfDataSet           : %s [After Merge]" % len(dfDataSet))
                print("Columns             : %s" % (list(dfDataSet)))
                if len(dfDataSet) == 0:
                    raise Exception("LOST RECORDS POST MERGE")

                dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
                if indxDS != 'interaction' and len(columnsToRemove) > 0:
                    print("columnsToRemove     : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
                    dfDataSet.drop(columnsToRemove, axis=1, inplace=True)
            # endFor==dataset
        # endFor==>entityType

        # DataLoading and update Status
        if len(failedEntities):
            raise Exception("Failed entities: " + str(failedEntities))
        elif len(dfDataSet) == 0:
            raise Exception("NO RECORDS TO PROCESS")
        else:
            print("dfDataSet : type{%s}" % type(list(dfDataSet)))
            print(list(dfDataSet))
            print("")

            print("Total               : %s" % len(dfDataSet))

            tableName = 'hos_interaction'
            if not dataPopulate(tableName, dfDataSet, configMainProperties, "_" + str(batchId)):
                raise Exception("ODS data store load failed.")

            print("Cassandra dataload submitted")

            # Mark every loaded interaction as odsStaged in the migration master.
            tableName = 'hos_migration_master'
            dfDataSet['homm_entity_value'] = dfDataSet['hosi_interaction_id']
            dfDataSet['homm_batch'] = batchId
            dfDataSet['homm_entity_id'] = 'interaction'
            dfDataSet['homm_entity_status'] = 'odsStaged'
            dfDataSet['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
            if not dataPopulate(tableName,
                                dfDataSet[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                           'homm_updated_on', 'homm_entity_value']],
                                configMainProperties,
                                "_" + str(batchId)):
                raise Exception("Staging master odsStaged update failed")

            # Rows of the staged batch that did not make it into the loaded set
            # are flagged as odsFailed.
            dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
            dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(
                dfDataSet[['homm_entity_value']],
                how='left',
                on=['homm_entity_value'],
                indicator=True
            ).loc[lambda x: x['_merge'] != 'both']
            print("dfMigMasterFailed   : %s" % len(dfMigMasterFailed))
            dfMigMasterFailed['homm_batch'] = batchId
            dfMigMasterFailed['homm_entity_id'] = 'interaction'
            dfMigMasterFailed['homm_entity_status'] = 'odsFailed'
            dfMigMasterFailed['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
            # NOTE(review): homm_updated_on is set above but not part of the
            # columns loaded for failed rows - confirm whether that is intended.
            if not dataPopulate(tableName,
                                dfMigMasterFailed[['homm_entity_id', 'homm_entity_status', 'homm_batch',
                                                   'homm_entity_value']],
                                configMainProperties,
                                "_" + str(batchId) + "_Failed"):
                raise Exception("Staging master odsFailed update failed")
        # endIf
        print("SUCCESS")
        gc.collect()
    except Exception as exp:
        gc.collect()
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException(str(exp))


# ********************* DAG Tasks********************************************************************************************#

# Defaults applied to every task of the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# Single-run DAG ("@once"), no catch-up of earlier intervals.
dag = DAG(
    "odsBulkMigration_Interaction",
    catchup=False,
    default_args=default_args,
    description="odsBulkMigration_Interaction",
    schedule_interval="@once",
    tags=["odsBulkMigration"],
)

# The only task: runs fn_processInteraction with the Airflow context as kwargs.
operatorStageData = PythonOperator(
    task_id='task_odsBulkMigration_Interaction',
    python_callable=fn_processInteraction,
    dag=dag,
    provide_context=True,
)

# ********************* DAG Sequence*******************************************************************************************#

# Single task, so there is no dependency chain to declare.
operatorStageData

# Disabled entry point for running the task function outside Airflow; it is
# kept as an inert string literal.
"""

if __name__ == '__main__':
    fn_processInteraction()
"""