# file: json-schema-analyser.py
import json
from typing import Dict, List, Set, Any, Tuple
from datetime import datetime
import re

class JsonSchemaAnalyzer:
    def __init__(self, max_depth: int = 10):
        self.max_depth = max_depth
        self.table_schemas: Dict[str, Set[str]] = {}
        self.relationships: List[Dict[str, str]] = []
        self.seen_arrays: Set[str] = set()
        self.column_types: Dict[str, Dict[str, Set[str]]] = {}  # Track multiple observed SQL types per column, per table

    def sanitize_name(self, name: str) -> str:
        """Sanitize column/table names for SQL compatibility"""
        return re.sub(r'[^a-zA-Z0-9_]', '_', name.lower())

    def get_sql_type(self, value: Any, column_name: str = '') -> str:
        """Determine SQL type based on Python value and column name hints"""
        if isinstance(value, bool):
            return 'BOOLEAN'
        elif isinstance(value, (int, float)):
            # Special handling for price/amount/cost columns
            if any(term in column_name.lower() for term in ['price', 'amount', 'cost', 'fee', 'total']):
                return 'DECIMAL(19,4)'  # Four decimal places comfortably covers currency values
            elif isinstance(value, int):
                if abs(value) > 2147483647:  # Check if exceeds regular INTEGER range
                    return 'BIGINT'
                return 'INTEGER'
            else:
                return 'DECIMAL(15,6)'  # General decimal with good precision
        elif isinstance(value, (list, dict)):
            return 'JSONB'  # Store complex structures as JSONB
        elif isinstance(value, str):
            # Try various date/time formats
            date_formats = [
                ('%Y-%m-%d', 'DATE'),
                ('%Y/%m/%d', 'DATE'),
                ('%d-%m-%Y', 'DATE'),
                ('%d/%m/%Y', 'DATE'),
                ('%Y-%m-%dT%H:%M:%S', 'TIMESTAMP'),
                ('%Y-%m-%dT%H:%M:%S.%f', 'TIMESTAMP'),
                ('%Y-%m-%dT%H:%M:%S%z', 'TIMESTAMP WITH TIME ZONE'),
                ('%Y-%m-%d %H:%M:%S', 'TIMESTAMP'),
                ('%H:%M:%S', 'TIME')
            ]
            
            for date_format, sql_type in date_formats:
                try:
                    datetime.strptime(value, date_format)
                    return sql_type
                except ValueError:
                    continue

            # Check for special string patterns
            if re.match(r'^[0-9]+$', value):
                if len(value) <= 9:  # At most 9 digits always fits in a 32-bit INTEGER
                    return 'INTEGER'
                return 'TEXT'  # Longer numeric strings (e.g. large IDs) are safer as TEXT
            elif re.match(r'^[0-9]*\.[0-9]+$', value):
                return 'DECIMAL(15,6)'
            elif re.match(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$', value):
                return 'UUID'
            elif len(value) > 1000:
                return 'TEXT'
            else:
                return 'VARCHAR(255)'
        return 'TEXT'

    def update_column_type(self, table_name: str, column_name: str, sql_type: str) -> str:
        """Determine the most appropriate SQL type when multiple types are encountered"""
        if table_name not in self.column_types:
            self.column_types[table_name] = {}
        
        if column_name not in self.column_types[table_name]:
            self.column_types[table_name][column_name] = set()
        
        self.column_types[table_name][column_name].add(sql_type)
        types = self.column_types[table_name][column_name]
        
        # Type resolution rules
        if len(types) > 1:
            if 'TEXT' in types:
                return 'TEXT'  # Most flexible type
            elif 'JSONB' in types:
                return 'JSONB'
            elif any(t.startswith('DECIMAL') for t in types) and ('INTEGER' in types or 'BIGINT' in types):
                return 'DECIMAL(19,4)'  # Wide enough for both the integer and decimal values seen
            elif 'TIMESTAMP WITH TIME ZONE' in types and 'TIMESTAMP' in types:
                return 'TIMESTAMP WITH TIME ZONE'
            elif 'BIGINT' in types and 'INTEGER' in types:
                return 'BIGINT'
        
        return sql_type

    def analyze_structure(self, data: Any, prefix: str = '', table_name: str = 'main', depth: int = 0) -> None:
        """Recursively analyze JSON structure"""
        if depth > self.max_depth:
            return

        if table_name not in self.table_schemas:
            self.table_schemas[table_name] = {
                'id SERIAL PRIMARY KEY',
                'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
                'updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
            }

        if isinstance(data, dict):
            for key, value in data.items():
                column_name = self.sanitize_name(f"{prefix}_{key}" if prefix else key)
                
                if isinstance(value, dict):
                    # For small dictionaries, consider storing as JSONB instead of creating new table
                    if len(value) <= 3:  # Arbitrary threshold
                        sql_type = 'JSONB'
                        self.table_schemas[table_name].add(f"{column_name} {sql_type}")
                    else:
                        self.analyze_structure(value, column_name, table_name, depth + 1)
                elif isinstance(value, list):
                    if value and isinstance(value[0], dict):
                        # Create new table for array of objects
                        new_table = f"{table_name}_{column_name}"
                        if new_table not in self.seen_arrays:
                            self.seen_arrays.add(new_table)
                            self.relationships.append({
                                'from_table': table_name,
                                'to_table': new_table,
                                'type': 'one_to_many'
                            })
                            if new_table not in self.table_schemas:
                                self.table_schemas[new_table] = {
                                    'id SERIAL PRIMARY KEY',
                                    f'{table_name}_id INTEGER REFERENCES {table_name}(id)',
                                    'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
                                    'updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
                                }
                            # Analyze a sample of items to catch varying field types
                            for item in value[:10]:  # Limit to first 10 items for performance
                                self.analyze_structure(item, '', new_table, depth + 1)
                    else:
                        # Store simple arrays as JSONB
                        sql_type = 'JSONB'
                        self.table_schemas[table_name].add(f"{column_name} {sql_type}")
                else:
                    sql_type = self.get_sql_type(value, column_name)
                    sql_type = self.update_column_type(table_name, column_name, sql_type)
                    # Drop any stale definition of this column so the resolved type replaces it
                    self.table_schemas[table_name] = {
                        c for c in self.table_schemas[table_name] if not c.startswith(f"{column_name} ")
                    }
                    self.table_schemas[table_name].add(f"{column_name} {sql_type}")

    def generate_sql(self) -> Tuple[List[str], List[Dict[str, str]]]:
        """Generate SQL CREATE TABLE statements"""
        create_statements = []
        for table_name, columns in self.table_schemas.items():
            columns_str = ',\n    '.join(sorted(columns))
            create_statements.append(
                f"CREATE TABLE {table_name} (\n    {columns_str}\n);"
            )
        return create_statements, self.relationships

def analyze_json_file(file_path: str) -> Tuple[List[str], List[Dict[str, str]]]:
    """Analyze JSON file and return SQL schema"""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
        
        analyzer = JsonSchemaAnalyzer()
        analyzer.analyze_structure(data)
        return analyzer.generate_sql()
    except Exception as e:
        raise RuntimeError(f"Error analyzing JSON file: {e}") from e

# Example usage
if __name__ == "__main__":
    try:
        file_path = "sample_data.json"
        create_statements, relationships = analyze_json_file(file_path)
        
        print("=== SQL Create Statements ===")
        for stmt in create_statements:
            print(f"{stmt}\n")
        
        print("=== Table Relationships ===")
        for rel in relationships:
            print(f"- {rel['from_table']} -> {rel['to_table']} ({rel['type']})")
            
    except Exception as e:
        print(f"Error: {str(e)}")