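# Airflow task callable: extracts interaction records from the source relational
# database, enriches them with reference lookups, optionally filters out rows that
# already exist in the Cassandra target, and bulk-loads the remaining delta into
# the hos_interaction table.
# Note: this excerpt assumes module-level aliased imports such as
# pd_DataFrame/pd_read_sql/pd_merge (pandas), json_loads (json.loads),
# os_environ/os_path (os), plus sys, time, Variable (airflow.models) and the
# project helpers getConfiguration, setCassandraDBConnection, setDbEngine,
# formatSQLQuery and loadDataFrameToCassandra, none of which are shown here.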
def fn_processInteraction(**kwargs):
try:
configProperties = getConfiguration(__file__)
adminVariableValue = {}
df_src_interaction = pd_DataFrame()
df_tgt_interaction = pd_DataFrame()
df_ref_outcome_ref = pd_DataFrame()
df_ref_inter_ref = pd_DataFrame()
df_ref_inter_type = pd_DataFrame()
df_ref_inter_subtype = pd_DataFrame()
df_ref_inter_catg = pd_DataFrame()
validkeys = json_loads(configProperties.get('dagconfiguration', 'validkeys', raw=True))
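# Timestamp used to suffix the export file, plus sanity checks that the
# required Airflow and Cassandra environment variables are configured.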
time_stamp = time.strftime("%Y%m%d-%H%M%S")
log_path = os_environ.get('AIRFLOW_HOME')
if not log_path:
raise Exception("AIRFLOW_HOME not configured.")
cassandra_path = os_environ.get('CASSANDRA_HOME')
if not cassandra_path:
raise Exception("CASSANDRA_HOME not configured.")
dagIdVar = os_path.basename(__file__).replace(".py", "")
adminVariableValue = Variable.get(dagIdVar, None)
if adminVariableValue:
print('Airflow UI variable value assigned {%s: %s}' % (dagIdVar, adminVariableValue))
if not adminVariableValue:
raise Exception("adminVariableValue{%s} value not set." % dagIdVar)
adminVariableValue = json_loads(adminVariableValue)
args = adminVariableValue.copy()
print("Type of validkeys : " + str(validkeys) + "type(validkeys): " + str(type(validkeys)))
print("Type of args.keys(): " + str(set(list(args.keys()))))
invalid_args = set(args.keys()) - set(validkeys)
if not args or invalid_args:
print("ERROR: Arguments are invalid; invalid arguments are %s." % ','.join(invalid_args))
raise Exception("Arguments are invalid; invalid arguments are %s." % ','.join(invalid_args))
# ------------------- Target Connection Cassandra
dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
if not dbConnectRetVal[0]:
raise Exception("Cassandra database connection failed: %s" % dbConnectRetVal[1])
cassDbSession = dbConnectRetVal[1]
print("Cassandra Connected %s" % cassDbSession)
dbConnectRetVal = setDbEngine(pac_encoding=configProperties.get('dagconfiguration', 'charset', raw=True))
if not dbConnectRetVal[0]:
raise Exception("Source database connection failed: %s" % dbConnectRetVal[1])
db_engine = dbConnectRetVal[1]
print("Source database connection established")
print(db_engine)
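# Render the SQL templates defined in the [queries] section of the config file
# with the runtime args; formatSQLQuery returns a (success, query_or_error) pair.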
return_val = formatSQLQuery(configProperties.get('queries', 'outcome_ref', raw=True), args)
if not return_val[0]:
raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
query_outcome_ref = return_val[1]
return_val = formatSQLQuery(configProperties.get('queries', 'inter_type', raw=True), args)
if not return_val[0]:
raise Exception("ERROR: query_inter_type failed: %s" % return_val[1])
query_inter_type = return_val[1]
return_val = formatSQLQuery(configProperties.get('queries', 'source_interaction', raw=True), args)
if not return_val[0]:
raise Exception("ERROR: query_src_intrctn failed: %s" % return_val[1])
query_src_intrctn = return_val[1]
return_val = formatSQLQuery(configProperties.get('queries', 'interaction_ref', raw=True), args)
if not return_val[0]:
raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
query_interaction_ref = return_val[1]
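# Load the reference data sets into pandas DataFrames; column names are
# lower-cased so they line up with the target column names.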
df_ref_outcome_ref = pd_read_sql(query_outcome_ref, con=db_engine)
df_ref_outcome_ref.columns = df_ref_outcome_ref.columns.str.lower()
print("Outcome Ref : %s " % len(df_ref_outcome_ref))
df_ref_interaction_ref = pd_read_sql(query_interaction_ref, con=db_engine)
df_ref_interaction_ref.columns = df_ref_interaction_ref.columns.str.lower()
print("Outcome Ref : %s " % len(df_ref_outcome_ref))
df_ref_inter_type = pd_read_sql(query_inter_type, con=db_engine)
df_ref_inter_type.columns = df_ref_inter_type.columns.str.lower()
print("interType Ref : %s " % len(df_ref_inter_type))
df_ref_inter_subtype = df_ref_inter_type.copy()
df_ref_inter_subtype.rename(columns={"hosi_interaction_type_id": "hosi_interaction_subtype_id",
"hosi_interaction_type_desc": "hosi_interaction_subtype_desc"},
inplace=True)
print("interSubType Ref : %s " % len(df_ref_inter_subtype))
df_ref_inter_catg = df_ref_inter_type.copy()
df_ref_inter_catg.rename(columns={"hosi_interaction_type_id": "hosi_interaction_category_id",
"hosi_interaction_type_desc": "hosi_interaction_category_desc"}, inplace=True)
print("interCateg Ref : %s " % len(df_ref_inter_catg))
df_src_interaction = pd_read_sql(query_src_intrctn, con=db_engine)
df_src_interaction.columns = df_src_interaction.columns.str.lower()
df_src_interaction.rename(columns={"hosi_interaction_correl_id": "hosi_interaction_correlation_id",
"hosi_interaction_senti_score": "hosi_interaction_sentiment_score"},
inplace=True)
# strip literal "\n" escape sequences and embedded double quotes from the message text
df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace(r'\\n', ' ', regex=True)
df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace('"', '', regex=True)
print("[01]Source Interaction : %s" % len(df_src_interaction))
# tcareinteractionoutcome
df_src_interaction = pd_merge(df_src_interaction
, df_ref_outcome_ref
, how='left'
, on=['hosi_interaction_outcome', 'hosi_op_id', 'hosi_bu_id']
)
print("[02]tcareinteractionoutcome : %s" % len(df_src_interaction))
# print("Columns\n%s" % list(df_src_interaction.columns))
# tcareinteractiontype
df_src_interaction = pd_merge(df_src_interaction
, df_ref_inter_type
, how='left'
, on=['hosi_op_id', 'hosi_bu_id']
)
print("[03]tcareinteractiontype : %s" % len(df_src_interaction))
# print("Columns\n%s" % list(df_src_interaction.columns))
df_src_interaction = pd_merge(df_src_interaction
, df_ref_interaction_ref
, how='left'
, on=['hosi_customer_id', 'hosi_op_id', 'hosi_bu_id']
)
# tcareinteractionsubtype
df_src_interaction = pd_merge(df_src_interaction
, df_ref_inter_subtype
, how='left'
, on=[ 'hosi_op_id', 'hosi_bu_id']
)
print("[04]tcareinteractionsubtype : %s" % len(df_src_interaction))
# print("Columns\n%s" % list(df_src_interaction.columns))
# tcareinteractioncategory
df_src_interaction = pd_merge(df_src_interaction
, df_ref_inter_catg
, how='left'
, on=['hosi_interaction_category_id', 'hosi_op_id', 'hosi_bu_id']
)
print("[05]tcareinteractioncategory: %s" % len(df_src_interaction))
# print("Columns\n%s" % list(df_src_interaction.columns))
skip_target_check = (configProperties.get('dagconfiguration', 'skip_target_check', raw=True).upper()
if configProperties.has_option('dagconfiguration', 'skip_target_check') else "YES")
print("skip_target_check : %s" % skip_target_check)
if skip_target_check == "NO":
return_val = formatSQLQuery(configProperties.get('queries', 'target_interaction', raw=True), args)
if not return_val[0]:
raise Exception("Target database connection failed: " + return_val[1])
query_tgt_intrctn = return_val[1]
df_tgt_interaction = cassDbSession.execute(query_tgt_intrctn, timeout=None)
df_tgt_interaction = df_tgt_interaction._current_rows
df_tgt_interaction.columns = df_tgt_interaction.columns.str.lower()
print("Target Interaction : %s " % len(df_tgt_interaction))
df_final = pd_merge(df_src_interaction
, df_tgt_interaction[['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']]
, how='left'
, on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
, indicator='tc_migration_status'
)
df_final = df_final.loc[df_final['tc_migration_status'] == 'left_only']
df_final.drop(['tc_migration_status'], axis=1, inplace=True)
print("df_final Interaction : %s " % len(df_final))
else:
df_final = df_src_interaction.copy()
# endIf==>skip_target_check
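# Final clean-up before the load: default missing party numbers, drop fully
# empty rows and rows without an interaction id.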
if len(df_final) > 0:
df_final['hosi_customer_party_nbr'] = df_final['hosi_customer_party_nbr'].fillna(-99999).astype('int64')
# df_final['hosi_interaction_date'] = pd_to_datetime(df_final['hosi_interaction_date'], format="%Y-%m-%d %H:%M:%S")
# df_final['hosi_created_date'] = pd_to_datetime(df_final['hosi_created_date'], format="%Y-%m-%d %H:%M:%S")
# df_final['hosi_updated_date'] = pd_to_datetime(df_final['hosi_updated_date'], format="%Y-%m-%d %H:%M:%S")
print("Interaction: %s [Selected]" % len(df_final))
df_final.dropna(how='all', inplace=True)
print("Interaction: %s [DropNa-All]" % len(df_final))
# df_final.dropna(subset = ['hosi_interaction_id'])
df_final = df_final.copy()[df_final['hosi_interaction_id'].notnull()]
print("Interaction: %s [DropNa-hosi_interaction_id]" % len(df_final))
print("Final Interaction: %s " % len(df_final))
print("type(df_final.columns): %s %s" % (type(list(df_final.columns)), len(list(df_final.columns))))
__cass_cols = ','.join(df_final.columns)
# CSV COPY Loading
print("Preparing data to load into Cassandra via shell command")
if not loadDataFrameToCassandra(connection_id="cassandrahost"
, table_name="hos_interaction"
, df_data=df_final
, load_type_cqlldr=True
, dag_name="migrateInteraction"
, file_suffix="_" + str(time_stamp)
):
raise Exception("ODS data store load failed.")
else:
print("NO DATA TO LOAD")
# endIf==>_df_final_greaterthan_zero___
print("Successfully Completed the process")
except Exception as exp:
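# Log the error message together with the line number where it was raised
# (requires sys to be imported at module level).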
print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))Editor is loading...