chat_back/app/users/router.py

172 lines
8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Response, Depends, status
from fastapi.responses import RedirectResponse
from app.config import settings
from app.exceptions import UsernameAlreadyInUseException, \
IncorrectPasswordException, PasswordsМismatchException, WrongCodeException, UserNotFoundException, \
SomethingWentWrongException
from app.users.auth import get_password_hash, authenticate_user_by_email, \
create_access_token, verify_password, VERIFICATED_USER, authenticate_user, \
check_existing_user, check_verificated_user_with_exc
from app.users.dao import UserDAO, UserCodesDAO
from app.users.dependencies import get_current_user
from app.users.models import Users
from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserPassword, SUserRename, SUserAvatar, \
SUserPasswordRecover, SUserCode, SUserPasswordChange, SRecoverEmailSent, SUserToken, SEmailVerification, SUserName, \
SNewAvatar, SConfirmPasswordRecovery, SPasswordRecovered, SUserAvatars
from app.tasks.tasks import send_registration_confirmation_email, send_password_change_email, \
send_password_recover_email
# Router that groups every user-related endpoint under the "/users" prefix.
router = APIRouter(prefix="/users", tags=["Пользователи"])
@router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT)
async def get_teleport():
    """Redirect the caller to a fixed external video URL."""
    target_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
    return RedirectResponse(url=target_url)
@router.get("", response_model=list[SUser])
async def get_all_users():
    """Return whatever ``UserDAO.find_all`` yields, serialized as a list of ``SUser``."""
    return await UserDAO.find_all()
@router.post("/register", response_model=SUserToken)
async def register_user(response: Response, user_data: SUserRegister):
    """Create a user, send a confirmation e-mail and log the new user in.

    Steps performed:
      1. ``check_existing_user`` is awaited first (presumably it raises when
         the e-mail or username is already taken - confirm in app.users.auth).
      2. The password is hashed and the user is stored via ``UserDAO.add``.
      3. The confirmation e-mail task is dispatched; its result is used as
         the confirmation code and stored through ``UserCodesDAO``.
      4. The user is authenticated, an access token is created and placed in
         the ``black_phoenix_access_token`` cookie.

    Args:
        response: outgoing response used to attach the auth cookie.
        user_data: registration payload (email, username, password,
            date_of_birth).

    Returns:
        A dict with the freshly issued ``access_token``.

    Raises:
        SomethingWentWrongException: if the stored confirmation code does not
            match the code produced by the e-mail task.
    """
    await check_existing_user(user_data)
    hashed_password = get_password_hash(user_data.password)
    user_id = await UserDAO.add(
        email=user_data.email,
        hashed_password=hashed_password,
        username=user_data.username,
        date_of_birth=user_data.date_of_birth,
    )

    task = send_registration_confirmation_email.delay(
        username=user_data.username, email_to=user_data.email, MODE=settings.MODE
    )
    # NOTE(review): ``.get()`` waits synchronously for the task result inside
    # a coroutine, which stalls the event loop while the e-mail is being sent.
    # Consider generating the code here and dispatching the task without waiting.
    confirmation_code = task.get()

    stored_code = await UserCodesDAO.set_user_codes(
        user_id=user_id, code=confirmation_code, description="Код подтверждения почты"
    )
    if stored_code != confirmation_code:
        # Previously this path fell through and returned None, which cannot be
        # serialized as SUserToken; fail explicitly like send_recovery_email does.
        raise SomethingWentWrongException

    user = await authenticate_user_by_email(user_data.email, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
@router.post('/email_verification', status_code=status.HTTP_200_OK, response_model=SEmailVerification)
async def email_verification(user_code: SUserCode, user: Users = Depends(get_current_user)):
    """Confirm the current user's e-mail with a previously issued code.

    Looks up the submitted code among the user's e-mail confirmation codes and,
    when one is found, switches the user's role to ``VERIFICATED_USER``.

    Raises:
        WrongCodeException: if no matching code exists or the role update
            returns a falsy value.
    """
    matching_codes = await UserCodesDAO.get_user_codes(
        user_id=user.id, description="Код подтверждения почты", code=user_code.user_code
    )
    if not matching_codes:
        raise WrongCodeException

    # The role is only touched once a matching code has been found.
    role_updated = await UserDAO.change_data(user_id=user.id, role=VERIFICATED_USER)
    if not role_updated:
        raise WrongCodeException

    return {'email_verification': True}
@router.post("/login", response_model=SUserToken)
async def login_user(response: Response, user_data: SUserLogin):
    """Authenticate a user by e-mail or username and issue an access token.

    Args:
        response: outgoing response used to attach the auth cookie.
        user_data: login payload (``email_or_username`` and ``password``).

    Returns:
        A dict with the issued ``access_token``; the same token is also stored
        in the ``black_phoenix_access_token`` cookie.
    """
    user = await authenticate_user(user_data.email_or_username, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    # Use the same cookie attributes as register_user: without ``secure`` the
    # token cookie could be transmitted over plain HTTP.
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout_user(response: Response):
    """Log the caller out by removing the access-token cookie from the response."""
    cookie_name = "black_phoenix_access_token"
    response.delete_cookie(cookie_name)
@router.get("/me", response_model=SUser)
async def read_users_me(current_user: Users = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    me = current_user
    return me
@router.patch("/rename", response_model=SUserName)
async def rename_user(new_username: SUserRename, current_user: Users = Depends(get_current_user)):
    """Change the current user's username after re-checking their password.

    Raises:
        IncorrectPasswordException: if the supplied password does not match
            the user's stored hash.
        UsernameAlreadyInUseException: if another record already uses the
            requested username.
    """
    await check_verificated_user_with_exc(user_id=current_user.id)

    password_ok = verify_password(new_username.password, current_user.hashed_password)
    if not password_ok:
        raise IncorrectPasswordException

    name_owner = await UserDAO.find_one_or_none(username=new_username.username)
    if name_owner:
        raise UsernameAlreadyInUseException

    # change_data's return value is reported back as the new username.
    updated_name = await UserDAO.change_data(current_user.id, username=new_username.username)
    return {'username': updated_name}
@router.patch("/change_avatar", response_model=SNewAvatar)
async def change_avatar(user_data: SUserAvatar, current_user: Users = Depends(get_current_user)):
    """Replace the current user's avatar and record it in the avatar history.

    Raises:
        IncorrectPasswordException: if the supplied password does not match
            the user's stored hash.
        SomethingWentWrongException: if either the profile update or the
            history insert returns a falsy value.
    """
    await check_verificated_user_with_exc(user_id=current_user.id)

    password_ok = verify_password(user_data.password, current_user.hashed_password)
    if not password_ok:
        raise IncorrectPasswordException

    profile_updated = await UserDAO.change_data(
        current_user.id, avatar_image=user_data.new_avatar_image, avatar_hex=user_data.avatar_hex
    )
    if not profile_updated:
        raise SomethingWentWrongException

    # The history entry is only written after the profile update succeeded.
    history_saved = await UserDAO.add_user_avatar(user_id=current_user.id, avatar=user_data.new_avatar_image)
    if not history_saved:
        raise SomethingWentWrongException

    return {'new_avatar_image': user_data.new_avatar_image, 'avatar_hex': user_data.avatar_hex}
@router.patch("/change_password", status_code=status.HTTP_200_OK)
async def change_password(new_password: SUserPassword, current_user: Users = Depends(get_current_user)):
    """Change the current user's password and dispatch a notification e-mail.

    The current password is checked against a freshly loaded user record
    before the two new-password fields are compared.

    Raises:
        IncorrectPasswordException: if ``current_password`` does not match the
            stored hash.
        PasswordsМismatchException: if ``new_password`` and ``new_password2``
            differ.
    """
    await check_verificated_user_with_exc(user_id=current_user.id)

    stored_user = await UserDAO.find_one_or_none(id=current_user.id)
    current_matches = verify_password(new_password.current_password, stored_user.hashed_password)
    if not current_matches:
        raise IncorrectPasswordException

    if new_password.new_password != new_password.new_password2:
        raise PasswordsМismatchException

    new_hash = get_password_hash(new_password.new_password)
    await UserDAO.change_data(current_user.id, hashed_password=new_hash)

    # Fire-and-forget: the task result is not awaited here.
    send_password_change_email.delay(current_user.username, current_user.email, MODE=settings.MODE)
@router.patch("/send_recovery_email", status_code=status.HTTP_200_OK, response_model=SRecoverEmailSent)
async def send_recovery_email(email: SUserPasswordRecover):
    """Send a password-recovery code to the given e-mail address.

    The recovery e-mail task is dispatched and its result is used as the
    recovery code, which is then stored through ``UserCodesDAO``.

    Args:
        email: payload carrying the ``email`` of the account to recover.

    Returns:
        ``{'recover_email_sent': True}`` once the code has been stored.

    Raises:
        UserNotFoundException: if no user has this e-mail.
        SomethingWentWrongException: if the stored code does not match the
            code produced by the e-mail task.
    """
    existing_user = await UserDAO.find_one_or_none(email=email.email)
    if not existing_user:
        raise UserNotFoundException

    task = send_password_recover_email.delay(existing_user.username, existing_user.email, MODE=settings.MODE)
    # NOTE(review): ``.get()` waits synchronously for the task result inside a
    # coroutine, which stalls the event loop until the task finishes.
    recovery_code = task.get()

    # The user's primary key is ``id`` everywhere else in this module
    # (``user.id``, ``find_one_or_none(id=...)``); ``user_id`` was used here.
    stored_code = await UserCodesDAO.set_user_codes(
        user_id=existing_user.id, code=recovery_code, description="Код восстановления пароля"
    )
    if stored_code != recovery_code:
        raise SomethingWentWrongException

    return {'recover_email_sent': True}
@router.post("/confirm_password_recovery", status_code=status.HTTP_200_OK, response_model=SConfirmPasswordRecovery)
async def confirm_password_recovery(user_code: SUserCode):
    """Exchange a password-recovery code for the id of the user it belongs to.

    Raises:
        WrongCodeException: if no stored recovery code matches.
    """
    matching_codes = await UserCodesDAO.get_user_codes(
        description="Код восстановления пароля", code=user_code.user_code
    )
    if matching_codes:
        # Only the first matching record is used.
        first_match = matching_codes[0]
        return {'user_id': first_match['user_id']}

    raise WrongCodeException
@router.post("/password_recovery", status_code=status.HTTP_200_OK, response_model=SPasswordRecovered)
async def password_recovery(passwords: SUserPasswordChange):
    """Set a new password for the user identified by ``passwords.user_id``.

    Raises:
        PasswordsМismatchException: if ``password1`` and ``password2`` differ.
        UserNotFoundException: if no user matches the returned username and
            the given id after the update.
    """
    # NOTE(review): this handler itself checks no recovery code - it trusts the
    # ``user_id`` from the request body. Confirm the schema or a middleware
    # enforces that the caller completed the code-confirmation step.
    if passwords.password1 != passwords.password2:
        raise PasswordsМismatchException

    new_hash = get_password_hash(passwords.password1)
    # The update runs before the existence check below, as in the original flow.
    username = await UserDAO.change_data(passwords.user_id, hashed_password=new_hash)

    account = await UserDAO.find_one_or_none(username=username, id=passwords.user_id)
    if not account:
        raise UserNotFoundException

    send_password_change_email.delay(account.username, account.email, MODE=settings.MODE)
    return {"username": username}
@router.get("/avatars", response_model=list[SUserAvatars])
async def get_user_avatars_history(user: Users = Depends(get_current_user)):
    """Return the avatar history stored for the current user."""
    avatars = await UserDAO.get_user_avatars(user_id=user.id)
    return avatars
@router.delete("/avatars", response_model=bool)
async def delete_form_user_avatars_history(avatar_id: int, user: Users = Depends(get_current_user)):
    """Delete one avatar from the current user's history.

    Returns the result of ``UserDAO.delete_user_avatar`` for the given
    ``avatar_id`` and the current user's id.
    """
    deleted = await UserDAO.delete_user_avatar(avatar_id=avatar_id, user_id=user.id)
    return deleted