Untitled
unknown
python
3 months ago
1.5 kB
8
Indexable
"""AWS Lambda entry point that validates an account payload and echoes it back."""

import json
from typing import Dict, Any, Optional

from pydantic import BaseModel, Field


# Models
class AccountInput(BaseModel):
    """Validated request payload: an integer account id plus a debtor UUID string."""

    account_id: int
    debtor_uuid: str


# Exceptions
class LambdaError(Exception):
    """Base error that can be serialized into an ``errorType``/``errorMessage`` dict.

    Args:
        message: Human-readable description of the failure.
        error_type: Label reported as ``errorType``; defaults to the name of
            the concrete exception class.
    """

    def __init__(self, message: str, error_type: Optional[str] = None):
        self.error_type = error_type or self.__class__.__name__
        self.error_message = message
        super().__init__(message)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a JSON-serializable dictionary."""
        return {
            "errorType": self.error_type,
            "errorMessage": self.error_message
        }


class ValidationError(LambdaError):
    """Raised when the incoming request body is missing, malformed or invalid."""

    def __init__(self, message: str):
        super().__init__(message, "ValidationError")


def aggregate_input(event: Dict[str, Any]) -> AccountInput:
    """Parse and validate the JSON ``body`` of a Lambda event.

    Args:
        event: Lambda event mapping; its ``body`` entry is expected to hold a
            JSON document describing an ``AccountInput``.

    Returns:
        The validated ``AccountInput`` model.

    Raises:
        ValidationError: If the body is not valid JSON, is not a JSON object,
            or fails model validation.
    """
    raw_body = event.get('body')
    if raw_body is None:
        # A key that is present but null must behave like a missing key;
        # ``dict.get`` only applies its default when the key is absent.
        raw_body = '{}'

    try:
        body = json.loads(raw_body)
    except (ValueError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers bodies that
        # are not str/bytes (e.g. an already-decoded object).
        raise ValidationError("Invalid JSON in request body") from e

    if not isinstance(body, dict):
        # ``AccountInput(**body)`` requires a mapping; a JSON list or scalar
        # would otherwise escape as an unhandled TypeError.
        raise ValidationError("Request body must be a JSON object")

    try:
        return AccountInput(**body)
    except ValueError as e:
        # pydantic's validation error derives from ValueError.
        raise ValidationError(str(e)) from e


def process_account(account: AccountInput) -> Dict[str, Any]:
    """Build the response payload for a validated account.

    Args:
        account: The validated request model.

    Returns:
        A dict echoing ``account_id`` and ``debtor_uuid`` with a
        ``"completed"`` status.
    """
    return {
        "account_id": account.account_id,
        "debtor_uuid": account.debtor_uuid,
        "status": "completed"
    }


# Lambda handler
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Validate the event body and return the processed account payload.

    Args:
        event: Lambda event mapping containing a JSON ``body``.
        context: Lambda context object (unused).

    Returns:
        The dictionary produced by ``process_account``.

    Raises:
        Exception: For any ``LambdaError``; the message is the JSON encoding
            of ``LambdaError.to_dict()``.
    """
    try:
        account_input = aggregate_input(event)
        result = process_account(account_input)
        return result
    except LambdaError as e:
        # NOTE(review): presumably the JSON message is parsed by an upstream
        # integration to map errors - confirm against the deployment config.
        # Chaining with ``from e`` keeps the original cause in the traceback.
        raise Exception(json.dumps(e.to_dict())) from e
Editor is loading...
Leave a Comment