Untitled

 avatar
unknown
python
3 months ago
1.5 kB
8
Indexable
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
import json

# Models
class AccountInput(BaseModel):
    """Request payload identifying an account and a debtor."""

    # An Ellipsis default marks the field as required, same as a bare annotation.
    account_id: int = Field(...)
    debtor_uuid: str = Field(...)

# Exceptions
class LambdaError(Exception):
    """Base exception that carries an error type and message for serialization.

    Args:
        message: Description of the failure; also passed to ``Exception``.
        error_type: Label reported under ``errorType``. When omitted or
            falsy, the name of the concrete exception class is used.
    """

    def __init__(self, message: str, error_type: Optional[str] = None):
        super().__init__(message)
        self.error_message = message
        # Fall back to the concrete class name so subclasses get a label for free.
        self.error_type = error_type if error_type else type(self).__name__

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict with ``errorType`` and ``errorMessage`` keys."""
        return dict(errorType=self.error_type, errorMessage=self.error_message)

class ValidationError(LambdaError):
    """Error raised for request input that cannot be parsed or validated."""

    def __init__(self, message: str):
        # Pin the reported type explicitly rather than relying on the
        # class-name fallback in the base class.
        super().__init__(message, error_type="ValidationError")


def aggregate_input(event: Dict[str, Any]) -> AccountInput:
    """Parse and validate the request body of a Lambda event.

    Args:
        event: Event dict whose ``body`` entry is expected to hold a
            JSON-encoded object. A missing or ``None`` body is treated as
            an empty object, so it is rejected by model validation rather
            than by JSON decoding.

    Returns:
        The ``AccountInput`` built from the decoded body.

    Raises:
        ValidationError: If the body cannot be decoded as JSON, does not
            decode to a JSON object, or constructing ``AccountInput``
            raises ``ValueError``.
    """
    raw_body = event.get('body')
    if raw_body is None:
        # A present-but-null body would otherwise reach json.loads and
        # raise a TypeError that nothing catches.
        raw_body = '{}'

    try:
        body = json.loads(raw_body)
    except (TypeError, ValueError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers bodies
        # that are not str/bytes (e.g. an already-decoded dict).
        raise ValidationError("Invalid JSON in request body") from e

    if not isinstance(body, dict):
        # ``**body`` on a list/number/string raises TypeError, which would
        # escape as an unhandled error instead of a ValidationError.
        raise ValidationError("Request body must be a JSON object")

    try:
        return AccountInput(**body)
    except ValueError as e:
        raise ValidationError(str(e)) from e


def process_account(account: AccountInput) -> Dict[str, Any]:
    """Build the result payload for a validated account.

    Args:
        account: Input whose ``account_id`` and ``debtor_uuid`` are echoed
            back in the result.

    Returns:
        A dict with ``account_id``, ``debtor_uuid`` and a fixed
        ``status`` of ``"completed"``.
    """
    summary: Dict[str, Any] = dict(
        account_id=account.account_id,
        debtor_uuid=account.debtor_uuid,
        status="completed",
    )
    return summary

# Lambda handler
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Lambda entry point: validate the event body and process the account.

    Args:
        event: Invocation event, passed to ``aggregate_input``.
        context: Invocation context object; not used here.

    Returns:
        The dict produced by ``process_account``.

    Raises:
        Exception: When a ``LambdaError`` occurs; its message is the JSON
            encoding of that error's ``to_dict()`` output.
    """
    try:
        return process_account(aggregate_input(event))
    except LambdaError as err:
        # Re-raise as a plain Exception whose message is the serialized error.
        serialized = json.dumps(err.to_dict())
        raise Exception(serialized)
Editor is loading...
Leave a Comment