# --- Original pastecode.io paste metadata (kept for provenance, commented so the file parses) ---
# Title: Untitled | Author: mail@pastecode.io (unknown) | Format: plain_text
# Age: 8 months | Size: 9.8 kB | Views: 1 | Indexable | Expires: Never
#********************* Function to insert data into NOSQL database  ***********#

def loadDataFrameToCassandra( table_name, df_data, connection_id="cassandrahost", load_type_cqlldr=True, dag_name=None, file_suffix=None):
    """Load a pandas DataFrame into a Cassandra (NoSQL) table.

    Two load strategies:
      * load_type_cqlldr=True  -- dump the frame to a pipe-delimited CSV under the
        Airflow log directory and bulk-load it via a cqlsh COPY script (fast path).
      * load_type_cqlldr=False -- insert rows one at a time through a prepared
        statement (NOTE(review): only the first 2 rows are inserted, as in the
        original code -- presumably a test limit; confirm before production use).

    Parameters
    ----------
    table_name : str
        Target Cassandra table (unqualified; keyspace comes from the connection).
    df_data : pandas.DataFrame
        Data to load; column names must match the target table's columns.
    connection_id : str
        Airflow connection id providing host/keyspace/credentials.
    load_type_cqlldr : bool
        Choose bulk COPY (True) or prepared-statement INSERT (False).
    dag_name : str
        DAG name used to build CSV/script/log file paths (required for COPY path).
    file_suffix : str
        Optional suffix appended to the generated file names.

    Returns
    -------
    bool
        True on success, False when any step failed (errors are printed, not raised).
    """
    from airflow.hooks.base import BaseHook
    from subprocess import Popen, PIPE
    from os import environ as os_environ
    from os import path as os_path
    from cassandra import ConsistencyLevel
    import sys
    import gc

    from utils.setCassandraDBConnection import setCassandraDBConnection

    # Initialise BEFORE the try block: the original assigned these inside the
    # try (or not at all), so an early failure -- or the row-insert path --
    # crashed with NameError at the final `return return_flag` / `count += 1`.
    return_flag = True
    count = 0
    try:
        #--Invoke utility based function
        dbConnectRetVal = setCassandraDBConnection()
        if not dbConnectRetVal[0]:
            raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
        cassDbSession = dbConnectRetVal[1]
        print("Cassandra database connection established")

        conn = BaseHook.get_connection(connection_id)
        connection_type = conn.conn_type
        hostname = conn.host
        schema = conn.schema
        login_name = conn.login
        login_password = conn.password
        port_number = conn.port

        # Keep the raw column list: the prepared-statement path needs the list
        # (the original overwrote it with the joined string, so ', '.join and
        # len() later operated on CHARACTERS, producing a broken INSERT).
        column_list = list(df_data)
        columns = ','.join(column_list)
        print("Columns: %s" % columns)

        # Probe the target table so a schema mismatch surfaces early.
        cass_table_result = cassDbSession.execute("SELECT * FROM "+schema+"."+table_name+" LIMIT 1 ", timeout=None)
        df_cass_table = cass_table_result._current_rows

        print("Extracted ------------- ")
        print(df_cass_table.dtypes)
        print("Received --------------")
        print(df_data.dtypes)
        print("df_data        : %s" % len(df_data))

        if load_type_cqlldr:
            if connection_type.upper()!="CASSANDRA":
                raise Exception("CONNECTION TYPE INVALID.")
            if dag_name is None:
                # Fail with a clear message instead of a TypeError on path concat.
                raise Exception("dag_name is required for the cqlldr load path")

            print("hostname       : %s" % hostname)
            print("schema         : %s" % schema)

            print("Preparing files for data population")
            file_suffix=table_name+file_suffix if file_suffix else table_name
            cassPath = os_environ.get('CASSANDRA_HOME')
            log_path = os_environ.get('AIRFLOW_HOME')+'/logs'
            file_base = log_path+'/'+dag_name+'/'+file_suffix
            cassCsvFile = file_base+'.csv'    # pipe-delimited data dump
            cassCqlldr = file_base+'.cqlldr'  # generated COPY script
            cassClog = file_base+'.clog'      # cqlsh stdout redirect
            cassCErr = file_base+'.cerr'      # COPY error-row file

            df_data.to_csv(sep='|', header=True, index=False, path_or_buf=cassCsvFile)
            # NOTE: PREPAREDSTATEMENTS=False was dropped deliberately -- it caused
            # "ParseError - list index out of range" on import.
            with open(cassCqlldr, "w") as cassCqlldr_W:
                cassCqlldr_W.write("copy "+schema+"."+table_name+" (" + columns + ") FROM '" + cassCsvFile + "' WITH DELIMITER='|' AND ERRFILE='" + cassCErr + "' AND HEADER=True AND CHUNKSIZE=50 AND INGESTRATE=10000 AND MAXBATCHSIZE=20 AND MAXATTEMPTS=1 AND NUMPROCESSES=1;")

            shellCommand = cassPath + '/bin/cqlsh ' + hostname + ' ' \
                     + str(port_number) \
                     + ' -u ' + login_name \
                     + ' -p ' + login_password \
                     + ' -k ' + schema \
                     + ' -f ' + cassCqlldr + ' > '  + cassClog

            print("Data loading started into Cassandra via shell command")

            # SECURITY NOTE(review): the password is interpolated into a
            # shell=True command string; prefer CQLSH credentials file / argv
            # list if cqlsh invocation allows it.
            shell = Popen(shellCommand, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
            output, err = shell.communicate()
            pid = str(shell.pid)
            print("cassandra dataload processId:"+pid)
            print(shell.returncode)
            print(output)
            print(err)

            if shell.returncode == 0:
                print("cassandra dataload submitted")
                pidfile = log_path + '/'+dag_name+'.'+pid + '.pid'
                if os_path.isfile(pidfile):
                    print("cassandra dataload InProgress")
                else:
                    print("cassandra dataload completed")
            else:
                raise Exception("ODS data store load failed")

        else:
            # Build the INSERT from the column LIST so the placeholder count
            # matches the number of columns (the original joined a string,
            # yielding one '?' per character).
            prepared = cassDbSession.prepare("INSERT INTO "+schema+"."+ table_name + '('+', '.join(column_list) + ')'
                                            + ' VALUES (' + ', '.join(['?'] * len(column_list)) + ')'
                                            )
                                            #,consistency=ConsistencyLevel.QUORUM)
            print("Interim query: %s" % prepared)
            # Generalized: bind values by the frame's own column order instead
            # of the previous hard-coded 38-column tuple tied to one table.
            for index, row in df_data.head(2).iterrows():
                cassDbSession.execute(prepared, tuple(row[col] for col in column_list))
                count = count + 1
            print("rows inserted  : %s" % count)

    except Exception as expCDB:
        print("Error - {} . Line No - {} ".format(str(expCDB),str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False

    gc.collect()
    return return_flag


if __name__ == '__main__':
    import pandas as pd

    # Ad-hoc smoke test: read a sample extract and push it through the
    # prepared-statement (non-cqlldr) load path.
    df_data = pd.read_csv("/app/server/HOBS-DataPipeline/logs/odsBulkMigration_partymedium/hos_party_medium_0.csv"
                         ,delimiter="|"
                         )
    # Bug fix: the original called load_dataframe_into_cassandra(), which is
    # not defined anywhere in this module; the function defined above is
    # loadDataFrameToCassandra().
    print("MESSAGE: : %s" % loadDataFrameToCassandra(connection_id="cassandrahost", table_name="hos_party_medium"
                                ,df_data=df_data
                                ,load_type_cqlldr=False
                                ,dag_name="odsBulkMigration_partymedium"
                                ,file_suffix="tst")
                        )