Полировка users/router

This commit is contained in:
urec56 2024-02-08 22:25:34 +03:00
parent b9f347253d
commit c0f808c20b
6 changed files with 63 additions and 47 deletions

View file

@ -44,16 +44,6 @@ class UserIsNotPresentException(BlackPhoenixException):
status_code = status.HTTP_401_UNAUTHORIZED
class PasswordIsTooShortException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "Пароль должен быть не менее 8 символов"
class IncorrectLengthOfNicknameException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "Ник должен быть не короче 2 и не длиннее 30 символов"
class UserDontHavePermissionException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "У вас нет прав для этого действия"
@ -64,3 +54,11 @@ class MessageNotFoundException(BlackPhoenixException):
detail = "Сообщение не найдено"
class IncorrectPasswordException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "Введён не верный пароль"
class PasswordsМismatchException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "Пароли не совпадают"

View file

@ -29,6 +29,7 @@ app.add_middleware(
"Set-Cookie",
"Access-Control-Allow-Headers",
"Authorization",
"Accept"
],
)

View file

@ -42,6 +42,3 @@ async def authenticate_user_by_username(username: str, password: str):
if not user or not verify_password(password, user.hashed_password):
return None
return user

View file

@ -14,7 +14,7 @@ class Users(Base):
username: Mapped[str]
hashed_password: Mapped[str]
role: Mapped[int]
black_phoenix: Mapped[int]
black_phoenix: Mapped[int] # Заменить на bool
avatar_image: Mapped[Optional[str]] = mapped_column(server_default='app/static/images/ты уже пешка BP.png')
date_of_birth: Mapped[date]

View file

@ -1,14 +1,15 @@
from fastapi import APIRouter, Response, Depends
from fastapi.responses import RedirectResponse
from starlette import status
from app.exceptions import UserAlreadyExistsException, IncorrectAuthDataException, UsernameAlreadyInUseException, \
PasswordIsTooShortException, IncorrectLengthOfNicknameException
IncorrectPasswordException, PasswordsМismatchException
from app.users.auth import get_password_hash, authenticate_user_by_email, authenticate_user_by_username, \
create_access_token
create_access_token, verify_password
from app.users.dao import UserDAO
from app.users.dependencies import get_current_user
from app.users.models import Users
from app.users.schemas import SUserLogin, SUserRegister
from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserName, SUserPassword
router = APIRouter(
prefix="/users",
@ -16,18 +17,18 @@ router = APIRouter(
)
@router.get("/teleport")
async def get_teleport() -> RedirectResponse:
@router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT)
async def get_teleport():
return RedirectResponse(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
@router.get("")
@router.get("", response_model=list[SUser])
async def get_all_users():
users = await UserDAO.find_all()
return users
@router.post("/register")
@router.post("/register", response_model=dict[str, str])
async def register_user(response: Response, user_data: SUserRegister):
existing_user = await UserDAO.find_one_or_none(email=user_data.email)
if existing_user:
@ -35,21 +36,22 @@ async def register_user(response: Response, user_data: SUserRegister):
existing_user = await UserDAO.find_one_or_none(username=user_data.username)
if existing_user:
raise UserAlreadyExistsException
if len(user_data.password) < 8:
raise PasswordIsTooShortException
if len(user_data.username) < 2 or len(user_data.username) > 30:
raise IncorrectLengthOfNicknameException
hashed_password = get_password_hash(user_data.password)
await UserDAO.add(email=user_data.email, hashed_password=hashed_password,
username=user_data.username, date_of_birth=user_data.date_of_birth,
role=0, black_phoenix=0)
await UserDAO.add(
email=user_data.email,
hashed_password=hashed_password,
username=user_data.username,
date_of_birth=user_data.date_of_birth,
role=0,
black_phoenix=0
)
user = await authenticate_user_by_email(user_data.email, user_data.password)
access_token = create_access_token({"sub": str(user.id)})
response.set_cookie("black_phoenix_access_token", access_token, httponly=True)
response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True)
return {"access_token": access_token}
@router.post("/login")
@router.post("/login", response_model=dict[str, str])
async def login_user(response: Response, user_data: SUserLogin):
user = await authenticate_user_by_email(user_data.email_or_username, user_data.password)
if not user:
@ -61,30 +63,31 @@ async def login_user(response: Response, user_data: SUserLogin):
return {"access_token": access_token}
@router.post("/logout")
@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout_user(response: Response):
response.delete_cookie("black_phoenix_access_token")
@router.get("/me")
@router.get("/me", response_model=SUser)
async def read_users_me(current_user: Users = Depends(get_current_user)):
return current_user
@router.patch("/rename")
async def rename_user(new_username, current_user: Users = Depends(get_current_user)):
if len(new_username) < 2 or len(new_username) > 30:
raise IncorrectLengthOfNicknameException
existing_user = await UserDAO.find_one_or_none(username=new_username)
@router.patch("/rename", response_model=str)
async def rename_user(new_username: SUserName, current_user: Users = Depends(get_current_user)):
existing_user = await UserDAO.find_one_or_none(username=new_username.username)
if existing_user:
raise UsernameAlreadyInUseException
new_username = await UserDAO.change_data(current_user.id, username=new_username)
new_username = await UserDAO.change_data(current_user.id, username=new_username.username)
return new_username
@router.patch("/change_password")
async def change_password(new_password, current_user: Users = Depends(get_current_user)):
if len(new_password) < 8:
raise PasswordIsTooShortException
hashed_password = get_password_hash(new_password)
@router.patch("/change_password", status_code=status.HTTP_200_OK)
async def change_password(new_password: SUserPassword, current_user: Users = Depends(get_current_user)):
existing_user = await UserDAO.find_one_or_none(id=current_user.id)
if not verify_password(new_password.password, existing_user.hashed_password):
raise IncorrectPasswordException
if new_password.new_password != new_password.new_password2:
raise PasswordsМismatchException
hashed_password = get_password_hash(new_password.new_password)
await UserDAO.change_data(current_user.id, hashed_password=hashed_password)

View file

@ -1,22 +1,39 @@
from datetime import date
from pydantic import BaseModel, EmailStr
from fastapi import Query
class SUserLogin(BaseModel):
email_or_username: EmailStr | str
password: str
class Config:
from_attributes = True
class SUserRegister(BaseModel):
email: EmailStr
username: str = Query(None, min_length=2, max_length=30)
password: str = Query(None, min_length=8)
date_of_birth: date
class SUser(BaseModel):
email: EmailStr
id: int
username: str
password: str
avatar_image: str
black_phoenix: int
date_of_birth: date
class Config:
from_attributes = True
class SUserName(BaseModel):
username: str = Query(None, min_length=2, max_length=30)
class SUserPassword(BaseModel):
password: str = Query(None, min_length=8)
new_password: str = Query(None, min_length=8)
new_password2: str = Query(None, min_length=8)