Untitled
unknown
plain_text
2 years ago
1.7 kB
4
Indexable
import os
import json
import uuid
def _json_default(value):
    """Convert values the ``json`` module cannot encode natively.

    Args:
        value: An object ``json.dump`` could not serialize on its own.

    Returns:
        ``str(value)`` for ``uuid.UUID``; for date-like objects (anything
        exposing ``isoformat``, e.g. ``datetime`` or pandas ``Timestamp``)
        the ISO 8601 string, or ``None`` for a missing timestamp.

    Raises:
        TypeError: If ``value`` is of any other type, so unexpected data is
            reported instead of being silently stringified.
    """
    if isinstance(value, uuid.UUID):
        return str(value)
    if hasattr(value, "isoformat"):
        # pandas NaT compares unequal to itself; write JSON null for it
        # rather than the literal text "NaT".
        if value != value:
            return None
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def loadDataFrameToCassandra(table_name, df_data, connection_id="cassandrahost", load_type_cqlldr=True, dag_name=None, file_suffix=None):
    """Dump a DataFrame as a JSON records file under the DAG's log directory.

    The file is written to ``$AIRFLOW_HOME/logs/<dag_name>/<stem>.json`` where
    ``<stem>`` is ``file_suffix`` when given and ``table_name`` otherwise.
    UUID and date-like cells are converted to strings so the dump no longer
    fails with "Object of type Timestamp is not JSON serializable".

    Args:
        table_name: Table name; used as the file stem when ``file_suffix``
            is empty.
        df_data: Object providing ``to_dict(orient='records')`` (a pandas
            DataFrame).
        connection_id: Not used by this function at present.
        load_type_cqlldr: Not used by this function at present.
        dag_name: Sub-directory of ``$AIRFLOW_HOME/logs`` to write into.
        file_suffix: Stem of the JSON file name.

    Returns:
        None. Failures are not raised: they are caught and reported through
        ``print`` only, so callers cannot detect them from the return value.
    """
    import gc

    try:
        data_list = df_data.to_dict(orient='records')
        # Fall back to the table name so a missing suffix does not produce
        # a file literally called "None.json".
        json_stem = file_suffix if file_suffix else table_name
        # Indexing (not .get) makes an unset AIRFLOW_HOME name itself in the
        # printed error instead of failing later inside os.path.join.
        dag_log_dir = os.path.join(os.environ['AIRFLOW_HOME'], 'logs', dag_name)
        os.makedirs(dag_log_dir, exist_ok=True)
        json_file_path = os.path.join(dag_log_dir, f"{json_stem}.json")
        with open(json_file_path, 'w', encoding='utf-8') as json_file:
            json.dump(data_list, json_file, default=_json_default)
        print("Preparing files for data population in json")
        print("json file placement completed")
    except Exception as expCDB:
        print("Error - {} . Line No - {} ".format(str(expCDB), str(expCDB.__traceback__.tb_lineno)))
    finally:
        # Release the intermediate record list on both success and failure.
        gc.collect()
# Observed failure when running the function above:
#   "Object of type Timestamp is not JSON serializable . Line No - 18"