import csv
import glob
import json
import os
import time
from concurrent.futures import ProcessPoolExecutor
# Directory scanned for input *.json files; generated *.csv files are written here too.
output_dir = "src/daas/bulk_extract/files"
def convert_to_csv():
    """Convert every JSON file in ``output_dir`` to CSV, one worker process per file.

    Each input file is handed to :func:`process_file` via a
    ``ProcessPoolExecutor``.
    """
    files = glob.glob(f"{output_dir}/*.json")
    with ProcessPoolExecutor() as pool:
        # Drain the result iterator: Executor.map is lazy about results, so
        # without consuming it, an exception raised inside process_file in a
        # worker would be silently discarded instead of propagating here.
        for _ in pool.map(process_file, files):
            pass
def process_file(file):
    """Load one JSON file and emit its contents as CSV file(s) in ``output_dir``.

    Each top-level key names an output file. A dict value fans out into one
    CSV per sub-key (``key__subkey.csv``); any other value is written to a
    single ``key.csv``. Empty/falsy payloads are skipped.
    """
    with open(file, 'r') as handle:
        payload = json.load(handle)
    if not payload:
        return
    for name, section in payload.items():
        if isinstance(section, dict):
            # Nested mapping: one CSV per sub-section.
            for sub_name, rows in section.items():
                write_to_csv(rows, f"{output_dir}/{name}__{sub_name}.csv")
        else:
            write_to_csv(section, f"{output_dir}/{name}.csv")
def write_to_csv(data, csv_file_path):
    """Append rows of dicts to a CSV file, writing a header only when the file is new or empty.

    Args:
        data: sequence of dicts; field names are taken from the first row's
              keys. Falsy input (``None``, ``[]``) is a no-op.
        csv_file_path: destination CSV path.

    NOTE(review): parallel workers may append to the same path (shared
    top-level keys across JSON files); concurrent appends can interleave
    rows — confirm whether that is acceptable for this pipeline.
    """
    if not data:
        return
    # Always open in append mode: for a nonexistent file, 'a' creates it just
    # like 'w' would, and this removes the exists()/open() race of checking
    # the path before opening it.
    with open(csv_file_path, 'a', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=data[0].keys())
        # Emit the header only when the file is empty (newly created or 0 bytes).
        if csv_file.tell() == 0:
            writer.writeheader()
        writer.writerows(data)
if __name__ == '__main__':
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # backwards/forwards if the system clock is adjusted mid-run, corrupting
    # the elapsed measurement.
    start_time = time.perf_counter()
    convert_to_csv()
    end_time = time.perf_counter()
    print(f"Conversion took {end_time - start_time} seconds")