Untitled
unknown
plain_text
a year ago
2.6 kB
8
Indexable
from google.cloud import bigquery
import json
#Convert into Big Query Schema
def convert_json_to_valid_schema(json):
rec_to_insert = []
no_of_recs= len(json_data['features'][0])
print(f"{no_of_recs} to be processed!!!")
for i in range(0,no_of_recs):
schema_format = {
"geometry": json_data['features'][i]['geometry'],
"warningImpact": json_data['features'][i]['properties']['warningImpact'],
"validFromDate": json_data['features'][i]['properties']['validFromDate'],
"warningFurtherDetails": json_data['features'][i]['properties']['warningFurtherDetails'],
"warningHeadline": json_data['features'][i]['properties']['warningHeadline'],
"affectedAreas": json_data['features'][i]['properties']['affectedAreas'],
"issuedDate": json_data['features'][i]['properties']['issuedDate'],
"warningLevel": json_data['features'][i]['properties']['warningLevel'],
"validToDate": json_data['features'][i]['properties']['validToDate'],
"modifiedDate": json_data['features'][i]['properties']['modifiedDate'],
"warningId": json_data['features'][i]['properties']['warningId'],
"warningStatus": json_data['features'][i]['properties']['warningStatus'],
"warningLikelihood": json_data['features'][i]['properties']['warningLikelihood'],
"whatToExpect": json_data['features'][i]['properties']['whatToExpect'],
"weatherType": json_data['features'][i]['properties']['weatherType'],
"warningVersion": json_data['features'][i]['properties']['warningVersion'],
}
rec_to_insert.append(schema_format)
return rec_to_insert
# Initialize a BigQuery client
client = bigquery.Client()
# Set your dataset and table name
dataset_id = 'your_project.your_dataset'
table_id = f"{dataset_id}.your_table"
# Load the JSON data (path to your JSON file)
json_file_path = 'path/to/your/file.json'
# Read the JSON file
with open(json_file_path, 'r') as json_file:
json_data = json.load(json_file)
json_data = convert_json_to_valid_schema(json_data)
# Convert JSON data to newline-delimited format required by BigQuery
rows_to_insert = '\n'.join([json.dumps(record) for record in json_data])
#rows_to_insert = json_data
# Insert the rows into BigQuery
errors = client.insert_rows_json(table_id, rows_to_insert)
# Check if there were any errors
if errors == []:
print(f"Inserted {len(rows_to_insert)} rows successfully.")
else:
print("Encountered errors while inserting rows:", errors)Editor is loading...
Leave a Comment