import json from cryptography.fernet import Fernet from fastapi import APIRouter, Response, Depends, status from app.config import settings from app.exceptions import ( PasswordsMismatchException, WrongCodeException, UserNotFoundException, SomethingWentWrongException, UserAlreadyExistsException, ) from app.users.auth import get_password_hash, create_access_token, VERIFICATED_USER, AuthService from app.users.dao import UserDAO, UserCodesDAO from app.users.dependencies import get_current_user from app.users.schemas import ( SUserLogin, SUserRegister, SUserResponse, SUserPasswordRecover, SUserCode, SUserPasswordChange, SRecoverEmailSent, SUserToken, SEmailVerification, SConfirmPasswordRecovery, SPasswordRecovered, SUserAvatars, SUsername, SEmail, SUser, ) from app.tasks.tasks import send_registration_confirmation_email, send_password_change_email, \ send_password_recover_email router = APIRouter(prefix="/users", tags=["Пользователи"]) @router.get( "", status_code=status.HTTP_200_OK, response_model=list[SUserResponse], ) async def get_all_users(): users = await UserDAO.find_all() return users @router.post( "/check_existing_username", status_code=status.HTTP_200_OK, response_model=None, ) async def check_existing_username(username: SUsername): user = await UserDAO.find_one_or_none(username=username.username) if user: raise UserAlreadyExistsException @router.post( "/check_existing_email", status_code=status.HTTP_200_OK, response_model=None, ) async def check_existing_email(email: SEmail): user = await UserDAO.find_one_or_none(email=email.email) if user: raise UserAlreadyExistsException @router.post( "/register", status_code=status.HTTP_201_CREATED, response_model=SUserToken, ) async def register_user(response: Response, user_data: SUserRegister): if user_data.password != user_data.password2: raise PasswordsMismatchException await AuthService.check_existing_user(user_data) hashed_password = get_password_hash(user_data.password) user_id = await UserDAO.add( email=user_data.email, hashed_password=hashed_password, username=user_data.username, date_of_birth=user_data.date_of_birth, ) result = send_registration_confirmation_email.delay( user_id=user_id, username=user_data.username, email_to=user_data.email, MODE=settings.MODE ) result = result.get() if await UserCodesDAO.set_user_codes(user_id=user_id, code=result, description="Код подтверждения почты") == result: user = await AuthService.authenticate_user_by_email(user_data.email, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True) return {"access_token": access_token} @router.get( "/email_verification/{user_code}", status_code=status.HTTP_200_OK, response_model=SEmailVerification, ) async def email_verification(user_code: str): invitation_token = user_code.encode() cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY) user_data = cipher_suite.decrypt(invitation_token).decode("utf-8") user_data = json.loads(user_data) user_codes = await UserCodesDAO.get_user_codes( user_id=user_data["user_id"], description="Код подтверждения почты", code=user_data["confirmation_code"] ) if not user_codes or not await UserDAO.change_data(user_id=user_data["user_id"], role=VERIFICATED_USER): raise WrongCodeException return {"email_verification": True} @router.post( "/login", status_code=status.HTTP_200_OK, response_model=SUserToken, ) async def login_user(response: Response, user_data: SUserLogin): user = await AuthService.authenticate_user(user_data.email_or_username, user_data.password) access_token = create_access_token({"sub": str(user.id)}) response.set_cookie("black_phoenix_access_token", access_token, httponly=True) return {"access_token": access_token} @router.post( "/logout", status_code=status.HTTP_200_OK, response_model=None, ) async def logout_user(response: Response): response.delete_cookie("black_phoenix_access_token") @router.get( "/me", status_code=status.HTTP_200_OK, response_model=SUserResponse, ) async def get_user(current_user: SUser = Depends(get_current_user)): return current_user @router.patch( "/send_recovery_email", status_code=status.HTTP_200_OK, response_model=SRecoverEmailSent, ) async def send_recovery_email(email: SUserPasswordRecover): existing_user = await UserDAO.find_one_or_none(email=email.email) if not existing_user: raise UserNotFoundException result = send_password_recover_email.delay(existing_user.username, existing_user.email, MODE=settings.MODE) result = result.get() if ( await UserCodesDAO.set_user_codes( user_id=existing_user.user_id, code=result, description="Код восстановления пароля" ) == result ): return {"recover_email_sent": True} raise SomethingWentWrongException @router.post( "/confirm_password_recovery", status_code=status.HTTP_200_OK, response_model=SConfirmPasswordRecovery, ) async def confirm_password_recovery(user_code: SUserCode): user_codes = await UserCodesDAO.get_user_codes(description="Код восстановления пароля", code=user_code.user_code) if not user_codes: raise WrongCodeException return {"user_id": user_codes[0]["user_id"]} @router.post( "/password_recovery", status_code=status.HTTP_200_OK, response_model=SPasswordRecovered, ) async def password_recovery(passwords: SUserPasswordChange): if passwords.password1 != passwords.password2: raise PasswordsMismatchException hashed_password = get_password_hash(passwords.password1) username = await UserDAO.change_data(passwords.user_id, hashed_password=hashed_password) user = await UserDAO.find_one_or_none(username=username, id=passwords.user_id) if not user: raise UserNotFoundException send_password_change_email.delay(user.username, user.email, MODE=settings.MODE) return {"username": username} @router.get( "/avatars", status_code=status.HTTP_200_OK, response_model=SUserAvatars, ) async def get_user_avatars_history(user: SUser = Depends(get_current_user)): return await UserDAO.get_user_avatars(user_id=user.id)