# Paste-site page metadata (not part of the program):
# Untitled | unknown | python | a year ago | 1.5 kB | 21 | Indexable
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
import json
# Models
class AccountInput(BaseModel):
    """Validated request payload naming one account and its debtor."""

    # Account identifier; declared as an integer.
    account_id: int
    # Debtor identifier; declared as a plain string, so no UUID-format
    # check is expressed by this model itself.
    debtor_uuid: str
# Exceptions
class LambdaError(Exception):
    """Base exception that can be rendered as an error payload dict.

    Attributes:
        error_type: Label reported under ``"errorType"``. Falls back to the
            name of the concrete class when no (or an empty) value is given.
        error_message: Human-readable text reported under ``"errorMessage"``.
    """

    def __init__(self, message: str, error_type: Optional[str] = None):
        """Store the message and error type, then initialise ``Exception``.

        Args:
            message: Description of the failure; also becomes ``str(self)``.
            error_type: Optional override for the reported type label.
        """
        # `or` (not `is None`) means an empty string also falls back to the
        # class name, so subclasses get their own name by default.
        self.error_type = error_type or self.__class__.__name__
        self.error_message = message
        super().__init__(message)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as ``{"errorType": ..., "errorMessage": ...}``."""
        return {
            "errorType": self.error_type,
            "errorMessage": self.error_message,
        }
class ValidationError(LambdaError):
    """Error for a request payload that cannot be parsed or validated."""

    def __init__(self, message: str):
        """Create the error with its type label fixed to "ValidationError".

        Args:
            message: Description of what was wrong with the input.
        """
        # The label is passed explicitly, so it stays "ValidationError"
        # even for subclasses of this class.
        super().__init__(message, "ValidationError")
def aggregate_input(event: Dict[str, Any]) -> AccountInput:
    """Extract and validate the account payload from an invocation event.

    The ``body`` entry of *event* may be a JSON-encoded string (or bytes) or
    an already-decoded mapping. A missing or ``None`` body is treated as an
    empty JSON object, which then goes through normal model validation.

    Args:
        event: Invocation event; only its ``body`` entry is read.

    Returns:
        The validated ``AccountInput``.

    Raises:
        ValidationError: If the body is not valid JSON, does not decode to a
            JSON object, or is rejected when building ``AccountInput``.
    """
    raw_body = event.get('body')
    # `.get('body', '{}')` only covers a *missing* key; an explicit null body
    # would otherwise reach json.loads(None) and escape as a TypeError.
    if raw_body is None:
        raw_body = '{}'

    if isinstance(raw_body, (str, bytes, bytearray)):
        try:
            body = json.loads(raw_body)
        except ValueError as e:
            # JSONDecodeError (and a bytes decoding failure) are ValueErrors.
            raise ValidationError("Invalid JSON in request body") from e
    else:
        # Already-decoded payload; checked for mapping shape just below.
        body = raw_body

    # `**body` requires a mapping; a JSON list/number/string would raise a
    # TypeError that callers catching LambdaError would never see.
    if not isinstance(body, dict):
        raise ValidationError("Request body must be a JSON object")

    try:
        return AccountInput(**body)
    except (ValueError, TypeError) as e:
        # Model validation failures surface as ValueError; TypeError covers
        # keys that cannot be used as keyword names.
        raise ValidationError(str(e)) from e
def process_account(account: AccountInput) -> Dict[str, Any]:
    """Build the result payload for a validated account input.

    Args:
        account: Validated input; its ``account_id`` and ``debtor_uuid``
            are copied into the result.

    Returns:
        A dict with ``account_id``, ``debtor_uuid`` and a ``status`` that is
        always ``"completed"``.
    """
    return {
        "account_id": account.account_id,
        "debtor_uuid": account.debtor_uuid,
        "status": "completed",
    }
# Lambda handler
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Entry point: validate the event body and return the processing result.

    Args:
        event: Invocation event, passed to ``aggregate_input``.
        context: Runtime context object; not used here.

    Returns:
        The dict produced by ``process_account``.

    Raises:
        Exception: When a ``LambdaError`` occurs. The exception message is
            the JSON encoding of ``LambdaError.to_dict()`` and the original
            error is attached as ``__cause__``.
    """
    try:
        account_input = aggregate_input(event)
        return process_account(account_input)
    except LambdaError as e:
        # The error is re-raised as a plain Exception carrying a JSON string
        # (errorType / errorMessage). NOTE(review): presumably so the caller
        # can parse the message; confirm against the integration that
        # consumes it. `from e` keeps the original traceback linked.
        raise Exception(json.dumps(e.to_dict())) from e
# Paste-site page footer (not part of the program): Leave a Comment