Untitled
unknown
plain_text
a year ago
1.6 kB
7
Indexable
# Request body for user registration.
# Every field is required (each default is ``...``); the length and range
# limits are the ones declared on the individual ``Field`` calls below.
class UserCreateRequestSchema(BaseModel):
    # Login / contact identifiers.
    email: EmailStr = Field(..., max_length=320)
    # No length constraint is declared for the password.
    password: str = Field(...)

    # Personal details.
    first_name: str = Field(..., max_length=255)
    last_name: str = Field(..., max_length=255)
    # NOTE(review): 15 characters looks like the E.164 digit limit - confirm.
    phone_number: str = Field(..., max_length=15)
    birth_date: datetime.date = Field(...)

    # Strictly positive integer identifiers (gt=0).
    # Presumably references to lookup tables; verify against the data model.
    gender_id: int = Field(..., gt=0)
    role_id: int = Field(..., gt=0)
# POST /register: validates the body against UserCreateRequestSchema and
# delegates the actual creation to the injected user service.
@user_router.post("/register", tags=["users"])
async def create_new_user(
    new_user_data: UserCreateRequestSchema,
    user_service: user_service_dependency,
):
    # The service works on a plain dict, so dump the validated model first.
    payload = new_user_data.model_dump()
    # Whatever the service returns is sent back to the client unchanged.
    return await user_service.create_user(payload)
class UserService:
    """User-related operations backed by a ``UserRepository``."""

    def __init__(self, user_repository: UserRepository) -> None:
        """Keep a reference to the repository used for all persistence calls."""
        self.user_repository = user_repository

    async def create_user(self, new_user_data: dict):
        """Create a new user in the database.

        Args:
            new_user_data: Field values for the new user. The ``'email'``,
                ``'phone_number'`` and ``'password'`` keys are read here; the
                whole mapping (with the password replaced by its hash) is
                passed to the repository. The caller's dict is not modified.

        Returns:
            The value produced by ``sign_jwt`` for the inserted user's ``id``
            and ``role_id``.

        Raises:
            UserAlreadyExistError: If the repository finds a user with the
                same email or phone number.
            ServerError: If hashing, insertion or token signing fails for any
                reason; the original exception is attached as ``__cause__``.
        """
        import logging

        existing_user = await self.user_repository.select_user_by_email_or_phone(
            new_user_data.get('email'), new_user_data.get('phone_number')
        )
        if existing_user:
            raise UserAlreadyExistError()

        # NOTE(review): the lookup above and the insert below are separate
        # calls, so two concurrent requests could both pass the check. Confirm
        # the repository/database enforces uniqueness as well.
        try:
            # Build a new dict instead of updating the argument in place so
            # the caller's data is left untouched.
            user_record = {
                **new_user_data,
                'password': get_hashed_password(new_user_data.get('password')),
            }
            user = await self.user_repository.insert_new_user(user_record)
            return sign_jwt(user.id, user.role_id)
        except Exception as exc:
            # Record the full traceback through logging rather than print(),
            # then surface a generic error while keeping the cause chained.
            logging.getLogger(__name__).exception("Failed to create user")
            raise ServerError() from exc
---------------------------------------------------------------------------------------------------------------------
Editor is loading...
Leave a Comment