# NOTE: paste-site metadata (title / avatar / size / language tags) removed —
# it was not part of the Python source and made the file unparseable.
import uuid
import time
from configparser import ConfigParser as conf_ConfigParser
from json import loads as json_loads
from os import environ as os_environ
from os import path as os_path
from pandas import DataFrame as pd_DataFrame
from pandas import merge as pd_merge
from pandas import datetime as pd_to_datetime
from pandas import read_sql as pd_read_sql
from subprocess import Popen, PIPE
from airflow import DAG

from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator

from utils.formatSQLQuery import formatSQLQuery
from utils.loadDataFrameToCassandra import loadDataFrameToCassandra
from utils.setCassandraDBConnection import setCassandraDBConnection
from utils.setDbEngine import setDbEngine
import sys


def getConfiguration(__file__):
    """Load the DAG's entity ``.config`` file that mirrors its path.

    The config path is derived from the DAG file path by replacing the
    ``/dags/`` path segment with ``/config/`` and the ``.py`` suffix with
    ``.config`` (note: ``str.replace`` swaps the first occurrence anywhere
    in the path, so the DAG file is expected to live under a ``/dags/``
    directory).

    Args:
        __file__: path of the calling DAG file.  (Name kept for caller
            compatibility even though it shadows the module attribute.)

    Returns:
        configparser.ConfigParser populated from the derived config file.

    Raises:
        Exception: if the derived config file does not exist.
    """
    print("DAG __file__      : %s" % __file__)

    # Entity-Customer: ConfigParser comes from the module-level import;
    # the previous function-local re-import was redundant.
    configProperties = conf_ConfigParser()
    configFile = __file__.replace('/dags/', '/config/').replace('.py', '.config')
    if not os_path.exists(configFile):
        raise Exception("Entity config file does not exist. [%s]" % configFile)
    print("Entity config File: %s" % configFile)
    configProperties.read(configFile)

    return configProperties


def func_pandas_factory(__col_names, __rows):
    """Cassandra row factory that materializes a result page as a DataFrame.

    Each row tuple is converted to a list so that map-typed column values
    (deserialized by the driver as ``OrderedMapSerializedKey``) can be
    replaced in place with plain ``dict`` objects, which pandas and the
    downstream CSV export handle natively.

    Args:
        __col_names: column names of the Cassandra result set.
        __rows: iterable of row tuples from the driver.

    Returns:
        pandas.DataFrame holding the converted rows.
    """
    print("Row_Factory")

    from cassandra import util as cass_util
    # Tuples are immutable, so copy each row into a list before mutating.
    rows = [list(row) for row in __rows]
    for row in rows:
        for idx, value in enumerate(row):
            # isinstance (rather than `type(...) is`) also covers any
            # driver subclass of OrderedMapSerializedKey.
            if isinstance(value, cass_util.OrderedMapSerializedKey):
                row[idx] = dict(value)
    return pd_DataFrame(rows, columns=__col_names)


# endDef==>func_pandas_factory()


def fn_processInteraction(**kwargs) -> None:
    """Migrate CRM interaction records from the source RDBMS to Cassandra.

    Driven entirely by external configuration:
      * the sibling ``.config`` file (resolved from this DAG file's path),
      * an Airflow Variable named after this DAG file, holding a JSON dict
        of query arguments validated against the config's ``validkeys``.

    Flow: load config/args -> open Cassandra + source DB connections ->
    render and run the reference/source SQL queries -> enrich interactions
    through a chain of left merges -> optionally anti-join against the
    target table to keep only not-yet-migrated rows -> bulk-load the final
    frame via ``loadDataFrameToCassandra``.

    NOTE(review): the trailing ``except Exception`` only prints the error,
    so the Airflow task is reported successful even when the migration
    fails — confirm this is intentional.
    """
    # def fn_processInteraction(args):
    # __file__="/app/server/HOBS-DataPipeline/modules/crms/dags/migrateInteraction.py"
    try:
        configProperties = getConfiguration(__file__)
        adminVariableValue = {}
        # Pre-declare working frames; populated from the queries below.
        df_src_interaction = pd_DataFrame()
        df_tgt_interaction = pd_DataFrame()
        df_ref_outcome_ref = pd_DataFrame()
        # NOTE(review): df_ref_inter_ref is never used — the interaction-ref
        # query result is assigned to df_ref_interaction_ref further down.
        df_ref_inter_ref = pd_DataFrame()
        df_ref_inter_type = pd_DataFrame()
        df_ref_inter_subtype = pd_DataFrame()
        df_ref_inter_catg = pd_DataFrame()
        # Whitelist of argument keys the Airflow Variable may carry.
        validkeys = json_loads(configProperties.get('dagconfiguration', 'validkeys', raw=True))
        time_stamp = time.strftime("%Y%m%d-%H%M%S")

        log_path = os_environ.get('AIRFLOW_HOME')
        if not log_path:
            raise Exception("AIRFLOW_HOME not configured.")

        cassandra_path = os_environ.get('CASSANDRA_HOME')
        if not cassandra_path:
            raise Exception("CASSANDRA_HOME not configured.")

        # The Airflow Variable key is the DAG file name without '.py'.
        dagIdVar = os_path.basename(__file__).replace(".py", "")
        # NOTE(review): Variable.get is called twice (check + fetch); a single
        # call with a default would avoid the duplicate metadata lookup.
        if Variable.get(dagIdVar, None):
            adminVariableValue = Variable.get(dagIdVar, None)
            print('Airflow UI variable value assigned {%s: %s}' % (dagIdVar, adminVariableValue))

        if not adminVariableValue:
            raise Exception("adminVariableValue{%s} value not set." % dagIdVar)
        adminVariableValue = json_loads(adminVariableValue)
        args = adminVariableValue.copy()

        print("Type of validkeys  : " + str(validkeys) + "type(validkeys): " + str(type(validkeys)))
        print("Type of args.keys(): " + str(set(list(args.keys()))))

        # Reject the run unless args is non-empty and every key is whitelisted.
        if not (len(list(args.keys())) > 0 and set(list(args.keys())).issubset(set(validkeys))):
            print("ERROR: Arguments are invalid and invalid arguments are %s." % ','.join(
                set(list(args.keys())) - set(validkeys)) + ".")
            raise Exception(("Arguments are invalid and invalid arguments are %s." % ','.join(
                set(list(args.keys())) - set(validkeys)) + "."))

        # ------------------- Target Connection Cassandra
        # Project helpers follow an (ok, value_or_error) tuple protocol.
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: %s" % dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra Connected %s" % cassDbSession)

        # ------------------- Source RDBMS engine (same tuple protocol).
        dbConnectRetVal = setDbEngine(pac_encoding=configProperties.get('dagconfiguration', 'charset', raw=True))
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: %s" % dbConnectRetVal[1])
        db_engine = dbConnectRetVal[1]
        print("Source database connection established")
        print(db_engine)



        # Render each SQL template from the config with the validated args.
        return_val = formatSQLQuery(configProperties.get('queries', 'outcome_ref', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
        query_outcome_ref = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'inter_type', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_inter_type failed: %s" % return_val[1])
        query_inter_type = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'source_interaction', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_src_intrctn failed: %s" % return_val[1])
        query_src_intrctn = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'interaction_ref', raw=True), args)
        if not return_val[0]:
            # NOTE(review): message says "outcome_ref" but this is the
            # interaction_ref query — copy-paste label.
            raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
        query_interaction_ref = return_val[1]

        # Column names are lower-cased after every read so the merge keys
        # below match regardless of the source DB's case conventions.
        df_ref_outcome_ref = pd_read_sql(query_outcome_ref, con=db_engine)
        df_ref_outcome_ref.columns = df_ref_outcome_ref.columns.str.lower()
        print("Outcome Ref          : %s " % len(df_ref_outcome_ref))

        df_ref_interaction_ref = pd_read_sql(query_interaction_ref, con=db_engine)
        df_ref_interaction_ref.columns = df_ref_interaction_ref.columns.str.lower()
        # NOTE(review): label and value repeat the outcome-ref line above;
        # this should presumably log len(df_ref_interaction_ref).
        print("Outcome Ref          : %s " % len(df_ref_outcome_ref))

        df_ref_inter_type = pd_read_sql(query_inter_type, con=db_engine)
        df_ref_inter_type.columns = df_ref_inter_type.columns.str.lower()
        print("interType Ref        : %s " % len(df_ref_inter_type))

        # Subtype and category reference frames are the type frame with the
        # type id/desc columns renamed — the source models all three alike.
        df_ref_inter_subtype = df_ref_inter_type.copy()
        df_ref_inter_subtype.rename(columns={"hosi_interaction_type_id": "hosi_interaction_subtype_id",
                                             "hosi_interaction_type_desc": "hosi_interaction_subtype_desc"},
                                    inplace=True)
        print("interSubType Ref     : %s " % len(df_ref_inter_subtype))

        df_ref_inter_catg = df_ref_inter_type.copy()
        df_ref_inter_catg.rename(columns={"hosi_interaction_type_id": "hosi_interaction_category_id",
                                          "hosi_interaction_type_desc": "hosi_interaction_category_desc"}, inplace=True)
        print("interCateg Ref       : %s " % len(df_ref_inter_catg))

        df_src_interaction = pd_read_sql(query_src_intrctn, con=db_engine)
        df_src_interaction.columns = df_src_interaction.columns.str.lower()
        # Align abbreviated source columns with the target-table spellings.
        df_src_interaction.rename(columns={"hosi_interaction_correl_id": "hosi_interaction_correlation_id",
                                           "hosi_interaction_senti_score": "hosi_interaction_sentiment_score"},
                                  inplace=True)
        # if 'hosi_updated_by' in df_src_interaction.columns:
        #    df_src_interaction['hosi_updated_by'] = 'migrationJob'
        # Strip literal "\n" sequences from messages so the pipe-delimited
        # CSV export used by the Cassandra loader stays one row per record.
        df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace(r'\\n',
                                                                                                                ' ',
                                                                                                                regex=True)
        # NOTE(review): without regex=True this replaces only cells whose
        # entire value is '"', not embedded quotes — confirm intent.
        df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace('"', '')
        print("[01]Source Interaction      : %s" % len(df_src_interaction))

        #  tcareinteractionoutcome
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_outcome_ref
                                      , how='left'
                                      , on=['hosi_interaction_outcome', 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[02]tcareinteractionoutcome : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        #  tcareinteractiontype
        # NOTE(review): this merge (and the subtype merge below) joins only
        # on op/bu, so each interaction row is repeated once per type row
        # for that op/bu — confirm the reference queries return one row per
        # (op, bu), otherwise the row counts printed below will multiply.
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_type
                                      , how='left'
                                      , on=['hosi_op_id', 'hosi_bu_id']
                                      )
        print("[03]tcareinteractiontype    : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_interaction_ref
                                      , how='left'
                                      , on=['hosi_customer_id', 'hosi_op_id', 'hosi_bu_id']
                                      )

        # tcareinteractionsubtype
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_subtype
                                      , how='left'
                                      , on=[ 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[04]tcareinteractionsubtype : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        # tcareinteractioncategory
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_catg
                                      , how='left'
                                      , on=['hosi_interaction_category_id', 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[05]tcareinteractioncategory: %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        # When "NO", the target table is read and already-migrated rows are
        # dropped via a left-merge anti-join; default ("YES") skips the read.
        skip_target_check = configProperties.get('dagconfiguration', 'skip_target_check',
                                                 raw=True).upper() if configProperties.has_option('dagconfiguration',
                                                                                                  'skip_target_check') else "YES"
        print("skip_target_check           : %s" % skip_target_check)
        if skip_target_check == "NO":
            return_val = formatSQLQuery(configProperties.get('queries', 'target_interaction', raw=True), args)
            if not return_val[0]:
                raise Exception("Target database connection failed: " + return_val[1])
            query_tgt_intrctn = return_val[1]

            df_tgt_interaction = cassDbSession.execute(query_tgt_intrctn, timeout=None)
            # _current_rows is a DataFrame here — presumably the session was
            # configured with the pandas row factory (pandas_enabled=True
            # above); confirm against setCassandraDBConnection.
            df_tgt_interaction = df_tgt_interaction._current_rows
            df_tgt_interaction.columns = df_tgt_interaction.columns.str.lower()
            print("Target Interaction          : %s " % len(df_tgt_interaction))

            # indicator='left_only' rows are source rows absent from the
            # target, i.e. the ones still to migrate.
            df_final = pd_merge(df_src_interaction
                                , df_tgt_interaction[
                                    ['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']]
                                , how='left'
                                , left_on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
                                , right_on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
                                , indicator='tc_migration_status'
                                )
            df_final = df_final.loc[df_final['tc_migration_status'] == 'left_only']
            df_final.drop(['tc_migration_status'], axis=1, inplace=True)
            print("df_final Interaction        : %s " % len(df_final))
        else:
            df_final = df_src_interaction.copy()
        # endIf==>skip_target_check

        if len(df_final) > 0:
            # -99999 is the sentinel for a missing customer party number so
            # the column can be cast to int64 (NaN cannot).
            df_final['hosi_customer_party_nbr'] = df_final['hosi_customer_party_nbr'].fillna(-99999).astype('int64')
            # df_final['hosi_interaction_date'] = pd_to_datetime(df_final['hosi_interaction_date'], format="%Y-%m-%d %H%:M:%S")
            # df_final['hosi_created_date'] = pd_to_datetime(df_final['hosi_created_date'], format="%Y-%m-%d %H%:M:%S")
            # df_final['hosi_updated_date'] = pd_to_datetime(df_final['hosi_updated_date'], format="%Y-%m-%d %H%:M:%S")
            print("Interaction: %s [Selected]" % len(df_final))
            df_final.dropna(how='all', inplace=True)
            print("Interaction: %s [DropNa-All]" % len(df_final))
            # df_final.dropna(subset = ['hosi_interaction_id'])
            # Rows without an interaction id cannot be keyed in Cassandra.
            df_final = df_final.copy()[df_final['hosi_interaction_id'].notnull()]
            print("Interaction: %s [DropNa-hosi_interaction_id]" % len(df_final))
            print("Final Interaction: %s " % len(df_final))
            print("type(df_final.columns): %s %s" % (type(list(df_final.columns)), len(list(df_final.columns))))
            __cass_cols = ','.join(df_final.columns)

            # CSV COPY Loading
            print("Preparing data to load into Cassandra via shell command")
            if not loadDataFrameToCassandra(connection_id="cassandrahost"
                    , table_name="hos_interaction"
                    , df_data=df_final
                    , load_type_cqlldr=True
                    , dag_name="migrateInteraction"
                    , file_suffix="_" + str(time_stamp)
                                            ):
                raise Exception("ODS data store load failed.")

            # NOTE(review): dead code — the former cqlsh-COPY loading path,
            # superseded by loadDataFrameToCassandra above. Consider deleting.
            """
            log_path = log_path + '/logs'
            cass_file_csv = log_path + '/hos_interaction_'+time_stamp+'.csv'
            cass_file_loader = log_path + '/hos_interaction_'+time_stamp+'.cqlldr'
            cass_file_log = log_path + '/hos_interaction_'+time_stamp+'.log'
            cass_file_err = log_path + '/hos_interaction_'+time_stamp+'.err'
            api_err = log_path + '/migrateInteraction_'+time_stamp+'.err'
            print("\n%s\n%s\n%s\n%s\n%s" % (log_path, cass_file_csv, cass_file_loader, cass_file_log, cassandra_path))

            print("Generating csv file for loading data")
            df_final.to_csv(sep='|', index=False, header=True, path_or_buf=cass_file_csv)

            __cass_cslldr = open(cass_file_loader, "w")
            #__cass_cslldr.write("copy hobs_ods_staging.hos_interaction (" + __cass_cols + ") FROM '" + cass_file_csv + "' WITH DELIMITER='|' AND ERRFILE='" + cass_file_err + "'AND HEADER=True AND PREPAREDSTATEMENTS=False AND CHUNKSIZE=1000 AND INGESTRATE=100000 AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")
            __cass_cslldr.write("copy hobs_ods_staging.hos_interaction (" + __cass_cols + ") FROM '" + cass_file_csv + "' WITH DELIMITER='|' AND ERRFILE='" + cass_file_err + "'AND HEADER=True AND PREPAREDSTATEMENTS=False AND CHUNKSIZE=50 AND INGESTRATE=10000 AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")
            __cass_cslldr.close()

            from airflow.hooks.base import BaseHook
            conn = BaseHook.get_connection("cassandrahost")
            connection_type = conn.conn_type
            hostname = conn.host
            schema = conn.schema
            login_name = conn.login
            login_password = conn.password
            port_number = conn.port

            __cmd = cassandra_path + '/bin/cqlsh ' + hostname + ' ' \
                     + str(port_number) \
                     + ' -u ' + login_name \
                     + ' -p ' + login_password \
                     + ' -k ' + schema \
                     + ' -f ' + cass_file_loader + ' > '  + cass_file_log

            print("COPY command \n"+ __cmd)
            print("Data loading started into Cassandra via shell command")

            __shell = Popen(__cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
            __output, __err = __shell.communicate()
            __pid = str(__shell.pid)
            print("cassandra dataload processId:"+__pid)
            print("__output, __err ")
            print(__shell.returncode)
            print(__output)

            print("Reportdetails-start-interactionData")
            print("Reportdetails-rows-migratedCount-%s" % str(len(df_final)))

            if __shell.returncode == 0:
                print("cassandra dataload submitted")
                __pidfile = log_path + '/migrateInteraction.'+__pid + '.pid'
                if os_path.isfile(__pidfile):
                    print("cassandra dataload InProgress")
                else:
                    print("cassandra dataload completed")
            else:
                raise Exception("ODS data store load failed")
            # endIf==>___shell_returncode___
            """
        else:
            print("NO DATA TO LOAD")
        # endIf==>_df_final_greaterthan_zero___
        print("Successfully Completed the process")
    except Exception as exp:
        # Swallows all errors: logs message + line number, never re-raises.
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))


# --------------------------------------------------------------------------- #
# DAG definition and task wiring                                              #
# --------------------------------------------------------------------------- #

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

dag = DAG(
    "migrateInteraction",
    catchup=False,
    default_args=default_args,
    description="migrateInteraction",
    schedule_interval="@once",
    tags=["odsBulkMigration"],
    params={"language": "ENG", "opId": "HOB", "buId": "DEFAULT"},
)

operatorStageData = PythonOperator(
    task_id='task_migrateInteraction',
    python_callable=fn_processInteraction,
    dag=dag,
    provide_context=True,
)

# Single-task DAG: nothing downstream to chain.
operatorStageData

"""
if __name__ == '__main__':
    fn_processInteraction({"language":"ENG","opId":"HOB","buId":"DEFAULT"})
"""
# NOTE: trailing paste-site residue ("Editor is loading...") removed.