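# ---------------------------------------------------------------------------
# Illustrative sketch only: the shape of the configuration file that
# getConfiguration(__file__) is assumed to return as a configparser object.
# The section and option names are the ones fn_processInteraction reads below;
# the values shown (valid keys and query text) are placeholders, not the real
# project settings.
#
#   [dagconfiguration]
#   validkeys = ["key1", "key2"]
#   charset = utf8
#   skip_target_check = NO
#
#   [queries]
#   outcome_ref        = SELECT ... FROM <outcome reference table> ...
#   inter_type         = SELECT ... FROM <interaction type reference table> ...
#   source_interaction = SELECT ... FROM <source interaction table> ...
#   interaction_ref    = SELECT ... FROM <interaction reference table> ...
#   target_interaction = SELECT ... FROM <target Cassandra table> ...
# ---------------------------------------------------------------------------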
import sys
import time
from json import loads as json_loads
from os import environ as os_environ, path as os_path

from pandas import DataFrame as pd_DataFrame, read_sql as pd_read_sql, merge as pd_merge, to_datetime as pd_to_datetime
from airflow.models import Variable

# NOTE: getConfiguration, setCassandraDBConnection, setDbEngine, formatSQLQuery and
# loadDataFrameToCassandra are project-specific helpers assumed to be importable from
# the project's shared utility module; they are not defined in this file.


def fn_processInteraction(**kwargs):
    try:
        configProperties = getConfiguration(__file__)
        adminVariableValue = {}
        df_src_interaction = pd_DataFrame()
        df_tgt_interaction = pd_DataFrame()
        df_ref_outcome_ref = pd_DataFrame()
        df_ref_interaction_ref = pd_DataFrame()
        df_ref_inter_type = pd_DataFrame()
        df_ref_inter_subtype = pd_DataFrame()
        df_ref_inter_catg = pd_DataFrame()

        validkeys = json_loads(configProperties.get('dagconfiguration', 'validkeys', raw=True))
        time_stamp = time.strftime("%Y%m%d-%H%M%S")

        log_path = os_environ.get('AIRFLOW_HOME')
        if not log_path:
            raise Exception("AIRFLOW_HOME not configured.")
        cassandra_path = os_environ.get('CASSANDRA_HOME')
        if not cassandra_path:
            raise Exception("CASSANDRA_HOME not configured.")

        # Runtime arguments come from an Airflow UI Variable named after this DAG file.
        dagIdVar = os_path.basename(__file__).replace(".py", "")
        if Variable.get(dagIdVar, None):
            adminVariableValue = Variable.get(dagIdVar, None)
            print('Airflow UI variable value assigned {%s: %s}' % (dagIdVar, adminVariableValue))
        if not adminVariableValue:
            raise Exception("adminVariableValue{%s} value not set." % dagIdVar)
        adminVariableValue = json_loads(adminVariableValue)
        args = adminVariableValue.copy()

        print("validkeys: " + str(validkeys) + " type(validkeys): " + str(type(validkeys)))
        print("args.keys(): " + str(set(args.keys())))
        if not (len(args.keys()) > 0 and set(args.keys()).issubset(set(validkeys))):
            invalid_args = ','.join(set(args.keys()) - set(validkeys))
            print("ERROR: Arguments are invalid and invalid arguments are %s." % invalid_args)
            raise Exception("Arguments are invalid and invalid arguments are %s." % invalid_args)

        # ------------------- Target connection: Cassandra
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: %s" % dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra Connected %s" % cassDbSession)

        # ------------------- Source connection: relational database
        dbConnectRetVal = setDbEngine(pac_encoding=configProperties.get('dagconfiguration', 'charset', raw=True))
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: %s" % dbConnectRetVal[1])
        db_engine = dbConnectRetVal[1]
        print("Source database connection established")
        print(db_engine)

        # ------------------- Build the parameterised source and reference queries
        return_val = formatSQLQuery(configProperties.get('queries', 'outcome_ref', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_outcome_ref failed: %s" % return_val[1])
        query_outcome_ref = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'inter_type', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_inter_type failed: %s" % return_val[1])
        query_inter_type = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'source_interaction', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_src_intrctn failed: %s" % return_val[1])
        query_src_intrctn = return_val[1]

        return_val = formatSQLQuery(configProperties.get('queries', 'interaction_ref', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_interaction_ref failed: %s" % return_val[1])
        query_interaction_ref = return_val[1]

        # ------------------- Load the reference data frames
        df_ref_outcome_ref = pd_read_sql(query_outcome_ref, con=db_engine)
        df_ref_outcome_ref.columns = df_ref_outcome_ref.columns.str.lower()
        print("Outcome Ref : %s " % len(df_ref_outcome_ref))

        df_ref_interaction_ref = pd_read_sql(query_interaction_ref, con=db_engine)
        df_ref_interaction_ref.columns = df_ref_interaction_ref.columns.str.lower()
        print("Interaction Ref : %s " % len(df_ref_interaction_ref))

        df_ref_inter_type = pd_read_sql(query_inter_type, con=db_engine)
        df_ref_inter_type.columns = df_ref_inter_type.columns.str.lower()
        print("interType Ref : %s " % len(df_ref_inter_type))

        # Subtype and category lookups reuse the interaction-type reference with renamed columns.
        df_ref_inter_subtype = df_ref_inter_type.copy()
        df_ref_inter_subtype.rename(columns={"hosi_interaction_type_id": "hosi_interaction_subtype_id",
                                             "hosi_interaction_type_desc": "hosi_interaction_subtype_desc"},
                                    inplace=True)
        print("interSubType Ref : %s " % len(df_ref_inter_subtype))

        df_ref_inter_catg = df_ref_inter_type.copy()
        df_ref_inter_catg.rename(columns={"hosi_interaction_type_id": "hosi_interaction_category_id",
                                          "hosi_interaction_type_desc": "hosi_interaction_category_desc"},
                                 inplace=True)
        print("interCateg Ref : %s " % len(df_ref_inter_catg))

        # ------------------- Load and clean the source interactions
        df_src_interaction = pd_read_sql(query_src_intrctn, con=db_engine)
        df_src_interaction.columns = df_src_interaction.columns.str.lower()
        df_src_interaction.rename(columns={"hosi_interaction_correl_id": "hosi_interaction_correlation_id",
                                           "hosi_interaction_senti_score": "hosi_interaction_sentiment_score"},
                                  inplace=True)
        # Replace literal "\n" sequences with spaces; the second replace clears only cells
        # whose entire value is a double quote (no regex flag is passed).
        df_src_interaction['hosi_interaction_message'] = \
            df_src_interaction['hosi_interaction_message'].replace(r'\\n', ' ', regex=True)
        df_src_interaction['hosi_interaction_message'] = \
            df_src_interaction['hosi_interaction_message'].replace('"', '')
        print("[01]Source Interaction : %s" % len(df_src_interaction))

        # tcareinteractionoutcome
        df_src_interaction = pd_merge(df_src_interaction, df_ref_outcome_ref, how='left',
                                      on=['hosi_interaction_outcome', 'hosi_op_id', 'hosi_bu_id'])
        print("[02]tcareinteractionoutcome : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        # tcareinteractiontype
        # NOTE: this join (and the subtype join below) is keyed only on op/bu and will fan out
        # if the lookup has more than one row per (hosi_op_id, hosi_bu_id).
        df_src_interaction = pd_merge(df_src_interaction, df_ref_inter_type, how='left',
                                      on=['hosi_op_id', 'hosi_bu_id'])
        print("[03]tcareinteractiontype : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        df_src_interaction = pd_merge(df_src_interaction, df_ref_interaction_ref, how='left',
                                      on=['hosi_customer_id', 'hosi_op_id', 'hosi_bu_id'])

        # tcareinteractionsubtype
        df_src_interaction = pd_merge(df_src_interaction, df_ref_inter_subtype, how='left',
                                      on=['hosi_op_id', 'hosi_bu_id'])
        print("[04]tcareinteractionsubtype : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        # tcareinteractioncategory
        df_src_interaction = pd_merge(df_src_interaction, df_ref_inter_catg, how='left',
                                      on=['hosi_interaction_category_id', 'hosi_op_id', 'hosi_bu_id'])
        print("[05]tcareinteractioncategory: %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))

        skip_target_check = (configProperties.get('dagconfiguration', 'skip_target_check', raw=True).upper()
                             if configProperties.has_option('dagconfiguration', 'skip_target_check') else "YES")
        print("skip_target_check : %s" % skip_target_check)

        if skip_target_check == "NO":
            # Exclude rows that already exist in the Cassandra target table.
            return_val = formatSQLQuery(configProperties.get('queries', 'target_interaction', raw=True), args)
            if not return_val[0]:
                raise Exception("ERROR: query_tgt_intrctn failed: %s" % return_val[1])
            query_tgt_intrctn = return_val[1]

            df_tgt_interaction = cassDbSession.execute(query_tgt_intrctn, timeout=None)
            df_tgt_interaction = df_tgt_interaction._current_rows
            df_tgt_interaction.columns = df_tgt_interaction.columns.str.lower()
            print("Target Interaction : %s " % len(df_tgt_interaction))

            df_final = pd_merge(df_src_interaction,
                                df_tgt_interaction[['hosi_customer_id', 'hosi_interaction_id',
                                                    'hosi_op_id', 'hosi_bu_id']],
                                how='left',
                                on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id'],
                                indicator='tc_migration_status')
            df_final = df_final.loc[df_final['tc_migration_status'] == 'left_only']
            df_final.drop(['tc_migration_status'], axis=1, inplace=True)
            print("df_final Interaction : %s " % len(df_final))
        else:
            df_final = df_src_interaction.copy()
        # endIf==>skip_target_check

        if len(df_final) > 0:
            df_final['hosi_customer_party_nbr'] = df_final['hosi_customer_party_nbr'].fillna(-99999).astype('int64')
            # df_final['hosi_interaction_date'] = pd_to_datetime(df_final['hosi_interaction_date'], format="%Y-%m-%d %H:%M:%S")
            # df_final['hosi_created_date'] = pd_to_datetime(df_final['hosi_created_date'], format="%Y-%m-%d %H:%M:%S")
            # df_final['hosi_updated_date'] = pd_to_datetime(df_final['hosi_updated_date'], format="%Y-%m-%d %H:%M:%S")
            print("Interaction: %s [Selected]" % len(df_final))

            df_final.dropna(how='all', inplace=True)
            print("Interaction: %s [DropNa-All]" % len(df_final))

            # df_final.dropna(subset=['hosi_interaction_id'])
            df_final = df_final[df_final['hosi_interaction_id'].notnull()].copy()
            print("Interaction: %s [DropNa-hosi_interaction_id]" % len(df_final))
            print("Final Interaction: %s " % len(df_final))
            print("type(df_final.columns): %s %s" % (type(list(df_final.columns)), len(list(df_final.columns))))
            __cass_cols = ','.join(df_final.columns)

            # CSV COPY Loading
            print("Preparing data to load into Cassandra via shell command")
            if not loadDataFrameToCassandra(connection_id="cassandrahost",
                                            table_name="hos_interaction",
                                            df_data=df_final,
                                            load_type_cqlldr=True,
                                            dag_name="migrateInteraction",
                                            file_suffix="_" + str(time_stamp)):
                raise Exception("ODS data store load failed.")
        else:
            print("NO DATA TO LOAD")
        # endIf==>_df_final_greaterthan_zero___

        print("Successfully Completed the process")
    except Exception as exp:
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))
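# ---------------------------------------------------------------------------
# Illustrative sketch only: one way fn_processInteraction could be wired into
# an Airflow DAG as a PythonOperator task. The dag_id, start_date and
# scheduling choices below are assumptions, not values from the original
# project; the import path shown is the Airflow 2.x location of PythonOperator.
# ---------------------------------------------------------------------------
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id="migrateInteraction",      # assumed dag_id, matching dag_name used above
         start_date=datetime(2023, 1, 1),  # placeholder start date
         schedule_interval=None,           # run on demand only
         catchup=False) as dag:
    process_interaction_task = PythonOperator(
        task_id="process_interaction",
        python_callable=fn_processInteraction,
    )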