chat_back/chat_test/app/users/router.py

99 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Response, Depends
from fastapi.responses import RedirectResponse
from starlette import status
from app.exceptions import UserAlreadyExistsException, IncorrectAuthDataException, UsernameAlreadyInUseException, \
IncorrectPasswordException, PasswordsМismatchException
from app.users.auth import get_password_hash, authenticate_user_by_email, authenticate_user_by_username, \
create_access_token, verify_password
from app.users.dao import UserDAO, UserCodesDAO
from app.users.dependencies import get_current_user
from app.users.models import Users
from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserName, SUserPassword
from app.tasks.tasks import send_registration_confirmation_email
router = APIRouter(
prefix="/users",
tags=["Пользователи"]
)
@router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT)
async def get_teleport():
return RedirectResponse(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
@router.get("", response_model=list[SUser])
async def get_all_users():
users = await UserDAO.find_all()
return users
@router.post("/register", response_model=dict[str, str])
async def register_user(response: Response, user_data: SUserRegister):
existing_user = await UserDAO.find_one_or_none(email=user_data.email)
if existing_user:
raise UserAlreadyExistsException
existing_user = await UserDAO.find_one_or_none(username=user_data.username)
if existing_user:
raise UserAlreadyExistsException
hashed_password = get_password_hash(user_data.password)
user_id = await UserDAO.add(
email=user_data.email,
hashed_password=hashed_password,
username=user_data.username,
date_of_birth=user_data.date_of_birth,
role=0,
black_phoenix=False
)
result = send_registration_confirmation_email.delay(username=user_data.username, email_to=user_data.email)
result = result.get()
if await UserCodesDAO.set_user_codes(user_id=user_id, code=result) == result:
user = await authenticate_user_by_email(user_data.email, user_data.password)
access_token = create_access_token({"sub": str(user.id)})
response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True)
return {"access_token": access_token}
@router.post("/login", response_model=dict[str, str])
async def login_user(response: Response, user_data: SUserLogin):
user = await authenticate_user_by_email(user_data.email_or_username, user_data.password)
if not user:
user = await authenticate_user_by_username(user_data.email_or_username, user_data.password)
if not user:
raise IncorrectAuthDataException
access_token = create_access_token({"sub": str(user.id)})
response.set_cookie("black_phoenix_access_token", access_token, httponly=True)
return {"access_token": access_token}
@router.post("/logout", status_code=status.HTTP_200_OK)
async def logout_user(response: Response):
response.delete_cookie("black_phoenix_access_token")
@router.get("/me", response_model=SUser)
async def read_users_me(current_user: Users = Depends(get_current_user)):
return current_user
@router.patch("/rename", response_model=str)
async def rename_user(new_username: SUserName, current_user: Users = Depends(get_current_user)):
existing_user = await UserDAO.find_one_or_none(username=new_username.username)
if existing_user:
raise UsernameAlreadyInUseException
new_username = await UserDAO.change_data(current_user.id, username=new_username.username)
return new_username
@router.patch("/change_password", status_code=status.HTTP_200_OK)
async def change_password(new_password: SUserPassword, current_user: Users = Depends(get_current_user)):
existing_user = await UserDAO.find_one_or_none(id=current_user.id)
if not verify_password(new_password.password, existing_user.hashed_password):
raise IncorrectPasswordException
if new_password.new_password != new_password.new_password2:
raise PasswordsМismatchException
hashed_password = get_password_hash(new_password.new_password)
await UserDAO.change_data(current_user.id, hashed_password=hashed_password)