# Paste-site metadata captured with this snippet (kept verbatim):
# Untitled | unknown | plain_text | 2 years ago | 9.8 kB | 5 | Indexable
#********************* Function to insert data into NOSQL database ***********#
def loadDataFrameToCassandra(table_name, df_data, connection_id="cassandrahost",
                             load_type_cqlldr=True, dag_name=None, file_suffix=None):
    """Load a pandas DataFrame into a Cassandra table.

    Two load strategies are supported:

    * ``load_type_cqlldr=True``: stage ``df_data`` as a ``|``-delimited CSV
      under ``$AIRFLOW_HOME/logs/<dag_name>/``, write a ``COPY ... FROM``
      script next to it and run that script with ``$CASSANDRA_HOME/bin/cqlsh``.
    * ``load_type_cqlldr=False``: insert every row through one prepared
      ``INSERT`` statement on the driver session.

    Args:
        table_name: Target table name without keyspace; the keyspace is the
            ``schema`` field of the Airflow connection.
        df_data: DataFrame whose column names are the target table's column
            names; values are sent in DataFrame column order.
        connection_id: Airflow connection id supplying host, port, keyspace
            and credentials.
        load_type_cqlldr: True for the cqlsh COPY path, False for the
            row-by-row prepared INSERT path.
        dag_name: Sub-directory of ``$AIRFLOW_HOME/logs`` holding the staging
            files; required when ``load_type_cqlldr`` is True.
        file_suffix: Optional text appended to ``table_name`` to form the
            staging file names.

    Returns:
        True if the load finished without raising, False otherwise. Errors are
        printed (message, line number and traceback) rather than raised.
    """
    # Function-scope imports: importing this module does not require Airflow
    # or the project utils package to be importable.
    from airflow.hooks.base import BaseHook
    import gc
    import sys
    import traceback
    from utils.setCassandraDBConnection import setCassandraDBConnection

    return_flag = True
    try:
        # Used here as (ok, session) on success and (ok, error) on failure --
        # inferred from how the tuple is consumed; confirm in utils.
        db_connect_ret_val = setCassandraDBConnection()
        if not db_connect_ret_val[0]:
            raise Exception("Cassandra database connection failed: " + str(db_connect_ret_val[1]))
        cass_db_session = db_connect_ret_val[1]
        print("Cassandra database connection established")

        conn = BaseHook.get_connection(connection_id)
        schema = conn.schema  # used as the Cassandra keyspace
        # Keep the column list and its comma-joined form separate: the INSERT
        # path needs the list, the COPY path needs the joined string.
        column_list = list(df_data)
        columns_csv = ','.join(column_list)
        print("Columns: %s" % columns_csv)

        # Diagnostic probe: fetch one row so the table's dtypes can be compared
        # with the incoming DataFrame's. NOTE(review): `_current_rows.dtypes`
        # assumes the session was configured with a pandas row factory
        # (presumably in setCassandraDBConnection) -- confirm.
        # NOTE: keyspace/table names are concatenated into CQL because
        # identifiers cannot be bound; validate table_name if it can ever come
        # from untrusted input.
        cass_table_result = cass_db_session.execute(
            "SELECT * FROM " + schema + "." + table_name + " LIMIT 1 ", timeout=None)
        print("Extracted ------------- ")
        print(cass_table_result._current_rows.dtypes)
        print("Received --------------")
        print(df_data.dtypes)
        print("df_data : %s" % len(df_data))

        if load_type_cqlldr:
            _load_via_cqlsh_copy(df_data, table_name, columns_csv, conn, dag_name, file_suffix)
        else:
            _load_via_prepared_insert(cass_db_session, schema, table_name, df_data, column_list)
    except Exception as expCDB:
        print("Error - {} . Line No - {} ".format(str(expCDB), str(sys.exc_info()[-1].tb_lineno)))
        # The load work now lives in helpers, so print the full traceback too;
        # the line number above only points at the helper call.
        print(traceback.format_exc())
        return_flag = False
    gc.collect()
    return return_flag


def _build_copy_cql(schema, table_name, columns_csv, csv_file, err_file):
    """Return the cqlsh ``COPY ... FROM`` statement used by the bulk-load path."""
    # NOTE: the original code recorded that adding PREPAREDSTATEMENTS=False
    # was associated with "Failed to import 5000 rows: ParseError - list index
    # out of range"; presumably that is why it is not set here -- confirm.
    return ("copy " + schema + "." + table_name + " (" + columns_csv + ") FROM '" + csv_file
            + "' WITH DELIMITER='|' AND ERRFILE='" + err_file
            + "' AND HEADER=True AND CHUNKSIZE=50 AND INGESTRATE=10000"
            + " AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")


def _build_insert_cql(schema, table_name, column_list):
    """Return an ``INSERT`` with one ``?`` bind marker per column in column_list."""
    return ("INSERT INTO " + schema + "." + table_name
            + "(" + ", ".join(column_list) + ")"
            + " VALUES (" + ", ".join(["?"] * len(column_list)) + ")")


def _load_via_cqlsh_copy(df_data, table_name, columns_csv, conn, dag_name, file_suffix):
    """Stage df_data as a CSV plus COPY script and run it with cqlsh; raise on failure."""
    from os import environ as os_environ
    from os import makedirs
    from os import path as os_path
    from subprocess import Popen, PIPE

    if (conn.conn_type or "").upper() != "CASSANDRA":
        raise Exception("CONNECTION TYPE INVALID.")
    print("hostname : %s" % conn.host)
    print("schema : %s" % conn.schema)
    print("Preparing files for data population")

    cass_home = os_environ.get('CASSANDRA_HOME')
    airflow_home = os_environ.get('AIRFLOW_HOME')
    if not cass_home or not airflow_home:
        raise Exception("CASSANDRA_HOME and AIRFLOW_HOME must both be set for the cqlsh load")
    if not dag_name:
        raise Exception("dag_name is required for the cqlsh load")

    file_stem = table_name + file_suffix if file_suffix else table_name
    log_path = os_path.join(airflow_home, 'logs')
    staging_dir = os_path.join(log_path, dag_name)
    makedirs(staging_dir, exist_ok=True)  # to_csv/open fail if it is missing
    csv_file = os_path.join(staging_dir, file_stem + '.csv')
    cql_file = os_path.join(staging_dir, file_stem + '.cqlldr')
    out_log = os_path.join(staging_dir, file_stem + '.clog')
    err_file = os_path.join(staging_dir, file_stem + '.cerr')

    df_data.to_csv(sep='|', header=True, index=False, path_or_buf=csv_file)
    with open(cql_file, "w") as script:
        script.write(_build_copy_cql(conn.schema, table_name, columns_csv, csv_file, err_file))

    # Argument list instead of a shell string: no quoting problems with
    # passwords/paths containing shell metacharacters, no shell injection.
    # NOTE(review): the password is still visible in the process list;
    # consider a cqlshrc credentials file instead.
    cqlsh_args = [os_path.join(cass_home, 'bin', 'cqlsh'), conn.host]
    if conn.port is not None:
        cqlsh_args.append(str(conn.port))
    cqlsh_args += ['-u', conn.login, '-p', conn.password, '-k', conn.schema, '-f', cql_file]

    print("Data loading started into Cassandra via shell command")
    # stdout goes to the .clog file (replaces the former "> file" redirect).
    with open(out_log, "w") as log_out:
        shell = Popen(cqlsh_args, stdin=PIPE, stdout=log_out, stderr=PIPE)
        _, err = shell.communicate()
    pid = str(shell.pid)
    print("cassandra dataload processId:" + pid)
    print(shell.returncode)
    print(err)
    if shell.returncode != 0:
        raise Exception("ODS data store load failed")

    print("cassandra dataload submitted")
    # NOTE(review): nothing in this function creates this pid file; presumably
    # an external process does -- confirm. communicate() has already waited
    # for cqlsh to exit at this point.
    pidfile = os_path.join(log_path, dag_name + '.' + pid + '.pid')
    if os_path.isfile(pidfile):
        print("cassandra dataload InProgress")
    else:
        print("cassandra dataload completed")


def _load_via_prepared_insert(session, schema, table_name, df_data, column_list):
    """Insert every DataFrame row through one prepared INSERT statement."""
    insert_cql = _build_insert_cql(schema, table_name, column_list)
    prepared = session.prepare(insert_cql)
    print("Interim query: %s" % insert_cql)
    inserted = 0
    # itertuples(index=False, name=None) yields plain tuples in DataFrame column
    # order, which is exactly the bind-marker order of insert_cql.
    # NOTE(review): NaN cells are sent as float NaN, not NULL -- confirm that
    # this is acceptable for the target columns.
    for values in df_data.itertuples(index=False, name=None):
        session.execute(prepared, values)
        inserted += 1
    print("Rows inserted: %s" % inserted)


if __name__ == '__main__':
    import pandas as pd
    df_data = pd.read_csv("/app/server/HOBS-DataPipeline/logs/odsBulkMigration_partymedium/hos_party_medium_0.csv",
                          delimiter="|")
    print("MESSAGE: : %s" % loadDataFrameToCassandra(connection_id="cassandrahost",
                                                      table_name="hos_party_medium",
                                                      df_data=df_data,
                                                      load_type_cqlldr=False,
                                                      dag_name="odsBulkMigration_partymedium",
                                                      file_suffix="tst"))
# (paste-site UI text captured with this snippet: "Editor is loading...")