from fastapi import APIRouter, Response, Depends from pydantic import EmailStr from app.exceptions import UserAlreadyExistsException, IncorrectAuthDataException, UsernameAlreadyInUseException, \ PasswordIsTooShortException, IncorrectLengthOfNicknameException from app.users.auth import get_password_hash, authenticate_user_by_email, authenticate_user_by_username, \ create_access_token from app.users.dao import UserDAO from app.users.dependencies import get_current_user from app.users.models import Users from app.users.schemas import SUserLogin, SUserRegister router = APIRouter( prefix="/users", tags=["Пользователи"] ) @router.get("") async def get_all_users(): users = await UserDAO.find_all() return users @router.post("/register") async def register_user(response: Response, user_data: SUserRegister): existing_user = await UserDAO.find_one_or_none(email=user_data.email) if existing_user: raise UserAlreadyExistsException existing_user = await UserDAO.find_one_or_none(username=user_data.username) if existing_user: raise UserAlreadyExistsException if len(user_data.password) < 8: raise PasswordIsTooShortException if len(user_data.username) < 2 or len(user_data.username) > 30: raise IncorrectLengthOfNicknameException hashed_password = get_password_hash(user_data.password) await UserDAO.add(email=user_data.email, hashed_password=hashed_password, username=user_data.username, date_of_birth=user_data.date_of_birth, role=0, black_phoenix=0) user = await authenticate_user_by_email(user_data.email, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie("black_phoenix_access_token", access_token, httponly=True) return {"access_token": access_token} @router.post("/login") async def login_user(response: Response, user_data: SUserLogin): user = await authenticate_user_by_email(user_data.email_or_username, user_data.password) if not user: user = await authenticate_user_by_username(user_data.email_or_username, user_data.password) if not user: raise IncorrectAuthDataException access_token = create_access_token({"sub": str(user.id)}) response.set_cookie("black_phoenix_access_token", access_token, httponly=True) return {"access_token": access_token} @router.post("/logout") async def logout_user(response: Response): response.delete_cookie("black_phoenix_access_token") @router.get("/me") async def read_users_me(current_user: Users = Depends(get_current_user)): return current_user @router.patch("/rename") async def rename_user(new_username, current_user: Users = Depends(get_current_user)): if len(new_username) < 2 or len(new_username) > 30: raise IncorrectLengthOfNicknameException existing_user = await UserDAO.find_one_or_none(username=new_username) if existing_user: raise UsernameAlreadyInUseException new_username = await UserDAO.change_data(current_user.id, username=new_username) return new_username @router.patch("/change_password") async def change_password(new_password, current_user: Users = Depends(get_current_user)): if len(new_password) < 8: raise PasswordIsTooShortException hashed_password = get_password_hash(new_password) await UserDAO.change_data(current_user.id, hashed_password=hashed_password)