chat_back/app/users/router.py
2024-06-01 19:59:52 +05:00

207 lines
6.1 KiB
Python

import asyncio
import json

from cryptography.fernet import Fernet, InvalidToken
from fastapi import APIRouter, Response, Depends, status

from app.config import settings
from app.exceptions import (
    PasswordsMismatchException,
    WrongCodeException,
    UserNotFoundException,
    SomethingWentWrongException,
    UserAlreadyExistsException,
)
from app.users.auth import get_password_hash, create_access_token, VERIFICATED_USER, AuthService
from app.users.dao import UserDAO, UserCodesDAO
from app.users.dependencies import get_current_user
from app.users.models import Users
from app.users.schemas import (
    SUserLogin,
    SUserRegister,
    SUserResponse,
    SUserPasswordRecover,
    SUserCode,
    SUserPasswordChange,
    SRecoverEmailSent,
    SUserToken,
    SEmailVerification,
    SConfirmPasswordRecovery,
    SPasswordRecovered,
    SUserAvatars,
    SUsername,
    SEmail,
)
from app.tasks.tasks import (
    send_registration_confirmation_email,
    send_password_change_email,
    send_password_recover_email,
)
# Router collecting every endpoint defined in this module under the /users prefix.
router = APIRouter(
    prefix="/users",
    tags=["Пользователи"],
)
@router.get(
    "",
    status_code=status.HTTP_200_OK,
    response_model=list[SUserResponse],
)
async def get_all_users():
    """Return everything ``UserDAO.find_all`` yields, serialized as ``SUserResponse`` items."""
    return await UserDAO.find_all()
@router.post(
    "/check_existing_username",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def check_existing_username(username: SUsername):
    """Check whether the given username is already taken.

    Returns nothing (empty 200 response) when no user matches.

    Raises:
        UserAlreadyExistsException: a user with this username was found.
    """
    match = await UserDAO.find_one_or_none(username=username.username)
    if not match:
        return
    raise UserAlreadyExistsException
@router.post(
    "/check_existing_email",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def check_existing_email(email: SEmail):
    """Check whether the given email address is already registered.

    Returns nothing (empty 200 response) when no user matches.

    Raises:
        UserAlreadyExistsException: a user with this email was found.
    """
    match = await UserDAO.find_one_or_none(email=email.email)
    if not match:
        return
    raise UserAlreadyExistsException
@router.post(
    "/register",
    status_code=status.HTTP_201_CREATED,
    response_model=SUserToken,
)
async def register_user(response: Response, user_data: SUserRegister):
    """Create a new user, send the confirmation email and log the user in.

    Args:
        response: outgoing response; the access-token cookie is set on it.
        user_data: registration payload (email, username, date of birth and
            the password entered twice).

    Returns:
        ``{"access_token": <token>}`` for the freshly created user.

    Raises:
        PasswordsMismatchException: the two passwords differ.
        SomethingWentWrongException: the confirmation code could not be stored.
        Whatever ``AuthService.check_existing_user`` raises for duplicates.
    """
    if user_data.password != user_data.password2:
        raise PasswordsMismatchException
    await AuthService.check_existing_user(user_data)

    hashed_password = get_password_hash(user_data.password)
    user_id = await UserDAO.add(
        email=user_data.email,
        hashed_password=hashed_password,
        username=user_data.username,
        date_of_birth=user_data.date_of_birth,
    )

    email_task = send_registration_confirmation_email.delay(
        user_id=user_id, username=user_data.username, email_to=user_data.email, MODE=settings.MODE
    )
    # Waiting for the task result is a blocking call; run it in a worker
    # thread so the event loop keeps serving other requests meanwhile.
    confirmation_code = await asyncio.to_thread(email_task.get)

    stored_code = await UserCodesDAO.set_user_codes(
        user_id=user_id, code=confirmation_code, description="Код подтверждения почты"
    )
    if stored_code != confirmation_code:
        # Previously this path fell through and returned None, which cannot
        # satisfy the declared response model; fail explicitly instead.
        raise SomethingWentWrongException

    user = await AuthService.authenticate_user_by_email(user_data.email, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
@router.get(
    "/email_verification/{user_code}",
    status_code=status.HTTP_200_OK,
    response_model=SEmailVerification,
)
async def email_verification(user_code: str):
    """Confirm a user's email from the encrypted token embedded in the link.

    The token is decrypted with ``settings.INVITATION_LINK_TOKEN_KEY`` and is
    expected to contain JSON with ``user_id`` and ``confirmation_code`` keys.

    Args:
        user_code: Fernet token taken from the URL path (untrusted input).

    Returns:
        ``{"email_verification": True}`` on success.

    Raises:
        WrongCodeException: the token is malformed/forged, the code is not
            found for the user, or the role update did not succeed.
    """
    cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY)
    try:
        decrypted = cipher_suite.decrypt(user_code.encode()).decode("utf-8")
        payload = json.loads(decrypted)
        user_id = payload["user_id"]
        confirmation_code = payload["confirmation_code"]
    except (InvalidToken, ValueError, KeyError, TypeError) as exc:
        # The token comes straight from the URL: a bad signature, broken
        # JSON or missing keys is a wrong code, not a server error.
        raise WrongCodeException from exc

    user_codes = await UserCodesDAO.get_user_codes(
        user_id=user_id, description="Код подтверждения почты", code=confirmation_code
    )
    if not user_codes or not await UserDAO.change_data(user_id=user_id, role=VERIFICATED_USER):
        raise WrongCodeException
    return {"email_verification": True}
@router.post(
    "/login",
    status_code=status.HTTP_200_OK,
    response_model=SUserToken,
)
async def login_user(response: Response, user_data: SUserLogin):
    """Authenticate by email or username and issue an access token.

    Args:
        response: outgoing response; the access-token cookie is set on it.
        user_data: login payload with ``email_or_username`` and ``password``.

    Returns:
        ``{"access_token": <token>}``.

    Raises:
        Whatever ``AuthService.authenticate_user`` raises on bad credentials.
    """
    user = await AuthService.authenticate_user(user_data.email_or_username, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    # Same cookie attributes as in register_user: the token must only travel
    # over HTTPS and must not be readable from JavaScript.
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
@router.post(
    "/logout",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def logout_user(response: Response):
    """Log the client out by deleting the access-token cookie from the response."""
    response.delete_cookie(key="black_phoenix_access_token")
@router.get(
    "/me",
    status_code=status.HTTP_200_OK,
    response_model=SUserResponse,
)
async def get_user(current_user: Users = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user
@router.patch(
    "/send_recovery_email",
    status_code=status.HTTP_200_OK,
    response_model=SRecoverEmailSent,
)
async def send_recovery_email(email: SUserPasswordRecover):
    """Send a password-recovery email and store the recovery code for the user.

    Args:
        email: payload carrying the address of the account to recover.

    Returns:
        ``{"recover_email_sent": True}`` once the code has been stored.

    Raises:
        UserNotFoundException: no user is registered with this email.
        SomethingWentWrongException: the recovery code could not be stored.
    """
    existing_user = await UserDAO.find_one_or_none(email=email.email)
    if not existing_user:
        raise UserNotFoundException

    email_task = send_password_recover_email.delay(existing_user.username, existing_user.email, MODE=settings.MODE)
    # Waiting for the task result is a blocking call; run it in a worker
    # thread so the event loop keeps serving other requests meanwhile.
    recovery_code = await asyncio.to_thread(email_task.get)

    # The user's primary key is read as ``.id`` everywhere else in this
    # module (and lookups filter by ``id=``); the former ``.user_id`` access
    # did not match that convention.
    stored_code = await UserCodesDAO.set_user_codes(
        user_id=existing_user.id, code=recovery_code, description="Код восстановления пароля"
    )
    if stored_code != recovery_code:
        raise SomethingWentWrongException
    return {"recover_email_sent": True}
@router.post(
    "/confirm_password_recovery",
    status_code=status.HTTP_200_OK,
    response_model=SConfirmPasswordRecovery,
)
async def confirm_password_recovery(user_code: SUserCode):
    """Resolve a password-recovery code to the id of the user it belongs to.

    Returns:
        ``{"user_id": ...}`` taken from the first matching code record.

    Raises:
        WrongCodeException: no recovery code record matches.
    """
    matches = await UserCodesDAO.get_user_codes(
        description="Код восстановления пароля",
        code=user_code.user_code,
    )
    if matches:
        return {"user_id": matches[0]["user_id"]}
    raise WrongCodeException
@router.post(
    "/password_recovery",
    status_code=status.HTTP_200_OK,
    response_model=SPasswordRecovered,
)
async def password_recovery(passwords: SUserPasswordChange):
    """Set a new password for a user and notify them by email.

    Args:
        passwords: payload with ``user_id`` and the new password entered twice.

    Returns:
        ``{"username": ...}`` with the value returned by ``UserDAO.change_data``.

    Raises:
        PasswordsMismatchException: the two passwords differ.
        UserNotFoundException: no user matches the returned username and id.
    """
    # NOTE(review): user_id comes from the request body and no recovery code
    # is re-checked here - confirm the endpoint cannot be used to reset an
    # arbitrary account without passing confirm_password_recovery first.
    if passwords.password1 != passwords.password2:
        raise PasswordsMismatchException

    new_hash = get_password_hash(passwords.password1)
    username = await UserDAO.change_data(passwords.user_id, hashed_password=new_hash)

    account = await UserDAO.find_one_or_none(username=username, id=passwords.user_id)
    if account:
        # Fire-and-forget: the notification result is not awaited.
        send_password_change_email.delay(account.username, account.email, MODE=settings.MODE)
        return {"username": username}
    raise UserNotFoundException
@router.get(
    "/avatars",
    status_code=status.HTTP_200_OK,
    response_model=list[SUserAvatars],
)
async def get_user_avatars_history(user: Users = Depends(get_current_user)):
    """Return the avatar records ``UserDAO.get_user_avatars`` holds for the current user."""
    avatars = await UserDAO.get_user_avatars(user_id=user.id)
    return avatars