Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
12 kB
1
Indexable
Never
def fn_processInteraction(**kwargs):
    try:
        configProperties = getConfiguration(__file__)
        adminVariableValue = {}
        df_src_interaction = pd_DataFrame()
        df_tgt_interaction = pd_DataFrame()
        df_ref_outcome_ref = pd_DataFrame()
        df_ref_inter_ref = pd_DataFrame()
        df_ref_inter_type = pd_DataFrame()
        df_ref_inter_subtype = pd_DataFrame()
        df_ref_inter_catg = pd_DataFrame()
        validkeys = json_loads(configProperties.get('dagconfiguration', 'validkeys', raw=True))
        time_stamp = time.strftime("%Y%m%d-%H%M%S")

        log_path = os_environ.get('AIRFLOW_HOME')
        if not log_path:
            raise Exception("AIRFLOW_HOME not configured.")

        cassandra_path = os_environ.get('CASSANDRA_HOME')
        if not cassandra_path:
            raise Exception("CASSANDRA_HOME not configured.")

        dagIdVar = os_path.basename(__file__).replace(".py", "")
        if Variable.get(dagIdVar, None):
            adminVariableValue = Variable.get(dagIdVar, None)
            print('Airflow UI variable value assigned {%s: %s}' % (dagIdVar, adminVariableValue))

        if not adminVariableValue:
            raise Exception("adminVariableValue{%s} value not set." % dagIdVar)
        adminVariableValue = json_loads(adminVariableValue)
        args = adminVariableValue.copy()

        print("Type of validkeys  : " + str(validkeys) + "type(validkeys): " + str(type(validkeys)))
        print("Type of args.keys(): " + str(set(list(args.keys()))))

        if not (len(list(args.keys())) > 0 and set(list(args.keys())).issubset(set(validkeys))):
            print("ERROR: Arguments are invalid and invalid arguments are %s." % ','.join(
                set(list(args.keys())) - set(validkeys)) + ".")
            raise Exception(("Arguments are invalid and invalid arguments are %s." % ','.join(
                set(list(args.keys())) - set(validkeys)) + "."))

        # ------------------- Target Connection Cassandra
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: %s" % dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra Connected %s" % cassDbSession)

        dbConnectRetVal = setDbEngine(pac_encoding=configProperties.get('dagconfiguration', 'charset', raw=True))
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: %s" % dbConnectRetVal[1])
        db_engine = dbConnectRetVal[1]
        print("Source database connection established")
        print(db_engine)



        return_val = formatSQLQuery(configProperties.get('queries', 'outcome_ref', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
        query_outcome_ref = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'inter_type', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_inter_type failed: %s" % return_val[1])
        query_inter_type = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'source_interaction', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_src_intrctn failed: %s" % return_val[1])
        query_src_intrctn = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'interaction_ref', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
        query_interaction_ref = return_val[1]

        df_ref_outcome_ref = pd_read_sql(query_outcome_ref, con=db_engine)
        df_ref_outcome_ref.columns = df_ref_outcome_ref.columns.str.lower()
        print("Outcome Ref          : %s " % len(df_ref_outcome_ref))

        df_ref_interaction_ref = pd_read_sql(query_interaction_ref, con=db_engine)
        df_ref_interaction_ref.columns = df_ref_interaction_ref.columns.str.lower()
        print("Outcome Ref          : %s " % len(df_ref_outcome_ref))

        df_ref_inter_type = pd_read_sql(query_inter_type, con=db_engine)
        df_ref_inter_type.columns = df_ref_inter_type.columns.str.lower()
        print("interType Ref        : %s " % len(df_ref_inter_type))

        df_ref_inter_subtype = df_ref_inter_type.copy()
        df_ref_inter_subtype.rename(columns={"hosi_interaction_type_id": "hosi_interaction_subtype_id",
                                             "hosi_interaction_type_desc": "hosi_interaction_subtype_desc"},
                                    inplace=True)
        print("interSubType Ref     : %s " % len(df_ref_inter_subtype))

        df_ref_inter_catg = df_ref_inter_type.copy()
        df_ref_inter_catg.rename(columns={"hosi_interaction_type_id": "hosi_interaction_category_id",
                                          "hosi_interaction_type_desc": "hosi_interaction_category_desc"}, inplace=True)
        print("interCateg Ref       : %s " % len(df_ref_inter_catg))

        df_src_interaction = pd_read_sql(query_src_intrctn, con=db_engine)
        df_src_interaction.columns = df_src_interaction.columns.str.lower()
        df_src_interaction.rename(columns={"hosi_interaction_correl_id": "hosi_interaction_correlation_id",
                                           "hosi_interaction_senti_score": "hosi_interaction_sentiment_score"},
                                  inplace=True)
        df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace(r'\\n',
                                                                                                                ' ',
                                                                                                                regex=True)
        df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace('"', '')
        print("[01]Source Interaction      : %s" % len(df_src_interaction))

        #  tcareinteractionoutcome
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_outcome_ref
                                      , how='left'
                                      , on=['hosi_interaction_outcome', 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[02]tcareinteractionoutcome : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        #  tcareinteractiontype
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_type
                                      , how='left'
                                      , on=['hosi_op_id', 'hosi_bu_id']
                                      )
        print("[03]tcareinteractiontype    : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_interaction_ref
                                      , how='left'
                                      , on=['hosi_customer_id', 'hosi_op_id', 'hosi_bu_id']
                                      )

        # tcareinteractionsubtype
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_subtype
                                      , how='left'
                                      , on=[ 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[04]tcareinteractionsubtype : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        # tcareinteractioncategory
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_catg
                                      , how='left'
                                      , on=['hosi_interaction_category_id', 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[05]tcareinteractioncategory: %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        skip_target_check = configProperties.get('dagconfiguration', 'skip_target_check',
                                                 raw=True).upper() if configProperties.has_option('dagconfiguration',
                                                                                                  'skip_target_check') else "YES"
        print("skip_target_check           : %s" % skip_target_check)
        if skip_target_check == "NO":
            return_val = formatSQLQuery(configProperties.get('queries', 'target_interaction', raw=True), args)
            if not return_val[0]:
                raise Exception("Target database connection failed: " + return_val[1])
            query_tgt_intrctn = return_val[1]

            df_tgt_interaction = cassDbSession.execute(query_tgt_intrctn, timeout=None)
            df_tgt_interaction = df_tgt_interaction._current_rows
            df_tgt_interaction.columns = df_tgt_interaction.columns.str.lower()
            print("Target Interaction          : %s " % len(df_tgt_interaction))

            df_final = pd_merge(df_src_interaction
                                , df_tgt_interaction[
                                    ['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']]
                                , how='left'
                                , left_on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
                                , right_on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
                                , indicator='tc_migration_status'
                                )
            df_final = df_final.loc[df_final['tc_migration_status'] == 'left_only']
            df_final.drop(['tc_migration_status'], axis=1, inplace=True)
            print("df_final Interaction        : %s " % len(df_final))
        else:
            df_final = df_src_interaction.copy()
        # endIf==>skip_target_check

        if len(df_final) > 0:
            df_final['hosi_customer_party_nbr'] = df_final['hosi_customer_party_nbr'].fillna(-99999).astype('int64')
            # df_final['hosi_interaction_date'] = pd_to_datetime(df_final['hosi_interaction_date'], format="%Y-%m-%d %H%:M:%S")
            # df_final['hosi_created_date'] = pd_to_datetime(df_final['hosi_created_date'], format="%Y-%m-%d %H%:M:%S")
            # df_final['hosi_updated_date'] = pd_to_datetime(df_final['hosi_updated_date'], format="%Y-%m-%d %H%:M:%S")
            print("Interaction: %s [Selected]" % len(df_final))
            df_final.dropna(how='all', inplace=True)
            print("Interaction: %s [DropNa-All]" % len(df_final))
            # df_final.dropna(subset = ['hosi_interaction_id'])
            df_final = df_final.copy()[df_final['hosi_interaction_id'].notnull()]
            print("Interaction: %s [DropNa-hosi_interaction_id]" % len(df_final))
            print("Final Interaction: %s " % len(df_final))
            print("type(df_final.columns): %s %s" % (type(list(df_final.columns)), len(list(df_final.columns))))
            __cass_cols = ','.join(df_final.columns)

            # CSV COPY Loading
            print("Preparing data to load into Cassandra via shell command")
            if not loadDataFrameToCassandra(connection_id="cassandrahost"
                    , table_name="hos_interaction"
                    , df_data=df_final
                    , load_type_cqlldr=True
                    , dag_name="migrateInteraction"
                    , file_suffix="_" + str(time_stamp)
                                            ):
                raise Exception("ODS data store load failed.")
        else:
            print("NO DATA TO LOAD")
        # endIf==>_df_final_greaterthan_zero___
        print("Successfully Completed the process")
    except Exception as exp:
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))