Untitled
unknown
plain_text
2 years ago
1.7 kB
3
Indexable
import datetime
import json
import os
import uuid

import numpy as np
import pandas as pd


def _to_json_compatible(value):
    """Return a JSON-encodable stand-in for one DataFrame cell value.

    Passed as the ``default`` hook of :func:`json.dump`, so it is only called
    for objects the ``json`` module cannot encode on its own.

    Raises:
        TypeError: for any type without a deliberate conversion below, which
            is the contract ``json.dump`` expects from a ``default`` hook.
    """
    # NaT is itself a datetime subclass, so it must be checked before the
    # date/time branch; emit JSON null instead of the string "NaT".
    if value is pd.NaT:
        return None
    # Canonical hyphenated form, e.g. "12345678-1234-5678-1234-567812345678".
    if isinstance(value, uuid.UUID):
        return str(value)
    # datetime.datetime and pd.Timestamp both subclass datetime.date, so this
    # branch covers date, datetime and Timestamp cells.
    if isinstance(value, (datetime.date, datetime.time)):
        return value.isoformat()
    # NumPy scalars such as int64 / bool_ / float32 are unwrapped to the
    # matching built-in Python type, which json can encode.
    if isinstance(value, (np.integer, np.floating, np.bool_)):
        return value.item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def loadDataFrameToCassandra(table_name, df_data, connection_id="cassandrahost", load_type_cqlldr=True, dag_name=None, file_suffix=None):
    """Dump *df_data* as a JSON array of row objects for a Cassandra load step.

    The file is written to
    ``$AIRFLOW_HOME/logs/<dag_name>/<table_name><file_suffix>.json`` (or
    ``<table_name>.json`` when *file_suffix* is empty); the directory is
    created if it does not exist.  Cells json cannot encode natively are
    converted by ``_to_json_compatible``.

    Args:
        table_name: Base name of the output file.
        df_data: pandas DataFrame to serialise, one JSON object per row.
        connection_id: Not used by the current implementation.
        load_type_cqlldr: Not used by the current implementation.
        dag_name: Sub-directory under ``$AIRFLOW_HOME/logs``; required.
        file_suffix: Optional text appended to *table_name* in the file name.

    Returns:
        True when the JSON file was written, False if anything failed.  On
        failure the error and its line number are printed, not raised.
    """
    import gc
    import sys

    return_flag = True
    try:
        airflow_home = os.environ.get('AIRFLOW_HOME')
        if not airflow_home:
            raise ValueError("AIRFLOW_HOME is not set")
        if not dag_name:
            raise ValueError("dag_name is required to build the output path")

        # Combine the name before building any path so the JSON file follows
        # the same "<table_name><file_suffix>" convention as the loader files,
        # and a missing suffix yields "<table_name>.json" rather than "None.json".
        file_suffix = table_name + file_suffix if file_suffix else table_name
        output_dir = os.path.join(airflow_home, 'logs', dag_name)
        os.makedirs(output_dir, exist_ok=True)
        json_file_path = os.path.join(output_dir, f"{file_suffix}.json")

        data_list = df_data.to_dict(orient='records')
        with open(json_file_path, 'w', encoding='utf-8') as json_file:
            # The default hook handles Timestamp/NaT/UUID/NumPy cells that made
            # a bare json.dump fail with "not JSON serializable".
            # NOTE(review): float NaN cells are still emitted as the non-standard
            # NaN token (json.dump allow_nan=True); confirm the loader accepts it.
            json.dump(data_list, json_file, default=_to_json_compatible)
        print("Preparing files for data population in json")

        # TODO: the Cassandra load step is not implemented here.  It was meant
        # to use $CASSANDRA_HOME and <file_suffix>.cqlldr / .clog / .cerr files
        # in output_dir; re-add it (with its airflow/cassandra/subprocess
        # imports) alongside this JSON file.
        print("json file placement completed")
    except Exception as expCDB:
        print("Error - {} . Line No - {} ".format(str(expCDB), str(sys.exc_info()[-1].tb_lineno)))
        return_flag = False
        gc.collect()
    return return_flag

# Error reported by the earlier version of this function (fixed by the
# json.dump default hook above):
# Object of type Timestamp is not JSON serializable . Line No - 18
Editor is loading...