Untitled

mail@pastecode.io avatar
unknown
python
a year ago
1.4 kB
6
Indexable
import csv
import glob
import json
import os
import time
from concurrent.futures import ProcessPoolExecutor

# Directory that is globbed for ``*.json`` input files and that also receives
# the generated ``.csv`` files.
output_dir = "src/daas/bulk_extract/files"


def convert_to_csv():
    """Convert every ``*.json`` file in ``output_dir`` to CSV in parallel.

    Each matching file is handed to ``process_file`` on a
    ``ProcessPoolExecutor`` worker.

    Raises:
        Exception: Whatever a worker raised while processing a file; the
            first such exception (in input order) is re-raised here.
    """
    files = glob.glob(f"{output_dir}/*.json")
    # NOTE(review): process_file names CSV outputs after JSON keys only, so two
    # input files sharing a key are appended to the same CSV from different
    # processes without any synchronization -- confirm that is acceptable.
    with ProcessPoolExecutor() as pool:
        # Executor.map re-raises a worker's exception only when its result is
        # retrieved. Drain the iterator so failures surface instead of being
        # silently discarded.
        for _ in pool.map(process_file, files):
            pass

def process_file(file):
    """Write the entries of one JSON file out as CSV files.

    For every top-level ``key`` of the JSON object: when its value is itself
    an object, each ``subkey`` of that object is written to
    ``{output_dir}/{key}__{subkey}.csv``; otherwise the value is written to
    ``{output_dir}/{key}.csv``. Values are passed unchanged to
    ``write_to_csv``. An empty JSON document produces no output.

    Args:
        file: Path of the JSON file to read.
    """
    # JSON is UTF-8 by specification; decode explicitly rather than with the
    # locale default, which mis-reads non-ASCII text on non-UTF-8 systems.
    with open(file, 'r', encoding='utf-8') as json_file:
        data = json.load(json_file)
    # The input is fully parsed, so its handle is released before any CSV
    # output is written.
    if not data:
        return
    for key, value in data.items():
        if isinstance(value, dict):
            for subkey, subvalue in value.items():
                write_to_csv(subvalue, f"{output_dir}/{key}__{subkey}.csv")
        else:
            write_to_csv(value, f"{output_dir}/{key}.csv")


def write_to_csv(data, csv_file_path):
    """Append row dicts to a CSV file, creating the file when it is missing.

    The header row is written only when the file is empty at the time of the
    call. Columns are the union of the keys of all rows, in first-seen order;
    a row that lacks a column gets an empty cell for it.

    Args:
        data: Sequence of dicts, one per CSV row. A falsy value (``None``,
            empty list) is a no-op and leaves the file system untouched.
        csv_file_path: Path of the CSV file to create or append to.
    """
    if not data:
        return
    # Build the column list from every row, not just the first: a later row
    # carrying an extra key would otherwise make DictWriter raise ValueError
    # halfway through and leave a partially written file. Calling .keys()
    # explicitly also rejects non-dict rows before the file is touched.
    fieldnames = list(dict.fromkeys(key for row in data for key in row.keys()))
    # Append mode creates a missing file, so no separate existence check is
    # needed (and none can race with the open).
    with open(csv_file_path, 'a', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        # Nothing has been written through this handle yet, so a zero size
        # means the file is new or empty and still needs its header.
        if os.path.getsize(csv_file_path) == 0:
            writer.writeheader()
        writer.writerows(data)


if __name__ == '__main__':
    # perf_counter is a monotonic, high-resolution clock intended for
    # measuring durations; time.time() follows the wall clock and can jump
    # if the system time is adjusted while the conversion runs.
    start_time = time.perf_counter()
    convert_to_csv()
    elapsed = time.perf_counter() - start_time
    print(f"Conversion took {elapsed} seconds")