Untitled
unknown
plain_text
5 months ago
2.6 kB
6
Indexable
from google.cloud import bigquery import json #Convert into Big Query Schema def convert_json_to_valid_schema(json): rec_to_insert = [] no_of_recs= len(json_data['features'][0]) print(f"{no_of_recs} to be processed!!!") for i in range(0,no_of_recs): schema_format = { "geometry": json_data['features'][i]['geometry'], "warningImpact": json_data['features'][i]['properties']['warningImpact'], "validFromDate": json_data['features'][i]['properties']['validFromDate'], "warningFurtherDetails": json_data['features'][i]['properties']['warningFurtherDetails'], "warningHeadline": json_data['features'][i]['properties']['warningHeadline'], "affectedAreas": json_data['features'][i]['properties']['affectedAreas'], "issuedDate": json_data['features'][i]['properties']['issuedDate'], "warningLevel": json_data['features'][i]['properties']['warningLevel'], "validToDate": json_data['features'][i]['properties']['validToDate'], "modifiedDate": json_data['features'][i]['properties']['modifiedDate'], "warningId": json_data['features'][i]['properties']['warningId'], "warningStatus": json_data['features'][i]['properties']['warningStatus'], "warningLikelihood": json_data['features'][i]['properties']['warningLikelihood'], "whatToExpect": json_data['features'][i]['properties']['whatToExpect'], "weatherType": json_data['features'][i]['properties']['weatherType'], "warningVersion": json_data['features'][i]['properties']['warningVersion'], } rec_to_insert.append(schema_format) return rec_to_insert # Initialize a BigQuery client client = bigquery.Client() # Set your dataset and table name dataset_id = 'your_project.your_dataset' table_id = f"{dataset_id}.your_table" # Load the JSON data (path to your JSON file) json_file_path = 'path/to/your/file.json' # Read the JSON file with open(json_file_path, 'r') as json_file: json_data = json.load(json_file) json_data = convert_json_to_valid_schema(json_data) # Convert JSON data to newline-delimited format required by BigQuery rows_to_insert = '\n'.join([json.dumps(record) for record in json_data]) #rows_to_insert = json_data # Insert the rows into BigQuery errors = client.insert_rows_json(table_id, rows_to_insert) # Check if there were any errors if errors == []: print(f"Inserted {len(rows_to_insert)} rows successfully.") else: print("Encountered errors while inserting rows:", errors)
Editor is loading...
Leave a Comment