90 lines
3.5 KiB
Python
90 lines
3.5 KiB
Python
from fastapi import APIRouter, Response, Depends
|
|
from fastapi.responses import RedirectResponse
|
|
|
|
from app.exceptions import UserAlreadyExistsException, IncorrectAuthDataException, UsernameAlreadyInUseException, \
|
|
PasswordIsTooShortException, IncorrectLengthOfNicknameException
|
|
from app.users.auth import get_password_hash, authenticate_user_by_email, authenticate_user_by_username, \
|
|
create_access_token
|
|
from app.users.dao import UserDAO
|
|
from app.users.dependencies import get_current_user
|
|
from app.users.models import Users
|
|
from app.users.schemas import SUserLogin, SUserRegister
|
|
|
|
# Router for the user endpoints below: every path is served under the
# "/users" prefix and tagged "Пользователи" (Russian for "Users").
router = APIRouter(prefix="/users", tags=["Пользователи"])
|
|
|
|
|
|
@router.get("/teleport")
async def get_teleport() -> RedirectResponse:
    """Redirect the caller to a fixed external YouTube URL."""
    target_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
    return RedirectResponse(url=target_url)
|
|
|
|
|
|
@router.get("")
async def get_all_users():
    """Return whatever ``UserDAO.find_all()`` yields for the users table."""
    # NOTE(review): this route declares no authentication dependency, so the
    # DAO result is returned to any caller as-is -- confirm that it does not
    # expose sensitive columns such as the password hash.
    return await UserDAO.find_all()
|
|
|
|
|
|
@router.post("/register")
async def register_user(response: Response, user_data: SUserRegister):
    """Register a new account and immediately log the new user in.

    The password and username lengths are validated, the email and username
    are checked for collisions, the user is stored with a hashed password
    (``role=0``, ``black_phoenix=0``), and an access token is issued. The
    token is set as the HTTP-only ``black_phoenix_access_token`` cookie and
    also returned in the response body.

    Args:
        response: Outgoing response on which the cookie is set.
        user_data: Registration payload (email, username, password,
            date_of_birth).

    Returns:
        A dict of the form ``{"access_token": <token>}``.

    Raises:
        PasswordIsTooShortException: The password is shorter than 8 characters.
        IncorrectLengthOfNicknameException: The username is not 2-30
            characters long.
        UserAlreadyExistsException: The email is already registered.
        UsernameAlreadyInUseException: The username is already taken.
    """
    # Validate locally first so malformed input never costs a database
    # round trip.
    if len(user_data.password) < 8:
        raise PasswordIsTooShortException
    if not 2 <= len(user_data.username) <= 30:
        raise IncorrectLengthOfNicknameException

    if await UserDAO.find_one_or_none(email=user_data.email):
        raise UserAlreadyExistsException
    # A taken username is reported with the same exception rename_user uses
    # for this condition, rather than the "user already exists" error that
    # is meant for duplicate emails.
    if await UserDAO.find_one_or_none(username=user_data.username):
        raise UsernameAlreadyInUseException

    hashed_password = get_password_hash(user_data.password)
    await UserDAO.add(
        email=user_data.email,
        hashed_password=hashed_password,
        username=user_data.username,
        date_of_birth=user_data.date_of_birth,
        role=0,
        black_phoenix=0,
    )

    # Go through the regular email login path to obtain the stored user,
    # whose id becomes the token subject.
    user = await authenticate_user_by_email(user_data.email, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    response.set_cookie("black_phoenix_access_token", access_token, httponly=True)
    return {"access_token": access_token}
|
|
|
|
|
|
@router.post("/login")
async def login_user(response: Response, user_data: SUserLogin):
    """Authenticate by email or username and issue an access token.

    The supplied identifier is first tried as an email; only if that fails
    is it tried as a username. On success the token is set as the HTTP-only
    ``black_phoenix_access_token`` cookie and returned in the body.

    Raises:
        IncorrectAuthDataException: Neither lookup authenticated the user.
    """
    identifier = user_data.email_or_username
    password = user_data.password

    # Short-circuit: the username lookup runs only when the email one fails.
    account = (
        await authenticate_user_by_email(identifier, password)
        or await authenticate_user_by_username(identifier, password)
    )
    if not account:
        raise IncorrectAuthDataException

    token = create_access_token({"sub": str(account.id)})
    response.set_cookie("black_phoenix_access_token", token, httponly=True)
    return {"access_token": token}
|
|
|
|
|
|
@router.post("/logout")
async def logout_user(response: Response):
    """Log out by deleting the ``black_phoenix_access_token`` cookie."""
    response.delete_cookie(key="black_phoenix_access_token")
|
|
|
|
|
|
@router.get("/me")
async def read_users_me(current_user: Users = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user
|
|
|
|
|
|
@router.patch("/rename")
async def rename_user(new_username, current_user: Users = Depends(get_current_user)):
    """Change the current user's username.

    Returns:
        Whatever ``UserDAO.change_data`` returns for the update.

    Raises:
        IncorrectLengthOfNicknameException: The new name is not 2-30
            characters long.
        UsernameAlreadyInUseException: Another record already has this
            username.
    """
    if not 2 <= len(new_username) <= 30:
        raise IncorrectLengthOfNicknameException

    if await UserDAO.find_one_or_none(username=new_username):
        raise UsernameAlreadyInUseException

    return await UserDAO.change_data(current_user.id, username=new_username)
|
|
|
|
|
|
@router.patch("/change_password")
async def change_password(new_password, current_user: Users = Depends(get_current_user)):
    """Replace the current user's stored password hash.

    Raises:
        PasswordIsTooShortException: The new password is shorter than 8
            characters.
    """
    # NOTE(review): new_password is a bare parameter, which FastAPI reads
    # from the query string; query strings tend to end up in access logs --
    # consider moving it into a request body in a future API revision.
    if len(new_password) < 8:
        raise PasswordIsTooShortException

    await UserDAO.change_data(
        current_user.id,
        hashed_password=get_password_hash(new_password),
    )
|