from fastapi import APIRouter, Response, Depends from fastapi.responses import RedirectResponse from starlette import status from app.exceptions import UserAlreadyExistsException, UsernameAlreadyInUseException, \ IncorrectPasswordException, PasswordsМismatchException, WrongCodeException from app.users.auth import get_password_hash, authenticate_user_by_email, authenticate_user_by_username, \ create_access_token, verify_password, REGISTRATED_USER, get_user_codes_list, VERIFICATED_USER, authenticate_user, \ check_existing_user from app.users.dao import UserDAO, UserCodesDAO from app.users.dependencies import get_current_user from app.users.models import Users from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserName, SUserPassword from app.tasks.tasks import send_registration_confirmation_email router = APIRouter( prefix="/users", tags=["Пользователи"] ) @router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT) async def get_teleport(): return RedirectResponse(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ") @router.get("", response_model=list[SUser]) async def get_all_users(): users = await UserDAO.find_all() return users @router.post("/register", response_model=dict[str, str]) async def register_user(response: Response, user_data: SUserRegister): await check_existing_user(user_data) hashed_password = get_password_hash(user_data.password) user_id = await UserDAO.add( email=user_data.email, hashed_password=hashed_password, username=user_data.username, date_of_birth=user_data.date_of_birth, ) result = send_registration_confirmation_email.delay(username=user_data.username, email_to=user_data.email) result = result.get() if await UserCodesDAO.set_user_codes(user_id=user_id, code=result) == result: user = await authenticate_user_by_email(user_data.email, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True) return {"access_token": access_token} @router.post('/email_verification', response_model=bool) async def email_verification(user_code: str, user: Users = Depends(get_current_user)): user_codes = await UserCodesDAO.get_user_codes(user_id=user.id) user_codes = get_user_codes_list(user_codes=user_codes, sort_description="Код подтверждения почты") if user_code in user_codes: if await UserDAO.change_data(user_id=user.id, role=VERIFICATED_USER): return True raise WrongCodeException @router.post("/login", response_model=dict[str, str]) async def login_user(response: Response, user_data: SUserLogin): user = await authenticate_user(user_data.email_or_username, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie("black_phoenix_access_token", access_token, httponly=True) return {"access_token": access_token} @router.post("/logout", status_code=status.HTTP_200_OK) async def logout_user(response: Response): response.delete_cookie("black_phoenix_access_token") @router.get("/me", response_model=SUser) async def read_users_me(current_user: Users = Depends(get_current_user)): return current_user @router.patch("/rename", response_model=str) async def rename_user(new_username: SUserName, current_user: Users = Depends(get_current_user)): existing_user = await UserDAO.find_one_or_none(username=new_username.username) if existing_user: raise UsernameAlreadyInUseException new_username = await UserDAO.change_data(current_user.id, username=new_username.username) return new_username @router.patch("/change_password", status_code=status.HTTP_200_OK) async def change_password(new_password: SUserPassword, current_user: Users = Depends(get_current_user)): existing_user = await UserDAO.find_one_or_none(id=current_user.id) if not verify_password(new_password.password, existing_user.hashed_password): raise IncorrectPasswordException if new_password.new_password != new_password.new_password2: raise PasswordsМismatchException hashed_password = get_password_hash(new_password.new_password) await UserDAO.change_data(current_user.id, hashed_password=hashed_password)