Untitled
unknown
plain_text
a year ago
8.6 kB
8
Indexable
# file: json-schema-analyser.py
import json
from typing import Dict, List, Set, Any, Tuple
from datetime import datetime
import re
class JsonSchemaAnalyzer:
    """Infer PostgreSQL-style CREATE TABLE statements from parsed JSON data.

    Walk the JSON with :meth:`analyze_structure`, then call
    :meth:`generate_sql`. Results accumulate on the instance:

    * ``table_schemas``  - table name -> set of column definition strings
    * ``relationships``  - one dict per parent/child table pair
    * ``seen_arrays``    - child tables already registered as relationships
    * ``column_types``   - table -> column -> every SQL type observed so far

    NOTE: identifiers are emitted unquoted, so a JSON key that happens to be
    an SQL reserved word still needs manual attention.
    """

    # Largest value that fits a signed 32-bit SQL INTEGER.
    _INT32_MAX = 2147483647

    # Column-name fragments that mark a numeric column as a money amount.
    _MONEY_HINTS = ('price', 'amount', 'cost', 'fee', 'total')

    # Marker stored in ``column_types`` for JSON nulls: the column exists but
    # the value says nothing about its type.
    _NULL_TYPE = 'NULL'

    # (strptime format, SQL type) pairs, tried in order.
    _DATE_FORMATS = (
        ('%Y-%m-%d', 'DATE'),
        ('%Y/%m/%d', 'DATE'),
        ('%d-%m-%Y', 'DATE'),
        ('%d/%m/%Y', 'DATE'),
        ('%Y-%m-%dT%H:%M:%S', 'TIMESTAMP'),
        ('%Y-%m-%dT%H:%M:%S.%f', 'TIMESTAMP'),
        ('%Y-%m-%dT%H:%M:%S%z', 'TIMESTAMP WITH TIME ZONE'),
        ('%Y-%m-%dT%H:%M:%S.%f%z', 'TIMESTAMP WITH TIME ZONE'),
        ('%Y-%m-%d %H:%M:%S', 'TIMESTAMP'),
        ('%H:%M:%S', 'TIME'),
    )

    def __init__(self, max_depth: int = 10, *, inline_dict_threshold: int = 3,
                 array_sample_size: int = 10):
        """Create an empty analyzer.

        Args:
            max_depth: Nesting depth beyond which data is ignored.
            inline_dict_threshold: Nested objects with at most this many keys
                become a single JSONB column instead of being flattened.
            array_sample_size: Number of leading array items inspected when
                inferring the columns of an array of objects.
        """
        self.max_depth = max_depth
        self.inline_dict_threshold = inline_dict_threshold
        self.array_sample_size = array_sample_size
        self.table_schemas: Dict[str, Set[str]] = {}
        self.relationships: List[Dict[str, str]] = []
        self.seen_arrays: Set[str] = set()
        # Track multiple types per column: table -> column -> observed types
        self.column_types: Dict[str, Dict[str, Set[str]]] = {}

    def sanitize_name(self, name: str) -> str:
        """Sanitize column/table names for SQL compatibility.

        The name is lower-cased and every character outside ``[a-z0-9_]`` is
        replaced by ``_``. An empty result, or one starting with a digit, is
        prefixed with ``_`` because an unquoted SQL identifier must start
        with a letter or an underscore.
        """
        cleaned = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower())
        if not cleaned or cleaned[0].isdigit():
            cleaned = f"_{cleaned}"
        return cleaned

    def get_sql_type(self, value: Any, column_name: str = '') -> str:
        """Determine SQL type based on Python value and column name hints.

        Args:
            value: A value taken from parsed JSON.
            column_name: Column the value belongs to; only consulted for
                numbers, to detect money-like columns.

        Returns:
            The SQL type name. Anything that is not a bool, number, list,
            dict or str (for example ``None``) maps to ``'TEXT'``.
        """
        # bool must be tested first: it is a subclass of int.
        if isinstance(value, bool):
            return 'BOOLEAN'
        if isinstance(value, (int, float)):
            lowered = column_name.lower()
            # Special handling for price/amount/cost columns
            if any(hint in lowered for hint in self._MONEY_HINTS):
                return 'DECIMAL(19,4)'
            if isinstance(value, int):
                return 'BIGINT' if abs(value) > self._INT32_MAX else 'INTEGER'
            return 'DECIMAL(15,6)'  # General decimal with good precision
        if isinstance(value, (list, dict)):
            return 'JSONB'  # Store complex structures as JSONB
        if isinstance(value, str):
            return self._sql_type_for_string(value)
        return 'TEXT'

    def _sql_type_for_string(self, value: str) -> str:
        """Classify a string as date/time, numeric, UUID or free text."""
        for date_format, sql_type in self._DATE_FORMATS:
            try:
                datetime.strptime(value, date_format)
            except ValueError:
                continue
            return sql_type
        if re.match(r'^[0-9]+$', value):
            # A 10-digit string can still exceed the 32-bit range, so check
            # the value itself and not only its length.
            if len(value) <= 10 and int(value) <= self._INT32_MAX:
                return 'INTEGER'
            return 'TEXT'  # Long numeric strings (like large IDs) better as TEXT
        if re.match(r'^[0-9]*\.[0-9]+$', value):
            return 'DECIMAL(15,6)'
        if re.match(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$', value):
            return 'UUID'
        # VARCHAR(255) cannot hold more than 255 characters.
        if len(value) > 255:
            return 'TEXT'
        return 'VARCHAR(255)'

    def update_column_type(self, table_name: str, column_name: str, sql_type: str) -> str:
        """Record ``sql_type`` for a column and return the type to use for it.

        Returns:
            ``sql_type`` itself while it is the only type seen for the
            column, otherwise the type chosen by :meth:`_resolve_types`.
        """
        seen = self.column_types.setdefault(table_name, {}).setdefault(column_name, set())
        seen.add(sql_type)
        return self._resolve_types(seen)

    def _resolve_types(self, types: Set[str]) -> str:
        """Pick one SQL type able to hold every type in ``types``.

        The answer depends only on the set, never on the order in which the
        types were observed.
        """
        known = types - {self._NULL_TYPE}
        if not known:
            return 'TEXT'  # only nulls seen so far
        if len(known) == 1:
            return next(iter(known))
        if 'TEXT' in known:
            return 'TEXT'  # Most flexible type
        if 'JSONB' in known:
            return 'JSONB'
        # DECIMAL types carry their precision ("DECIMAL(15,6)"), so they are
        # matched by prefix rather than by exact name.
        has_decimal = any(t.startswith('DECIMAL') for t in known)
        if all(t in ('INTEGER', 'BIGINT') or t.startswith('DECIMAL') for t in known):
            # Without a decimal the set can only be {INTEGER, BIGINT} here.
            return 'DECIMAL(19,4)' if has_decimal else 'BIGINT'
        if known <= {'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE'}:
            return 'TIMESTAMP WITH TIME ZONE'
        return 'TEXT'  # incompatible mix: fall back to the most flexible type

    def _record_column(self, table_name: str, column_name: str, sql_type: str) -> None:
        """Add or retype one data column, keeping a single definition for it."""
        columns = self.table_schemas[table_name]
        known = self.column_types.get(table_name, {})
        # A name that is already defined but was never recorded as a data
        # column belongs to a generated column (id, timestamps, parent FK);
        # store the JSON value beside it instead of emitting the name twice.
        if column_name not in known and any(
                definition.split(' ', 1)[0] == column_name for definition in columns):
            column_name = f"src_{column_name}"
        previous = known.get(column_name)
        if previous:
            # Drop the definition built from the previously resolved type.
            columns.discard(f"{column_name} {self._resolve_types(previous)}")
        resolved = self.update_column_type(table_name, column_name, sql_type)
        columns.add(f"{column_name} {resolved}")

    def _analyze_object_array(self, items: List[Any], table_name: str,
                              column_name: str, depth: int) -> None:
        """Map an array of objects to a child table referencing ``table_name``."""
        new_table = f"{table_name}_{column_name}"
        if new_table not in self.seen_arrays:
            self.seen_arrays.add(new_table)
            self.relationships.append({
                'from_table': table_name,
                'to_table': new_table,
                'type': 'one_to_many'
            })
        if new_table not in self.table_schemas:
            self.table_schemas[new_table] = {
                'id SERIAL PRIMARY KEY',
                f'{table_name}_id INTEGER REFERENCES {table_name}(id)',
                'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
                'updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
            }
        # Analyze several items to catch fields missing from the first one;
        # the sample is capped for performance.
        for item in items[:self.array_sample_size]:
            if isinstance(item, dict):
                self.analyze_structure(item, '', new_table, depth + 1)

    def analyze_structure(self, data: Any, prefix: str = '', table_name: str = 'main', depth: int = 0) -> None:
        """Recursively analyze JSON structure.

        Args:
            data: Parsed JSON. A dict contributes columns to ``table_name``;
                a list has its leading dict items analyzed the same way;
                any other value contributes nothing.
            prefix: Column-name prefix used while flattening nested objects.
            table_name: Table that receives the columns.
            depth: Current nesting depth; data deeper than ``max_depth`` is
                ignored.
        """
        if depth > self.max_depth:
            return
        if table_name not in self.table_schemas:
            self.table_schemas[table_name] = {
                'id SERIAL PRIMARY KEY',
                'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
                'updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
            }
        if isinstance(data, list):
            # A list of records: every sampled object feeds the same table.
            for item in data[:self.array_sample_size]:
                if isinstance(item, dict):
                    self.analyze_structure(item, prefix, table_name, depth)
            return
        if not isinstance(data, dict):
            return
        for key, value in data.items():
            column_name = self.sanitize_name(f"{prefix}_{key}" if prefix else key)
            if isinstance(value, dict):
                if len(value) <= self.inline_dict_threshold:
                    # Small objects are stored whole instead of flattened.
                    self._record_column(table_name, column_name, 'JSONB')
                else:
                    self.analyze_structure(value, column_name, table_name, depth + 1)
            elif isinstance(value, list):
                if value and isinstance(value[0], dict):
                    self._analyze_object_array(value, table_name, column_name, depth)
                else:
                    # Store simple arrays as JSONB
                    self._record_column(table_name, column_name, 'JSONB')
            elif value is None:
                # A null must not force the column to TEXT.
                self._record_column(table_name, column_name, self._NULL_TYPE)
            else:
                self._record_column(
                    table_name, column_name, self.get_sql_type(value, column_name))

    def generate_sql(self) -> Tuple[List[str], List[Dict[str, str]]]:
        """Generate SQL CREATE TABLE statements.

        Returns:
            ``(create_statements, relationships)``: one statement per table
            in the order the tables were discovered, with the columns of
            each table sorted alphabetically, plus the recorded
            relationships.
        """
        create_statements = []
        for table_name, columns in self.table_schemas.items():
            columns_str = ',\n '.join(sorted(columns))
            create_statements.append(
                f"CREATE TABLE {table_name} (\n {columns_str}\n);"
            )
        return create_statements, self.relationships
def analyze_json_file(file_path: str) -> Tuple[List[str], List[Dict[str, str]]]:
    """Analyze JSON file and return SQL schema.

    Args:
        file_path: Path of the JSON document to read (decoded as UTF-8).

    Returns:
        The ``(create_statements, relationships)`` pair produced by
        ``JsonSchemaAnalyzer.generate_sql``.

    Raises:
        Exception: If the file cannot be read, parsed or analyzed. The
            original error is kept as ``__cause__``.
    """
    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        analyzer = JsonSchemaAnalyzer()
        analyzer.analyze_structure(data)
        return analyzer.generate_sql()
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"Error analyzing JSON file: {str(e)}") from e
# Example usage: python json-schema-analyser.py [path/to/data.json]
if __name__ == "__main__":
    import sys

    # The first command-line argument selects the input file; without one
    # the script falls back to "sample_data.json".
    file_path = sys.argv[1] if len(sys.argv) > 1 else "sample_data.json"
    try:
        create_statements, relationships = analyze_json_file(file_path)
    except Exception as e:
        # Report failures on stderr with a non-zero exit status so that
        # shells and pipelines can detect them.
        print(f"Error: {str(e)}", file=sys.stderr)
        sys.exit(1)
    print("=== SQL Create Statements ===")
    for stmt in create_statements:
        print(f"{stmt}\n")
    print("=== Table Relationships ===")
    for rel in relationships:
        print(f"- {rel['from_table']} -> {rel['to_table']} ({rel['type']})")
Editor is loading...
Leave a Comment