Untitled
unknown
plain_text
a year ago
1.6 kB
3
Indexable
class UserCreateRequestSchema(BaseModel):
    """Request body validated for the ``/register`` endpoint.

    All fields are required (``Field(...)``); string fields carry the
    maximum lengths declared below.
    """

    email: EmailStr = Field(..., max_length=320)
    password: str = Field(...)
    first_name: str = Field(..., max_length=255)
    last_name: str = Field(..., max_length=255)
    phone_number: str = Field(..., max_length=15)
    birth_date: datetime.date = Field(...)
    # ``gt=0`` rejects zero and negative identifiers.
    gender_id: int = Field(..., gt=0)
    role_id: int = Field(..., gt=0)


@user_router.post("/register", tags=["users"])
async def create_new_user(
    new_user_data: UserCreateRequestSchema,
    user_service: user_service_dependency,
):
    """Register a new user and return whatever ``create_user`` returns.

    The validated payload is converted to a plain dict before being
    handed to the service layer.
    """
    return await user_service.create_user(new_user_data.model_dump())


class UserService:
    """Service layer for user operations, backed by a ``UserRepository``."""

    def __init__(self, user_repository: UserRepository) -> None:
        self.user_repository = user_repository

    async def create_user(self, new_user_data: dict):
        """Create a new user in the database.

        Args:
            new_user_data: User fields keyed by name. The keys ``email``,
                ``phone_number`` and ``password`` are read here. The dict
                is not modified; a copy with the hashed password is what
                gets passed to the repository.

        Returns:
            The value produced by ``sign_jwt(user.id, user.role_id)`` for
            the inserted user.

        Raises:
            UserAlreadyExistError: If the repository lookup by email or
                phone number returns a truthy result.
            ServerError: If hashing, insertion or token signing raises.
                The original exception is attached as ``__cause__``.
        """
        # Function-scope import: the file's top-level import block is not
        # visible from this section, so the dependency is kept local.
        import logging

        existing_user = await self.user_repository.select_user_by_email_or_phone(
            new_user_data.get('email'),
            new_user_data.get('phone_number'),
        )
        if existing_user:
            raise UserAlreadyExistError()

        # NOTE(review): the lookup above and the insert below are separate
        # calls; a concurrent duplicate registration would presumably surface
        # here as a generic ServerError -- confirm against the repository.
        try:
            # Build a fresh dict instead of mutating the caller's argument.
            user_record = {
                **new_user_data,
                'password': get_hashed_password(new_user_data.get('password')),
            }
            user = await self.user_repository.insert_new_user(user_record)
            return sign_jwt(user.id, user.role_id)
        except Exception as e:
            # Log with traceback and chain the cause so the root error is
            # not lost behind the generic ServerError.
            logging.getLogger(__name__).exception("Failed to create user")
            raise ServerError() from e


# ---------------------------------------------------------------------------------------------------------------------
Editor is loading...
Leave a Comment