210 lines
8.4 KiB
Python
210 lines
8.4 KiB
Python
import json
|
|
|
|
from cryptography.fernet import Fernet
|
|
from fastapi import APIRouter, Response, Depends, status
|
|
from fastapi.responses import RedirectResponse
|
|
|
|
from app.config import settings
|
|
from app.exceptions import (
|
|
UsernameAlreadyInUseException,
|
|
IncorrectPasswordException,
|
|
PasswordsMismatchException,
|
|
WrongCodeException,
|
|
UserNotFoundException,
|
|
SomethingWentWrongException,
|
|
UserAlreadyExistsException,
|
|
)
|
|
from app.users.auth import get_password_hash, create_access_token, verify_password, VERIFICATED_USER, AuthService
|
|
from app.users.dao import UserDAO, UserCodesDAO
|
|
from app.users.dependencies import get_current_user, check_verificated_user_with_exc
|
|
from app.users.models import Users
|
|
from app.users.schemas import (
|
|
SUserLogin,
|
|
SUserRegister,
|
|
SUserResponse,
|
|
SUserPassword,
|
|
SUserRename,
|
|
SUserAvatar,
|
|
SUserPasswordRecover,
|
|
SUserCode,
|
|
SUserPasswordChange,
|
|
SRecoverEmailSent,
|
|
SUserToken,
|
|
SEmailVerification,
|
|
SUserName,
|
|
SNewAvatar,
|
|
SConfirmPasswordRecovery,
|
|
SPasswordRecovered,
|
|
SUserAvatars,
|
|
SUsername,
|
|
SEmail,
|
|
)
|
|
from app.tasks.tasks import send_registration_confirmation_email, send_password_change_email, send_password_recover_email
|
|
|
|
# Router that groups the user-related endpoints defined below under the "/users" prefix.
router = APIRouter(prefix="/users", tags=["Пользователи"])
|
|
|
|
|
|
@router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT)
async def get_teleport():
    """Redirect the caller to a fixed external URL."""
    target_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
    return RedirectResponse(url=target_url)
|
|
|
|
|
|
@router.get("", response_model=list[SUserResponse])
async def get_all_users():
    """Return everything ``UserDAO.find_all`` yields, serialized as ``SUserResponse`` items."""
    return await UserDAO.find_all()
|
|
|
|
|
|
@router.post("/check_existing_username", status_code=status.HTTP_200_OK)
async def check_existing_username(username: SUsername):
    """Check whether the submitted username is already taken.

    Returns nothing when no user with that username is found.

    Raises:
        UserAlreadyExistsException: a user with this username was found.
    """
    if await UserDAO.find_one_or_none(username=username.username):
        raise UserAlreadyExistsException
|
|
|
|
|
|
@router.post("/check_existing_email", status_code=status.HTTP_200_OK)
async def check_existing_email(email: SEmail):
    """Check whether the submitted e-mail address is already registered.

    Returns nothing when no user with that e-mail is found.

    Raises:
        UserAlreadyExistsException: a user with this e-mail was found.
    """
    if await UserDAO.find_one_or_none(email=email.email):
        raise UserAlreadyExistsException
|
|
|
|
|
|
@router.post("/register", response_model=SUserToken, status_code=status.HTTP_201_CREATED)
async def register_user(response: Response, user_data: SUserRegister):
    """Register a new user, send the confirmation e-mail and log the user in.

    On success the access token is both returned in the body and stored in the
    ``black_phoenix_access_token`` cookie.

    Raises:
        PasswordsMismatchException: ``password`` and ``password2`` differ.
        SomethingWentWrongException: the confirmation code could not be stored.
    """
    if user_data.password != user_data.password2:
        raise PasswordsMismatchException
    # Presumably raises if the e-mail/username is already registered -- verify in AuthService.
    await AuthService.check_existing_user(user_data)

    hashed_password = get_password_hash(user_data.password)
    user_id = await UserDAO.add(
        email=user_data.email,
        hashed_password=hashed_password,
        username=user_data.username,
        date_of_birth=user_data.date_of_birth,
    )

    task = send_registration_confirmation_email.delay(
        user_id=user_id, username=user_data.username, email_to=user_data.email, MODE=settings.MODE
    )
    # NOTE(review): .get() waits synchronously for the task result inside an
    # async handler; consider moving it off the event loop.
    confirmation_code = task.get()

    stored_code = await UserCodesDAO.set_user_codes(
        user_id=user_id, code=confirmation_code, description="Код подтверждения почты"
    )
    if stored_code != confirmation_code:
        # Without this the handler fell through and returned None, which
        # cannot be serialized as SUserToken.
        raise SomethingWentWrongException

    user = await AuthService.authenticate_user_by_email(user_data.email, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
|
|
|
|
|
|
@router.get("/email_verification/{user_code}", status_code=status.HTTP_200_OK, response_model=SEmailVerification)
async def email_verification(user_code: str):
    """Confirm a user's e-mail from the encrypted token embedded in the link.

    The token is a Fernet-encrypted JSON object carrying ``user_id`` and
    ``confirmation_code``. When the code matches a stored one, the user's role
    is switched to ``VERIFICATED_USER``.

    Raises:
        WrongCodeException: the token is malformed or cannot be decrypted, the
            code is unknown, or the role update did not succeed.
    """
    # Imported locally so this block does not depend on the module import list.
    from cryptography.fernet import InvalidToken

    cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY)
    try:
        payload = json.loads(cipher_suite.decrypt(user_code.encode()).decode("utf-8"))
        user_id = payload["user_id"]
        confirmation_code = payload["confirmation_code"]
    except (InvalidToken, ValueError, KeyError, TypeError):
        # The token comes straight from the URL: a tampered or truncated link
        # must be reported as a wrong code, not as an unhandled server error.
        raise WrongCodeException from None

    user_codes = await UserCodesDAO.get_user_codes(
        user_id=user_id, description="Код подтверждения почты", code=confirmation_code
    )
    if not user_codes or not await UserDAO.change_data(user_id=user_id, role=VERIFICATED_USER):
        raise WrongCodeException
    return {"email_verification": True}
|
|
|
|
|
|
@router.post("/login", response_model=SUserToken)
async def login_user(response: Response, user_data: SUserLogin):
    """Authenticate by e-mail or username and issue an access token.

    The token is returned in the body and stored in the
    ``black_phoenix_access_token`` cookie.
    """
    user = await AuthService.authenticate_user(user_data.email_or_username, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    # secure=True matches the cookie set by register_user, so the token cookie
    # carries the same attributes whichever endpoint issued it.
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
|
|
|
|
|
|
@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout_user(response: Response):
    """Log the user out by deleting the access-token cookie from the response."""
    response.delete_cookie("black_phoenix_access_token")
|
|
|
|
|
|
@router.get("/me", response_model=SUserResponse)
async def read_users_me(current_user: Users = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user
|
|
|
|
|
|
@router.patch("/rename", response_model=SUserName)
async def rename_user(new_username: SUserRename, current_user: Users = Depends(check_verificated_user_with_exc)):
    """Change the current user's username after re-checking their password.

    Raises:
        IncorrectPasswordException: the supplied password does not match.
        UsernameAlreadyInUseException: another user already has that username.
    """
    password_ok = verify_password(new_username.password, current_user.hashed_password)
    if not password_ok:
        raise IncorrectPasswordException

    if await UserDAO.find_one_or_none(username=new_username.username):
        raise UsernameAlreadyInUseException

    # The value returned by change_data is passed back to the client as the username.
    updated_username = await UserDAO.change_data(current_user.id, username=new_username.username)
    return {"username": updated_username}
|
|
|
|
|
|
@router.patch("/change_avatar", response_model=SNewAvatar)
async def change_avatar(user_data: SUserAvatar, current_user: Users = Depends(check_verificated_user_with_exc)):
    """Replace the current user's avatar and record it via ``add_user_avatar``.

    Raises:
        IncorrectPasswordException: the supplied password does not match.
        SomethingWentWrongException: either update step returned a falsy result.
    """
    if not verify_password(user_data.password, current_user.hashed_password):
        raise IncorrectPasswordException

    profile_updated = await UserDAO.change_data(
        current_user.id, avatar_image=user_data.new_avatar_image, avatar_hex=user_data.avatar_hex
    )
    if not profile_updated:
        raise SomethingWentWrongException

    # Only reached when the profile update succeeded.
    avatar_recorded = await UserDAO.add_user_avatar(user_id=current_user.id, avatar=user_data.new_avatar_image)
    if not avatar_recorded:
        raise SomethingWentWrongException

    return {"new_avatar_image": user_data.new_avatar_image, "avatar_hex": user_data.avatar_hex}
|
|
|
|
|
|
@router.patch("/change_password", status_code=status.HTTP_200_OK)
async def change_password(new_password: SUserPassword, current_user: Users = Depends(check_verificated_user_with_exc)):
    """Change the current user's password and queue a notification e-mail.

    Raises:
        PasswordsMismatchException: the two new passwords differ.
        IncorrectPasswordException: the current password does not match.
    """
    if new_password.new_password != new_password.new_password2:
        raise PasswordsMismatchException

    # The user is looked up again by id and the password is checked against that record.
    stored_user = await UserDAO.find_one_or_none(id=current_user.id)
    password_ok = verify_password(new_password.current_password, stored_user.hashed_password)
    if not password_ok:
        raise IncorrectPasswordException

    await UserDAO.change_data(
        current_user.id,
        hashed_password=get_password_hash(new_password.new_password),
    )
    send_password_change_email.delay(current_user.username, current_user.email, MODE=settings.MODE)
|
|
|
|
|
|
@router.patch("/send_recovery_email", status_code=status.HTTP_200_OK, response_model=SRecoverEmailSent)
async def send_recovery_email(email: SUserPasswordRecover):
    """Send a password-recovery e-mail and store the recovery code for the user.

    Raises:
        UserNotFoundException: no user is registered with the given e-mail.
        SomethingWentWrongException: the recovery code could not be stored.
    """
    existing_user = await UserDAO.find_one_or_none(email=email.email)
    if not existing_user:
        raise UserNotFoundException

    task = send_password_recover_email.delay(existing_user.username, existing_user.email, MODE=settings.MODE)
    # NOTE(review): .get() waits synchronously for the task result inside an
    # async handler; consider moving it off the event loop.
    recovery_code = task.get()

    # The user key is `.id` everywhere else in this module (user.id,
    # current_user.id, find_one_or_none(id=...)); `.user_id` was used here.
    stored_code = await UserCodesDAO.set_user_codes(
        user_id=existing_user.id, code=recovery_code, description="Код восстановления пароля"
    )
    if stored_code == recovery_code:
        return {"recover_email_sent": True}
    raise SomethingWentWrongException
|
|
|
|
|
|
@router.post("/confirm_password_recovery", status_code=status.HTTP_200_OK, response_model=SConfirmPasswordRecovery)
async def confirm_password_recovery(user_code: SUserCode):
    """Resolve a password-recovery code to the id of the user it belongs to.

    Raises:
        WrongCodeException: no stored recovery code matches the submitted one.
    """
    matches = await UserCodesDAO.get_user_codes(description="Код восстановления пароля", code=user_code.user_code)
    if matches:
        # The first matching record determines the user.
        first_match = matches[0]
        return {"user_id": first_match["user_id"]}
    raise WrongCodeException
|
|
|
|
|
|
@router.post("/password_recovery", status_code=status.HTTP_200_OK, response_model=SPasswordRecovered)
async def password_recovery(passwords: SUserPasswordChange):
    """Set a new password for the user identified by ``passwords.user_id``.

    Raises:
        PasswordsMismatchException: ``password1`` and ``password2`` differ.
        UserNotFoundException: no user matches the id and the updated username.
    """
    # NOTE(review): this handler trusts the client-supplied user_id and does
    # not re-check a recovery code -- confirm that is intended.
    if passwords.password1 != passwords.password2:
        raise PasswordsMismatchException

    new_hash = get_password_hash(passwords.password1)
    username = await UserDAO.change_data(passwords.user_id, hashed_password=new_hash)

    account = await UserDAO.find_one_or_none(username=username, id=passwords.user_id)
    if not account:
        raise UserNotFoundException

    send_password_change_email.delay(account.username, account.email, MODE=settings.MODE)
    return {"username": username}
|
|
|
|
|
|
@router.get("/avatars", response_model=list[SUserAvatars])
async def get_user_avatars_history(user: Users = Depends(get_current_user)):
    """Return the avatars that ``UserDAO.get_user_avatars`` yields for the current user."""
    avatars = await UserDAO.get_user_avatars(user_id=user.id)
    return avatars
|
|
|
|
|
|
@router.delete("/avatars", response_model=bool)
async def delete_form_user_avatars_history(avatar_id: int, user: Users = Depends(get_current_user)):
    """Delete one avatar of the current user and return the DAO's result."""
    deleted = await UserDAO.delete_user_avatar(avatar_id=avatar_id, user_id=user.id)
    return deleted
|