Untitled

 avatar
unknown
plain_text
2 years ago
1.7 kB
3
Indexable
import datetime
import decimal
import json
import os
import uuid

def _json_default(value):
    """Convert objects the stdlib ``json`` encoder rejects into JSON-safe values.

    Used as the ``default=`` hook of ``json.dump``, so it is only called for
    objects json cannot serialize natively. Unknown types still raise
    ``TypeError`` instead of being silently stringified.
    """
    # pandas.Timestamp (and pd.NaT) subclass datetime.datetime, so they land here.
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        # NaT is the only datetime that is not equal to itself; emit null
        # rather than the string "NaT", which a loader cannot parse as a date.
        if value != value:
            return None
        return value.isoformat()
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, decimal.Decimal):
        # str() keeps full precision; float() could round.
        return str(value)
    # numpy scalars (e.g. numpy.int64 / numpy.bool_ in object columns) are not
    # JSON-serializable, but .item() converts them to the matching Python type.
    item = getattr(value, "item", None)
    if callable(item) and type(value).__module__ == "numpy":
        return item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def loadDataFrameToCassandra(table_name, df_data, connection_id="cassandrahost", load_type_cqlldr=True, dag_name=None, file_suffix=None):
    """Write ``df_data`` as a JSON list of row objects for a Cassandra load.

    The file is written to ``$AIRFLOW_HOME/logs/<dag_name>/<name>.json``. The
    directory is created if needed, and ``<dag_name>`` is omitted when
    ``dag_name`` is falsy. ``<name>`` is ``file_suffix`` when given, otherwise
    ``table_name``.

    Args:
        table_name: Target Cassandra table; used as the file name when
            ``file_suffix`` is not given.
        df_data: DataFrame (presumably pandas) exposing ``to_dict(orient='records')``.
        connection_id: Not used yet (kept for interface compatibility).
        load_type_cqlldr: Not used yet (kept for interface compatibility).
        dag_name: Sub-directory of the Airflow ``logs`` folder.
        file_suffix: Base name of the JSON file.

    Returns:
        True if the JSON file was written, False if an error occurred (the
        error and its line number are printed).

    NOTE: float NaN values are written as the non-standard ``NaN`` token
    (``json`` default ``allow_nan=True``) — confirm the loader accepts it.
    """
    import sys
    import gc

    return_flag = True
    try:
        airflow_home = os.environ.get('AIRFLOW_HOME')
        if not airflow_home:
            raise RuntimeError("AIRFLOW_HOME environment variable is not set")
        log_dir = os.path.join(airflow_home, 'logs')
        if dag_name:
            log_dir = os.path.join(log_dir, dag_name)
        # The per-DAG folder is not guaranteed to exist under logs/.
        os.makedirs(log_dir, exist_ok=True)
        json_file_path = os.path.join(log_dir, f"{file_suffix or table_name}.json")

        print("Preparing files for data population in json")
        with open(json_file_path, 'w', encoding='utf-8') as json_file:
            # Records are built inline so the (possibly large) list can be
            # freed as soon as the dump finishes, before gc.collect() below.
            json.dump(df_data.to_dict(orient='records'), json_file, default=_json_default)

        # TODO: Cassandra loader step not implemented. Its control/log/error
        # files are meant to be <table_name><file_suffix>.cqlldr/.clog/.cerr
        # in this directory, using $CASSANDRA_HOME.
        print("json file placement completed")

    except Exception as expCDB:
        print("Error - {} . Line No - {} ".format(str(expCDB), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False

    gc.collect()
    return return_flag

Object of type Timestamp is not JSON serializable . Line No - 18 
Editor is loading...