from fastapi import APIRouter, Response, Depends, status from fastapi.responses import RedirectResponse from app.config import settings from app.exceptions import UsernameAlreadyInUseException, \ IncorrectPasswordException, PasswordsМismatchException, WrongCodeException, UserNotFoundException, \ SomethingWentWrongException from app.users.auth import get_password_hash, authenticate_user_by_email, \ create_access_token, verify_password, VERIFICATED_USER, authenticate_user, \ check_existing_user, check_verificated_user_with_exc from app.users.dao import UserDAO, UserCodesDAO from app.users.dependencies import get_current_user from app.users.models import Users from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserPassword, SUserRename, SUserAvatar, \ SUserPasswordRecover, SUserCode, SUserPasswordChange, SRecoverEmailSent, SUserToken, SEmailVerification, SUserName, \ SNewAvatar, SConfirmPasswordRecovery, SPasswordRecovered from app.tasks.tasks import send_registration_confirmation_email, send_password_change_email, \ send_password_recover_email router = APIRouter( prefix="/users", tags=["Пользователи"] ) @router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT) async def get_teleport(): return RedirectResponse(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ") @router.get("", response_model=list[SUser]) async def get_all_users(): users = await UserDAO.find_all() return users @router.post("/register", response_model=SUserToken) async def register_user(response: Response, user_data: SUserRegister): await check_existing_user(user_data) hashed_password = get_password_hash(user_data.password) user_id = await UserDAO.add( email=user_data.email, hashed_password=hashed_password, username=user_data.username, date_of_birth=user_data.date_of_birth, ) result = send_registration_confirmation_email.delay( username=user_data.username, email_to=user_data.email, MODE=settings.MODE ) result = result.get() if await UserCodesDAO.set_user_codes( user_id=user_id, code=result, description="Код подтверждения почты" ) == result: user = await authenticate_user_by_email(user_data.email, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True) return {"access_token": access_token} @router.post('/email_verification', status_code=status.HTTP_200_OK, response_model=SEmailVerification) async def email_verification(user_code: SUserCode, user: Users = Depends(get_current_user)): user_codes = await UserCodesDAO.get_user_codes( user_id=user.id, description="Код подтверждения почты", code=user_code.user_code ) if not user_codes or not await UserDAO.change_data(user_id=user.id, role=VERIFICATED_USER): raise WrongCodeException return {'email_verification': True} @router.post("/login", response_model=SUserToken) async def login_user(response: Response, user_data: SUserLogin): user = await authenticate_user(user_data.email_or_username, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie("black_phoenix_access_token", access_token, httponly=True) return {"access_token": access_token} @router.post("/logout", status_code=status.HTTP_200_OK) async def logout_user(response: Response): response.delete_cookie("black_phoenix_access_token") @router.get("/me", response_model=SUser) async def read_users_me(current_user: Users = Depends(get_current_user)): return current_user @router.patch("/rename", response_model=SUserName) async def rename_user(new_username: SUserRename, current_user: Users = Depends(get_current_user)): await check_verificated_user_with_exc(user_id=current_user.id) if not verify_password(new_username.password, current_user.hashed_password): raise IncorrectPasswordException existing_user = await UserDAO.find_one_or_none(username=new_username.username) if existing_user: raise UsernameAlreadyInUseException new_username = await UserDAO.change_data(current_user.id, username=new_username.username) return {'username': new_username} @router.patch("/change_avatar", response_model=SNewAvatar) async def change_avatar(user_data: SUserAvatar, current_user: Users = Depends(get_current_user)): await check_verificated_user_with_exc(user_id=current_user.id) if not verify_password(user_data.password, current_user.hashed_password): raise IncorrectPasswordException if await UserDAO.change_data( current_user.id, avatar_image=user_data.new_avatar_image, avatar_hex=SUserAvatar.avatar_hex ): return {'new_avatar_image': user_data.new_avatar_image, 'avatar_hex': SUserAvatar.avatar_hex} raise SomethingWentWrongException @router.patch("/change_password", status_code=status.HTTP_200_OK) async def change_password(new_password: SUserPassword, current_user: Users = Depends(get_current_user)): await check_verificated_user_with_exc(user_id=current_user.id) existing_user = await UserDAO.find_one_or_none(id=current_user.id) if not verify_password(new_password.current_password, existing_user.hashed_password): raise IncorrectPasswordException if new_password.new_password != new_password.new_password2: raise PasswordsМismatchException hashed_password = get_password_hash(new_password.new_password) await UserDAO.change_data(current_user.id, hashed_password=hashed_password) send_password_change_email.delay(current_user.username, current_user.email, MODE=settings.MODE) @router.patch("/send_recovery_email", status_code=status.HTTP_200_OK, response_model=SRecoverEmailSent) async def send_recovery_email(email: SUserPasswordRecover): existing_user = await UserDAO.find_one_or_none(email=email.email) if not existing_user: raise UserNotFoundException result = send_password_recover_email.delay(existing_user.username, existing_user.email, MODE=settings.MODE) result = result.get() if await UserCodesDAO.set_user_codes( user_id=existing_user.user_id, code=result, description="Код восстановления пароля" ) == result: return {'recover_email_sent': True} raise SomethingWentWrongException @router.post("/confirm_password_recovery", status_code=status.HTTP_200_OK, response_model=SConfirmPasswordRecovery) async def confirm_password_recovery(user_code: SUserCode): user_codes = await UserCodesDAO.get_user_codes( description="Код восстановления пароля", code=user_code.user_code ) if not user_codes: raise WrongCodeException return {'user_id': user_codes[0]['user_id']} @router.post("/password_recovery", status_code=status.HTTP_200_OK, response_model=SPasswordRecovered) async def password_recovery(passwords: SUserPasswordChange): if passwords.password1 != passwords.password2: raise PasswordsМismatchException hashed_password = get_password_hash(passwords.password1) username = await UserDAO.change_data(passwords.user_id, hashed_password=hashed_password) user = await UserDAO.find_one_or_none(username=username, id=passwords.user_id) if not user: raise UserNotFoundException send_password_change_email.delay(user.username, user.email, MODE=settings.MODE) return {"username": username}