# migrateInteraction DAG
# (Removed paste-viewer metadata artifacts that were not valid Python:
#  "Untitled / unknown / plain_text / 3 years ago / 19 kB / 7 / Indexable")
import uuid
import time
from configparser import ConfigParser as conf_ConfigParser
from json import loads as json_loads
from os import environ as os_environ
from os import path as os_path
from pandas import DataFrame as pd_DataFrame
from pandas import merge as pd_merge
from pandas import datetime as pd_to_datetime
from pandas import read_sql as pd_read_sql
from subprocess import Popen, PIPE
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from utils.formatSQLQuery import formatSQLQuery
from utils.loadDataFrameToCassandra import loadDataFrameToCassandra
from utils.setCassandraDBConnection import setCassandraDBConnection
from utils.setDbEngine import setDbEngine
import sys
def getConfiguration(__file__):
    """Load the entity ``.config`` file paired with a DAG module.

    The config path is derived from the DAG path by swapping the
    ``/dags/`` directory for ``/config/`` and the ``.py`` suffix for
    ``.config`` (e.g. ``.../dags/migrateInteraction.py`` ->
    ``.../config/migrateInteraction.config``).

    Args:
        __file__: Absolute path of the calling DAG module. (The shadowing
            name is kept for backward compatibility with existing callers.)

    Returns:
        ConfigParser: parser populated from the derived config file.

    Raises:
        Exception: if the derived config file does not exist.
    """
    print("DAG __file__ : %s" % __file__)
    # Entity-Customer
    # Uses the module-level ConfigParser import; the redundant local
    # re-import that shadowed it has been removed.
    configProperties = conf_ConfigParser()
    configFile = __file__.replace('/dags/', '/config/').replace('.py', '.config')
    if not os_path.exists(configFile):
        raise Exception("Entity config file does not exist. [%s]" % configFile)
    print("Entity config File: %s" % configFile)
    configProperties.read(configFile)
    return configProperties
def func_pandas_factory(__col_names, __rows):
    """Cassandra row factory that materialises a result set as a DataFrame.

    Cassandra map columns arrive as ``OrderedMapSerializedKey`` instances;
    those cells are converted to plain ``dict`` so the resulting frame
    holds ordinary Python objects.

    Args:
        __col_names: column names reported by the driver.
        __rows: iterable of row tuples from the driver.

    Returns:
        pandas.DataFrame with one row per input tuple.
    """
    print("Row_Factory")
    from cassandra import util as cass_util
    # Tuples are immutable; copy each row into a list so cells can be replaced.
    __rows = [list(i) for i in __rows]
    # Convert only 'OrderedMapSerializedKey' cells into dict.
    for idx_row, i_row in enumerate(__rows):
        for idx_value, i_value in enumerate(i_row):
            # isinstance instead of an exact `type(...) is` check: also
            # accepts subclasses, and is the idiomatic type test.
            if isinstance(i_value, cass_util.OrderedMapSerializedKey):
                __rows[idx_row][idx_value] = dict(__rows[idx_row][idx_value])
    return pd_DataFrame(__rows, columns=__col_names)
# endDef==>func_pandas_factory()
def fn_processInteraction(**kwargs):
    """Migrate CRM interaction records into the Cassandra ODS table.

    Steps (all driven by the DAG's .config file and the Airflow Variable
    named after this DAG file):
      1. Load config and runtime args; validate arg keys against a whitelist.
      2. Open the target Cassandra session and the source DB engine.
      3. Fetch reference frames (outcome, interaction type/subtype/category,
         interaction ref) and the source interaction rows; denormalise the
         source rows via a series of left merges.
      4. If skip_target_check == "NO", anti-join against rows already
         present in the target Cassandra table.
      5. Bulk-load the remainder through loadDataFrameToCassandra.

    kwargs: Airflow task context (provide_context=True); not read directly.
    Any exception is caught and printed; the function itself never raises.
    """
    # def fn_processInteraction(args):
    # __file__="/app/server/HOBS-DataPipeline/modules/crms/dags/migrateInteraction.py"
    try:
        configProperties = getConfiguration(__file__)
        adminVariableValue = {}
        # Pre-create empty frames so later len()/merge calls are safe even
        # if an intermediate step yields nothing.
        df_src_interaction = pd_DataFrame()
        df_tgt_interaction = pd_DataFrame()
        df_ref_outcome_ref = pd_DataFrame()
        # NOTE(review): df_ref_inter_ref is initialised but never used; the
        # interaction-ref query result is bound to df_ref_interaction_ref below.
        df_ref_inter_ref = pd_DataFrame()
        df_ref_inter_type = pd_DataFrame()
        df_ref_inter_subtype = pd_DataFrame()
        df_ref_inter_catg = pd_DataFrame()
        # Whitelist of argument keys accepted from the Airflow Variable.
        validkeys = json_loads(configProperties.get('dagconfiguration', 'validkeys', raw=True))
        time_stamp = time.strftime("%Y%m%d-%H%M%S")
        log_path = os_environ.get('AIRFLOW_HOME')
        if not log_path:
            raise Exception("AIRFLOW_HOME not configured.")
        cassandra_path = os_environ.get('CASSANDRA_HOME')
        if not cassandra_path:
            raise Exception("CASSANDRA_HOME not configured.")
        # The Airflow Variable key is the DAG file name without ".py".
        dagIdVar = os_path.basename(__file__).replace(".py", "")
        if Variable.get(dagIdVar, None):
            adminVariableValue = Variable.get(dagIdVar, None)
            print('Airflow UI variable value assigned {%s: %s}' % (dagIdVar, adminVariableValue))
        if not adminVariableValue:
            raise Exception("adminVariableValue{%s} value not set." % dagIdVar)
        # The Variable holds a JSON document of runtime arguments.
        adminVariableValue = json_loads(adminVariableValue)
        args = adminVariableValue.copy()
        print("Type of validkeys : " + str(validkeys) + "type(validkeys): " + str(type(validkeys)))
        print("Type of args.keys(): " + str(set(list(args.keys()))))
        # Reject an empty arg set or any key outside the whitelist.
        if not (len(list(args.keys())) > 0 and set(list(args.keys())).issubset(set(validkeys))):
            print("ERROR: Arguments are invalid and invalid arguments are %s." % ','.join(
                set(list(args.keys())) - set(validkeys)) + ".")
            raise Exception(("Arguments are invalid and invalid arguments are %s." % ','.join(
                set(list(args.keys())) - set(validkeys)) + "."))
        # ------------------- Target Connection Cassandra
        dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: %s" % dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra Connected %s" % cassDbSession)
        # ------------------- Source relational engine
        dbConnectRetVal = setDbEngine(pac_encoding=configProperties.get('dagconfiguration', 'charset', raw=True))
        if not dbConnectRetVal[0]:
            raise Exception("Source database connection failed: %s" % dbConnectRetVal[1])
        db_engine = dbConnectRetVal[1]
        print("Source database connection established")
        print(db_engine)
        # Render each configured SQL template with the runtime args.
        return_val = formatSQLQuery(configProperties.get('queries', 'outcome_ref', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
        query_outcome_ref = return_val[1]
        return_val = formatSQLQuery(configProperties.get('queries', 'inter_type', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_inter_type failed: %s" % return_val[1])
        query_inter_type = return_val[1]
        return_val = formatSQLQuery(configProperties.get('queries', 'source_interaction', raw=True), args)
        if not return_val[0]:
            raise Exception("ERROR: query_src_intrctn failed: %s" % return_val[1])
        query_src_intrctn = return_val[1]
        return_val = formatSQLQuery(configProperties.get('queries', 'interaction_ref', raw=True), args)
        if not return_val[0]:
            # NOTE(review): message says "outcome_ref" but this is the
            # interaction_ref query (copy-paste label).
            raise Exception("ERROR: outcome_ref failed: %s" % return_val[1])
        query_interaction_ref = return_val[1]
        # Reference data: lower-case all column names so merge keys line up.
        df_ref_outcome_ref = pd_read_sql(query_outcome_ref, con=db_engine)
        df_ref_outcome_ref.columns = df_ref_outcome_ref.columns.str.lower()
        print("Outcome Ref : %s " % len(df_ref_outcome_ref))
        df_ref_interaction_ref = pd_read_sql(query_interaction_ref, con=db_engine)
        df_ref_interaction_ref.columns = df_ref_interaction_ref.columns.str.lower()
        # NOTE(review): copy-pasted log line — prints the outcome-ref count
        # again instead of len(df_ref_interaction_ref).
        print("Outcome Ref : %s " % len(df_ref_outcome_ref))
        df_ref_inter_type = pd_read_sql(query_inter_type, con=db_engine)
        df_ref_inter_type.columns = df_ref_inter_type.columns.str.lower()
        print("interType Ref : %s " % len(df_ref_inter_type))
        # Subtype and category frames are copies of the type frame with
        # columns renamed to the subtype/category naming.
        df_ref_inter_subtype = df_ref_inter_type.copy()
        df_ref_inter_subtype.rename(columns={"hosi_interaction_type_id": "hosi_interaction_subtype_id",
                                             "hosi_interaction_type_desc": "hosi_interaction_subtype_desc"},
                                    inplace=True)
        print("interSubType Ref : %s " % len(df_ref_inter_subtype))
        df_ref_inter_catg = df_ref_inter_type.copy()
        df_ref_inter_catg.rename(columns={"hosi_interaction_type_id": "hosi_interaction_category_id",
                                          "hosi_interaction_type_desc": "hosi_interaction_category_desc"}, inplace=True)
        print("interCateg Ref : %s " % len(df_ref_inter_catg))
        # Source interaction rows, renamed to the target column names.
        df_src_interaction = pd_read_sql(query_src_intrctn, con=db_engine)
        df_src_interaction.columns = df_src_interaction.columns.str.lower()
        df_src_interaction.rename(columns={"hosi_interaction_correl_id": "hosi_interaction_correlation_id",
                                           "hosi_interaction_senti_score": "hosi_interaction_sentiment_score"},
                                  inplace=True)
        # if 'hosi_updated_by' in df_src_interaction.columns:
        #     df_src_interaction['hosi_updated_by'] = 'migrationJob'
        # Strip literal "\n" escape sequences from the message text (the
        # regex r'\\n' matches a backslash followed by 'n').
        df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace(r'\\n',
                                                                                                               ' ',
                                                                                                               regex=True)
        # NOTE(review): without regex=True this replaces only cells whose
        # ENTIRE value equals '"', not embedded quote characters — confirm
        # whether substring removal was intended.
        df_src_interaction['hosi_interaction_message'] = df_src_interaction['hosi_interaction_message'].replace('"', '')
        print("[01]Source Interaction : %s" % len(df_src_interaction))
        # tcareinteractionoutcome
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_outcome_ref
                                      , how='left'
                                      , on=['hosi_interaction_outcome', 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[02]tcareinteractionoutcome : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))
        # tcareinteractiontype
        # NOTE(review): this merge (and subtype below) joins on op/bu only;
        # if the reference table has more than one row per (op, bu) the
        # source rows are multiplied — confirm against the [03]/[04] counts.
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_type
                                      , how='left'
                                      , on=['hosi_op_id', 'hosi_bu_id']
                                      )
        print("[03]tcareinteractiontype : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_interaction_ref
                                      , how='left'
                                      , on=['hosi_customer_id', 'hosi_op_id', 'hosi_bu_id']
                                      )
        # tcareinteractionsubtype
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_subtype
                                      , how='left'
                                      , on=[ 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[04]tcareinteractionsubtype : %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))
        # tcareinteractioncategory
        df_src_interaction = pd_merge(df_src_interaction
                                      , df_ref_inter_catg
                                      , how='left'
                                      , on=['hosi_interaction_category_id', 'hosi_op_id', 'hosi_bu_id']
                                      )
        print("[05]tcareinteractioncategory: %s" % len(df_src_interaction))
        # print("Columns\n%s" % list(df_src_interaction.columns))
        # Defaults to "YES" (skip the target check) when the option is absent.
        skip_target_check = configProperties.get('dagconfiguration', 'skip_target_check',
                                                 raw=True).upper() if configProperties.has_option('dagconfiguration',
                                                                                                  'skip_target_check') else "YES"
        print("skip_target_check : %s" % skip_target_check)
        if skip_target_check == "NO":
            # Anti-join: keep only source rows not already in Cassandra.
            return_val = formatSQLQuery(configProperties.get('queries', 'target_interaction', raw=True), args)
            if not return_val[0]:
                raise Exception("Target database connection failed: " + return_val[1])
            query_tgt_intrctn = return_val[1]
            df_tgt_interaction = cassDbSession.execute(query_tgt_intrctn, timeout=None)
            # _current_rows is assumed to be a DataFrame because the session
            # was opened with pandas_enabled=True — confirm in
            # setCassandraDBConnection.
            df_tgt_interaction = df_tgt_interaction._current_rows
            df_tgt_interaction.columns = df_tgt_interaction.columns.str.lower()
            print("Target Interaction : %s " % len(df_tgt_interaction))
            df_final = pd_merge(df_src_interaction
                                , df_tgt_interaction[
                                    ['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']]
                                , how='left'
                                , left_on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
                                , right_on=['hosi_customer_id', 'hosi_interaction_id', 'hosi_op_id', 'hosi_bu_id']
                                , indicator='tc_migration_status'
                                )
            # 'left_only' == present in source but not yet in the target.
            df_final = df_final.loc[df_final['tc_migration_status'] == 'left_only']
            df_final.drop(['tc_migration_status'], axis=1, inplace=True)
            print("df_final Interaction : %s " % len(df_final))
        else:
            df_final = df_src_interaction.copy()
        # endIf==>skip_target_check
        if len(df_final) > 0:
            # Sentinel -99999 for missing party numbers; int64 for the target
            # column type.
            df_final['hosi_customer_party_nbr'] = df_final['hosi_customer_party_nbr'].fillna(-99999).astype('int64')
            # df_final['hosi_interaction_date'] = pd_to_datetime(df_final['hosi_interaction_date'], format="%Y-%m-%d %H%:M:%S")
            # df_final['hosi_created_date'] = pd_to_datetime(df_final['hosi_created_date'], format="%Y-%m-%d %H%:M:%S")
            # df_final['hosi_updated_date'] = pd_to_datetime(df_final['hosi_updated_date'], format="%Y-%m-%d %H%:M:%S")
            print("Interaction: %s [Selected]" % len(df_final))
            df_final.dropna(how='all', inplace=True)
            print("Interaction: %s [DropNa-All]" % len(df_final))
            # df_final.dropna(subset = ['hosi_interaction_id'])
            # Rows without a primary interaction id cannot be loaded.
            df_final = df_final.copy()[df_final['hosi_interaction_id'].notnull()]
            print("Interaction: %s [DropNa-hosi_interaction_id]" % len(df_final))
            print("Final Interaction: %s " % len(df_final))
            print("type(df_final.columns): %s %s" % (type(list(df_final.columns)), len(list(df_final.columns))))
            # Only referenced by the disabled cqlsh COPY path below.
            __cass_cols = ','.join(df_final.columns)
            # CSV COPY Loading
            print("Preparing data to load into Cassandra via shell command")
            if not loadDataFrameToCassandra(connection_id="cassandrahost"
                                            , table_name="hos_interaction"
                                            , df_data=df_final
                                            , load_type_cqlldr=True
                                            , dag_name="migrateInteraction"
                                            , file_suffix="_" + str(time_stamp)
                                            ):
                raise Exception("ODS data store load failed.")
            # Dead code: the former cqlsh COPY pipeline, disabled by being
            # wrapped in a string literal; kept verbatim for reference.
            """
log_path = log_path + '/logs'
cass_file_csv = log_path + '/hos_interaction_'+time_stamp+'.csv'
cass_file_loader = log_path + '/hos_interaction_'+time_stamp+'.cqlldr'
cass_file_log = log_path + '/hos_interaction_'+time_stamp+'.log'
cass_file_err = log_path + '/hos_interaction_'+time_stamp+'.err'
api_err = log_path + '/migrateInteraction_'+time_stamp+'.err'
print("\n%s\n%s\n%s\n%s\n%s" % (log_path, cass_file_csv, cass_file_loader, cass_file_log, cassandra_path))
print("Generating csv file for loading data")
df_final.to_csv(sep='|', index=False, header=True, path_or_buf=cass_file_csv)
__cass_cslldr = open(cass_file_loader, "w")
#__cass_cslldr.write("copy hobs_ods_staging.hos_interaction (" + __cass_cols + ") FROM '" + cass_file_csv + "' WITH DELIMITER='|' AND ERRFILE='" + cass_file_err + "'AND HEADER=True AND PREPAREDSTATEMENTS=False AND CHUNKSIZE=1000 AND INGESTRATE=100000 AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")
__cass_cslldr.write("copy hobs_ods_staging.hos_interaction (" + __cass_cols + ") FROM '" + cass_file_csv + "' WITH DELIMITER='|' AND ERRFILE='" + cass_file_err + "'AND HEADER=True AND PREPAREDSTATEMENTS=False AND CHUNKSIZE=50 AND INGESTRATE=10000 AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")
__cass_cslldr.close()
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection("cassandrahost")
connection_type = conn.conn_type
hostname = conn.host
schema = conn.schema
login_name = conn.login
login_password = conn.password
port_number = conn.port
__cmd = cassandra_path + '/bin/cqlsh ' + hostname + ' ' \
+ str(port_number) \
+ ' -u ' + login_name \
+ ' -p ' + login_password \
+ ' -k ' + schema \
+ ' -f ' + cass_file_loader + ' > ' + cass_file_log
print("COPY command \n"+ __cmd)
print("Data loading started into Cassandra via shell command")
__shell = Popen(__cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
__output, __err = __shell.communicate()
__pid = str(__shell.pid)
print("cassandra dataload processId:"+__pid)
print("__output, __err ")
print(__shell.returncode)
print(__output)
print("Reportdetails-start-interactionData")
print("Reportdetails-rows-migratedCount-%s" % str(len(df_final)))
if __shell.returncode == 0:
print("cassandra dataload submitted")
__pidfile = log_path + '/migrateInteraction.'+__pid + '.pid'
if os_path.isfile(__pidfile):
print("cassandra dataload InProgress")
else:
print("cassandra dataload completed")
else:
raise Exception("ODS data store load failed")
# endIf==>___shell_returncode___
            """
        else:
            print("NO DATA TO LOAD")
        # endIf==>_df_final_greaterthan_zero___
        print("Successfully Completed the process")
    except Exception as exp:
        # Swallow-and-log boundary: report the error and originating line
        # number; the Airflow task will still be marked successful.
        print("Error - {} . Line No - {} ".format(str(exp), str(sys.exc_info()[-1].tb_lineno)))
# ---------------------------------------------------------------- DAG Tasks --
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
}

# One-shot migration DAG; re-runs are triggered manually.
dag = DAG(
    "migrateInteraction",
    catchup=False,
    default_args=default_args,
    description="migrateInteraction",
    schedule_interval="@once",
    tags=["odsBulkMigration"],
    params={"language": "ENG", "opId": "HOB", "buId": "DEFAULT"},
)

# Single task: run the migration callable with the Airflow context.
operatorStageData = PythonOperator(
    task_id='task_migrateInteraction',
    python_callable=fn_processInteraction,
    dag=dag,
    provide_context=True,
)

# ------------------------------------------------------------- DAG Sequence --
operatorStageData
"""
if __name__ == '__main__':
fn_processInteraction({"language":"ENG","opId":"HOB","buId":"DEFAULT"})
"""Editor is loading...