3 months ago
8.6 kB
# file: json-schema-analyser.py
"""Infer SQL CREATE TABLE statements from the structure of a JSON document.

The emitted types (SERIAL, JSONB, UUID, ...) follow PostgreSQL naming.
"""

import json
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple

# Largest magnitude get_sql_type accepts for a plain INTEGER column.
_INT32_MAX = 2147483647

# Longest string that still fits the VARCHAR(255) default.
_VARCHAR_LIMIT = 255

# Column-name fragments that make a numeric value a currency-style DECIMAL.
_MONEY_TERMS = ('price', 'amount', 'cost', 'fee', 'total')

# Bookkeeping columns every generated table starts with.
_BASE_COLUMNS = (
    'id SERIAL PRIMARY KEY',
    'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
    'updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
)

# strptime patterns tried in order; the first one that parses decides the type.
_DATE_FORMATS = (
    ('%Y-%m-%d', 'DATE'),
    ('%Y/%m/%d', 'DATE'),
    ('%d-%m-%Y', 'DATE'),
    ('%d/%m/%Y', 'DATE'),
    ('%Y-%m-%dT%H:%M:%S', 'TIMESTAMP'),
    ('%Y-%m-%dT%H:%M:%S.%f', 'TIMESTAMP'),
    ('%Y-%m-%dT%H:%M:%S%z', 'TIMESTAMP WITH TIME ZONE'),
    ('%Y-%m-%dT%H:%M:%S.%f%z', 'TIMESTAMP WITH TIME ZONE'),
    ('%Y-%m-%d %H:%M:%S', 'TIMESTAMP'),
    ('%H:%M:%S', 'TIME'),
)

# Compiled once; used with fullmatch so a trailing newline does not slip through.
_DIGITS_RE = re.compile(r'[0-9]+')
_DECIMAL_RE = re.compile(r'[0-9]*\.[0-9]+')
_UUID_RE = re.compile(
    r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}'
)


def _is_numeric_type(sql_type: str) -> bool:
    """Return True for the integer and DECIMAL(...) types this module emits."""
    return sql_type in ('INTEGER', 'BIGINT') or sql_type.startswith('DECIMAL')


class JsonSchemaAnalyzer:
    """Walk parsed JSON and accumulate table and column definitions.

    Attributes:
        max_depth: Recursion levels beyond this are not analysed.
        sample_size: How many items of each array are inspected
            (None inspects every item).
        table_schemas: Table name -> set of column definition strings.
        relationships: One dict per parent/child table pair.
        seen_arrays: Child table names that already have a relationship.
        column_types: Table -> column -> every SQL type observed so far.

    NOTE: a JSON key that sanitizes to ``id``, ``created_at`` or
    ``updated_at`` is added next to the generated column of the same name;
    that collision is not resolved here.
    """

    def __init__(self, max_depth: int = 10, sample_size: Optional[int] = 10):
        self.max_depth = max_depth
        self.sample_size = sample_size
        self.table_schemas: Dict[str, Set[str]] = {}
        self.relationships: List[Dict[str, str]] = []
        self.seen_arrays: Set[str] = set()
        # Track multiple types per column
        self.column_types: Dict[str, Dict[str, Set[str]]] = {}
        # Table -> column -> definition string currently stored in
        # table_schemas, so a re-typed column replaces its old definition.
        self._column_defs: Dict[str, Dict[str, str]] = {}

    def sanitize_name(self, name: str) -> str:
        """Lower-case *name* and replace every non [a-zA-Z0-9_] char with '_'."""
        return re.sub(r'[^a-zA-Z0-9_]', '_', name.lower())

    def get_sql_type(self, value: Any, column_name: str = '') -> str:
        """Pick a SQL type for one JSON value.

        Args:
            value: The parsed JSON value.
            column_name: Column the value belongs to; numeric values in a
                column whose name contains a money-like term become
                DECIMAL(19,4).

        Returns:
            The SQL type name. Values that are not bool, number, list, dict
            or str (for example None) yield 'TEXT'.
        """
        # bool must be tested first: it is a subclass of int.
        if isinstance(value, bool):
            return 'BOOLEAN'
        if isinstance(value, (int, float)):
            lowered = column_name.lower()
            if any(term in lowered for term in _MONEY_TERMS):
                return 'DECIMAL(19,4)'
            if isinstance(value, int):
                return 'BIGINT' if abs(value) > _INT32_MAX else 'INTEGER'
            return 'DECIMAL(15,6)'
        if isinstance(value, (list, dict)):
            return 'JSONB'
        if isinstance(value, str):
            return self._string_sql_type(value)
        return 'TEXT'

    def _string_sql_type(self, value: str) -> str:
        """Classify a string as date/time, numeric, UUID, VARCHAR or TEXT."""
        for date_format, sql_type in _DATE_FORMATS:
            try:
                datetime.strptime(value, date_format)
            except ValueError:
                continue
            return sql_type

        if _DIGITS_RE.fullmatch(value):
            # Ten digits can still exceed the INTEGER range, so compare the
            # value itself; anything larger stays textual (large IDs etc.).
            if len(value) <= 10 and int(value) <= _INT32_MAX:
                return 'INTEGER'
            return 'TEXT'
        if _DECIMAL_RE.fullmatch(value):
            return 'DECIMAL(15,6)'
        if _UUID_RE.fullmatch(value):
            return 'UUID'
        # Anything that would not fit VARCHAR(255) must be TEXT.
        if len(value) > _VARCHAR_LIMIT:
            return 'TEXT'
        return 'VARCHAR(255)'

    def update_column_type(self, table_name: str, column_name: str, sql_type: str) -> str:
        """Record *sql_type* for a column and return the type covering all seen.

        Resolution when more than one type has been observed:
        TEXT wins, then JSONB; an all-numeric mix becomes DECIMAL(19,4) if
        any DECIMAL is present, otherwise BIGINT; TIMESTAMP mixed with
        TIMESTAMP WITH TIME ZONE becomes the latter; any other mix falls
        back to TEXT so the result does not depend on input order.
        """
        seen = self.column_types.setdefault(table_name, {}).setdefault(column_name, set())
        seen.add(sql_type)

        if len(seen) == 1:
            return sql_type
        if 'TEXT' in seen:
            return 'TEXT'
        if 'JSONB' in seen:
            return 'JSONB'
        if all(_is_numeric_type(t) for t in seen):
            # Stored decimals carry their precision ("DECIMAL(15,6)"), so
            # match on the prefix rather than the bare word.
            if any(t.startswith('DECIMAL') for t in seen):
                return 'DECIMAL(19,4)'
            return 'BIGINT'
        if seen <= {'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE'}:
            return 'TIMESTAMP WITH TIME ZONE'
        return 'TEXT'

    def _store_definition(self, table_name: str, column_name: str, sql_type: str) -> None:
        """Make "<column> <type>" the single definition of a data column."""
        definition = f"{column_name} {sql_type}"
        table_defs = self._column_defs.setdefault(table_name, {})
        previous = table_defs.get(column_name)
        if previous is not None and previous != definition:
            self.table_schemas[table_name].discard(previous)
        self.table_schemas[table_name].add(definition)
        table_defs[column_name] = definition

    def _record_column(self, table_name: str, column_name: str, sql_type: str) -> None:
        """Merge *sql_type* into the column's history and store the result."""
        resolved = self.update_column_type(table_name, column_name, sql_type)
        self._store_definition(table_name, column_name, resolved)

    def _analyze_child_table(self, table_name: str, column_name: str,
                             items: List[Any], depth: int) -> None:
        """Create/extend the child table for an array of objects."""
        new_table = f"{table_name}_{column_name}"
        if new_table not in self.seen_arrays:
            self.seen_arrays.add(new_table)
            self.relationships.append({
                'from_table': table_name,
                'to_table': new_table,
                'type': 'one_to_many'
            })
        if new_table not in self.table_schemas:
            self.table_schemas[new_table] = set(_BASE_COLUMNS) | {
                f'{table_name}_id INTEGER REFERENCES {table_name}(id)'
            }
        # Look at several items so fields missing from the first are caught.
        for item in items[:self.sample_size]:
            self.analyze_structure(item, '', new_table, depth + 1)

    def analyze_structure(self, data: Any, prefix: str = '', table_name: str = 'main', depth: int = 0) -> None:
        """Recursively analyze JSON structure.

        Args:
            data: Parsed JSON. A dict contributes columns; a list is treated
                as a sequence of rows for the same table; other values are
                ignored.
            prefix: Prepended (with '_') to keys when flattening a nested dict.
            table_name: Table receiving the columns.
            depth: Current recursion depth; nothing is done past max_depth.
        """
        if depth > self.max_depth:
            return

        if table_name not in self.table_schemas:
            self.table_schemas[table_name] = set(_BASE_COLUMNS)

        if isinstance(data, list):
            # e.g. a JSON document whose root is an array of records.
            for item in data[:self.sample_size]:
                self.analyze_structure(item, prefix, table_name, depth)
            return
        if not isinstance(data, dict):
            return

        for key, value in data.items():
            column_name = self.sanitize_name(f"{prefix}_{key}" if prefix else str(key))

            if isinstance(value, dict):
                # Small dictionaries are kept as one JSONB column; larger
                # ones are flattened into prefixed columns of this table.
                if len(value) <= 3:  # Arbitrary threshold
                    self._record_column(table_name, column_name, 'JSONB')
                else:
                    self.analyze_structure(value, column_name, table_name, depth + 1)
            elif isinstance(value, list):
                if value and isinstance(value[0], dict):
                    self._analyze_child_table(table_name, column_name, value, depth)
                else:
                    # Simple (or empty) arrays are stored as JSONB
                    self._record_column(table_name, column_name, 'JSONB')
            elif value is None:
                # A null says nothing about the type: keep the column with a
                # TEXT placeholder only until a real value is seen, and do
                # not let it force TEXT on an already-typed column.
                if column_name not in self._column_defs.get(table_name, {}):
                    self._store_definition(table_name, column_name, 'TEXT')
            else:
                self._record_column(
                    table_name, column_name, self.get_sql_type(value, column_name)
                )

    def generate_sql(self) -> Tuple[List[str], List[Dict[str, str]]]:
        """Return (CREATE TABLE statements, relationships) for what was analysed.

        Column definitions inside each statement are sorted alphabetically.
        """
        create_statements = []
        for table_name, columns in self.table_schemas.items():
            columns_str = ',\n '.join(sorted(columns))
            create_statements.append(
                f"CREATE TABLE {table_name} (\n {columns_str}\n);"
            )
        return create_statements, self.relationships


def analyze_json_file(file_path: str) -> Tuple[List[str], List[Dict[str, str]]]:
    """Analyze JSON file and return SQL schema.

    Args:
        file_path: Path of a UTF-8 encoded JSON file.

    Returns:
        The (statements, relationships) pair from JsonSchemaAnalyzer.generate_sql.

    Raises:
        Exception: On any failure, with the original error chained as the cause.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        analyzer = JsonSchemaAnalyzer()
        analyzer.analyze_structure(data)
        return analyzer.generate_sql()
    except Exception as e:
        raise Exception(f"Error analyzing JSON file: {str(e)}") from e


def main() -> None:
    """Example usage: print the schema inferred from sample_data.json."""
    try:
        file_path = "sample_data.json"
        create_statements, relationships = analyze_json_file(file_path)

        print("=== SQL Create Statements ===")
        for stmt in create_statements:
            print(f"{stmt}\n")

        print("=== Table Relationships ===")
        for rel in relationships:
            print(f"- {rel['from_table']} -> {rel['to_table']} ({rel['type']})")
    except Exception as e:
        print(f"Error: {str(e)}")


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment