99 lines
4.3 KiB
Python
99 lines
4.3 KiB
Python
from fastapi import APIRouter, Response, Depends
|
||
from fastapi.responses import RedirectResponse
|
||
from starlette import status
|
||
|
||
from app.exceptions import UserAlreadyExistsException, UsernameAlreadyInUseException, \
|
||
IncorrectPasswordException, PasswordsМismatchException, WrongCodeException
|
||
from app.users.auth import get_password_hash, authenticate_user_by_email, authenticate_user_by_username, \
|
||
create_access_token, verify_password, REGISTRATED_USER, get_user_codes_list, VERIFICATED_USER, authenticate_user, \
|
||
check_existing_user
|
||
from app.users.dao import UserDAO, UserCodesDAO
|
||
from app.users.dependencies import get_current_user
|
||
from app.users.models import Users
|
||
from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserName, SUserPassword
|
||
from app.tasks.tasks import send_registration_confirmation_email
|
||
|
||
router = APIRouter(
|
||
prefix="/users",
|
||
tags=["Пользователи"]
|
||
)
|
||
|
||
|
||
@router.get("/teleport", response_class=RedirectResponse, status_code=status.HTTP_307_TEMPORARY_REDIRECT)
|
||
async def get_teleport():
|
||
return RedirectResponse(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
|
||
|
||
|
||
@router.get("", response_model=list[SUser])
|
||
async def get_all_users():
|
||
users = await UserDAO.find_all()
|
||
return users
|
||
|
||
|
||
@router.post("/register", response_model=dict[str, str])
|
||
async def register_user(response: Response, user_data: SUserRegister):
|
||
await check_existing_user(user_data)
|
||
hashed_password = get_password_hash(user_data.password)
|
||
user_id = await UserDAO.add(
|
||
email=user_data.email,
|
||
hashed_password=hashed_password,
|
||
username=user_data.username,
|
||
date_of_birth=user_data.date_of_birth,
|
||
)
|
||
|
||
result = send_registration_confirmation_email.delay(username=user_data.username, email_to=user_data.email)
|
||
result = result.get()
|
||
|
||
if await UserCodesDAO.set_user_codes(user_id=user_id, code=result) == result:
|
||
user = await authenticate_user_by_email(user_data.email, user_data.password)
|
||
access_token = create_access_token({"sub": str(user.id)})
|
||
response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True)
|
||
return {"access_token": access_token}
|
||
|
||
|
||
@router.post('/email_verification', response_model=bool)
|
||
async def email_verification(user_code: str, user: Users = Depends(get_current_user)):
|
||
user_codes = await UserCodesDAO.get_user_codes(user_id=user.id)
|
||
user_codes = get_user_codes_list(user_codes=user_codes, sort_description="Код подтверждения почты")
|
||
if user_code in user_codes:
|
||
if await UserDAO.change_data(user_id=user.id, role=VERIFICATED_USER):
|
||
return True
|
||
raise WrongCodeException
|
||
|
||
|
||
@router.post("/login", response_model=dict[str, str])
|
||
async def login_user(response: Response, user_data: SUserLogin):
|
||
user = await authenticate_user(user_data.email_or_username, user_data.password)
|
||
access_token = create_access_token({"sub": str(user.id)})
|
||
response.set_cookie("black_phoenix_access_token", access_token, httponly=True)
|
||
return {"access_token": access_token}
|
||
|
||
|
||
@router.post("/logout", status_code=status.HTTP_200_OK)
|
||
async def logout_user(response: Response):
|
||
response.delete_cookie("black_phoenix_access_token")
|
||
|
||
|
||
@router.get("/me", response_model=SUser)
|
||
async def read_users_me(current_user: Users = Depends(get_current_user)):
|
||
return current_user
|
||
|
||
|
||
@router.patch("/rename", response_model=str)
|
||
async def rename_user(new_username: SUserName, current_user: Users = Depends(get_current_user)):
|
||
existing_user = await UserDAO.find_one_or_none(username=new_username.username)
|
||
if existing_user:
|
||
raise UsernameAlreadyInUseException
|
||
new_username = await UserDAO.change_data(current_user.id, username=new_username.username)
|
||
return new_username
|
||
|
||
|
||
@router.patch("/change_password", status_code=status.HTTP_200_OK)
|
||
async def change_password(new_password: SUserPassword, current_user: Users = Depends(get_current_user)):
|
||
existing_user = await UserDAO.find_one_or_none(id=current_user.id)
|
||
if not verify_password(new_password.password, existing_user.hashed_password):
|
||
raise IncorrectPasswordException
|
||
if new_password.new_password != new_password.new_password2:
|
||
raise PasswordsМismatchException
|
||
hashed_password = get_password_hash(new_password.new_password)
|
||
await UserDAO.change_data(current_user.id, hashed_password=hashed_password)
|