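# NOTE(review): the following lines look like paste-page metadata captured
# together with the code; kept as comments so the file stays parseable.
# Untitled
# unknown
# plain_text
# 2 years ago
# 9.8 kB
# 18
# Indexable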
#********************* Function to insert data into NOSQL database ***********#
def loadDataFrameToCassandra(table_name, df_data, connection_id="cassandrahost", load_type_cqlldr=True, dag_name=None, file_suffix=None):
    """Load a pandas DataFrame into a Cassandra table.

    Two load strategies are supported:

    * ``load_type_cqlldr=True``: the frame is written to a '|'-delimited CSV
      file under ``$AIRFLOW_HOME/logs/<dag_name>/`` and imported with a
      ``COPY ... FROM`` statement executed by ``$CASSANDRA_HOME/bin/cqlsh``.
    * ``load_type_cqlldr=False``: every row is inserted through a prepared
      ``INSERT`` statement on the session returned by
      ``setCassandraDBConnection()``.

    Args:
        table_name: Target table name (without keyspace).
        df_data: DataFrame whose column labels match the target columns.
        connection_id: Airflow connection id supplying host, port, keyspace
            (``schema``) and credentials.
        load_type_cqlldr: Select the cqlsh COPY path (True) or the prepared
            INSERT path (False).
        dag_name: Sub-directory of ``$AIRFLOW_HOME/logs`` used for the
            generated files; required for the cqlsh COPY path.
        file_suffix: Optional suffix appended to ``table_name`` to form the
            generated file names.

    Returns:
        True when the load finished without error, False otherwise. Every
        exception raised inside the load is caught, printed and turned into
        a False return value; nothing is re-raised.
    """
    from airflow.hooks.base import BaseHook
    from subprocess import Popen, PIPE
    from os import environ as os_environ
    from os import path as os_path
    import sys
    import gc
    from utils.setCassandraDBConnection import setCassandraDBConnection

    return_flag = True
    try:
        # --Invoke utility based function; it returns a pair of
        # (success flag, session on success / error text on failure).
        dbConnectRetVal = setCassandraDBConnection()
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: " + str(dbConnectRetVal[1]))
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra database connection established")

        conn = BaseHook.get_connection(connection_id)
        connection_type = conn.conn_type
        hostname = conn.host
        schema = conn.schema
        login_name = conn.login
        login_password = conn.password
        port_number = conn.port

        # Keep the column labels as a list; the joined form is only used for
        # display and for the COPY statement. (Rebinding the list to the
        # joined string previously corrupted the INSERT statement.)
        column_names = list(df_data)
        columns_csv = ','.join(column_names)
        print("Columns: %s" % columns_csv)

        # Probe the target table and print both sides' dtypes for comparison.
        # NOTE(review): `.dtypes` assumes the session's row factory yields a
        # pandas object -- confirm in setCassandraDBConnection.
        cass_table_result = cassDbSession.execute("SELECT * FROM "+schema+"."+table_name+" LIMIT 1 ", timeout=None)
        df_cass_table = cass_table_result._current_rows
        print("Extracted ------------- ")
        print(df_cass_table.dtypes)
        print("Received --------------")
        print(df_data.dtypes)
        print("df_data : %s" % len(df_data))

        if load_type_cqlldr:
            if (connection_type or "").upper() != "CASSANDRA":
                raise Exception("CONNECTION TYPE INVALID.")
            if not dag_name:
                raise Exception("dag_name is required when load_type_cqlldr is True")
            cassPath = os_environ.get('CASSANDRA_HOME')
            airflow_home = os_environ.get('AIRFLOW_HOME')
            if not cassPath or not airflow_home:
                raise Exception("CASSANDRA_HOME and AIRFLOW_HOME must be set")

            print("hostname : %s" % hostname)
            print("schema : %s" % schema)
            print("Preparing files for data population")

            file_stem = table_name + file_suffix if file_suffix else table_name
            log_path = os_path.join(airflow_home, 'logs')
            dag_log_dir = os_path.join(log_path, dag_name)
            cassCsvFile = os_path.join(dag_log_dir, file_stem + '.csv')
            cassCqlldr = os_path.join(dag_log_dir, file_stem + '.cqlldr')
            cassClog = os_path.join(dag_log_dir, file_stem + '.clog')
            cassCErr = os_path.join(dag_log_dir, file_stem + '.cerr')

            df_data.to_csv(sep='|', header=True, index=False, path_or_buf=cassCsvFile)
            # AND PREPAREDSTATEMENTS=False ##Error Failed to import 5000 rows: ParseError - list index out of range, given up without retries
            copy_statement = ("copy "+schema+"."+table_name+" (" + columns_csv + ") FROM '" + cassCsvFile
                              + "' WITH DELIMITER='|' AND ERRFILE='" + cassCErr
                              + "' AND HEADER=True AND CHUNKSIZE=50 AND INGESTRATE=10000 AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")
            with open(cassCqlldr, "w") as cassCqlldr_W:
                cassCqlldr_W.write(copy_statement)

            # Build an argument list instead of a shell string so that values
            # coming from the Airflow connection are never parsed by a shell.
            # NOTE: the password is still visible in the process list.
            cqlsh_command = [os_path.join(cassPath, 'bin', 'cqlsh'), str(hostname)]
            if port_number is not None:
                cqlsh_command.append(str(port_number))
            if login_name:
                cqlsh_command += ['-u', login_name]
            if login_password:
                cqlsh_command += ['-p', login_password]
            cqlsh_command += ['-k', schema, '-f', cassCqlldr]

            print("Data loading started into Cassandra via shell command")
            # cqlsh stdout goes to the .clog file (formerly a `> file` shell
            # redirect); stderr is captured and printed below.
            with open(cassClog, "w") as clog_handle:
                shell = Popen(cqlsh_command, stdin=PIPE, stdout=clog_handle, stderr=PIPE)
                _, err = shell.communicate()
            pid = str(shell.pid)
            print("cassandra dataload processId:"+pid)
            print(shell.returncode)
            print(err)
            if shell.returncode != 0:
                raise Exception("ODS data store load failed")

            print("cassandra dataload submitted")
            pidfile = os_path.join(log_path, dag_name + '.' + pid + '.pid')
            if os_path.isfile(pidfile):
                print("cassandra dataload InProgress")
            else:
                print("cassandra dataload completed")
        else:
            prepared = cassDbSession.prepare(
                "INSERT INTO "+schema+"."+ table_name + '(' + ', '.join(column_names) + ')'
                + ' VALUES (' + ', '.join(['?'] * len(column_names)) + ')'
            )
            print("Interim query: %s" % prepared)
            # Bind values in DataFrame column order so the statement works for
            # any table, not only the one whose columns were once hard-coded.
            # All rows are inserted; the former `head(2)` limit looked like a
            # debugging leftover.
            count = 0
            for row_values in df_data.itertuples(index=False, name=None):
                cassDbSession.execute(prepared, row_values)
                count += 1
            print("Rows inserted: %s" % count)
    except Exception as expCDB:
        print("Error - {} . Line No - {} ".format(str(expCDB),str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False
    gc.collect()
    return return_flag
if __name__ == '__main__':
    # Manual smoke test: read a '|'-delimited extract and load it through the
    # prepared-INSERT path (load_type_cqlldr=False), printing the returned flag.
    import pandas as pd
    df_data = pd.read_csv(
        "/app/server/HOBS-DataPipeline/logs/odsBulkMigration_partymedium/hos_party_medium_0.csv",
        delimiter="|",
    )
    # The loader defined in this file is loadDataFrameToCassandra; the
    # previously used name load_dataframe_into_cassandra does not exist here.
    print("MESSAGE: : %s" % loadDataFrameToCassandra(
        connection_id="cassandrahost",
        table_name="hos_party_medium",
        df_data=df_data,
        load_type_cqlldr=False,
        dag_name="odsBulkMigration_partymedium",
        file_suffix="tst",
    ))