208 lines
6.3 KiB
Python
208 lines
6.3 KiB
Python
import asyncio
import json

from cryptography.fernet import Fernet, InvalidToken
from fastapi import APIRouter, Response, Depends, status

from app.config import settings
from app.exceptions import (
    PasswordsMismatchException,
    WrongCodeException,
    UserAlreadyExistsException,
    IncorrectAuthDataException,
)
from app.services.redis_service import RedisService, get_redis_session
from app.unit_of_work import UnitOfWork
from app.users.auth import get_password_hash, create_access_token, VERIFICATED_USER, AuthService, verify_password
from app.users.dependencies import get_current_user
from app.users.schemas import (
    SUserLogin,
    SUserRegister,
    SUserResponse,
    SUserToken,
    SEmailVerification,
    SUserAvatars,
    SUsername,
    SEmail,
    SUser,
    SUserChangeData,
    SUserSendConfirmationCode,
)
from app.tasks.tasks import (
    send_registration_confirmation_email,
    send_data_change_confirmation_email
)
|
|
|
|
# Router collecting every endpoint of this module under the "/users" prefix.
# The tag string is runtime data and is Russian for "Users".
router = APIRouter(prefix="/users", tags=["Пользователи"])
|
|
|
|
|
|
@router.get(
    "",
    response_model=list[SUserResponse],
    status_code=status.HTTP_200_OK,
)
async def get_all_users(uow=Depends(UnitOfWork)):
    """Return every user found by the user repository.

    The lookup runs inside the unit-of-work context; the result is
    serialized by FastAPI as a list of ``SUserResponse``.
    """
    async with uow:
        return await uow.user.find_all()
|
|
|
|
|
|
@router.post(
    "/check_existing_username",
    response_model=None,
    status_code=status.HTTP_200_OK,
)
async def check_existing_username(username: SUsername, uow=Depends(UnitOfWork)):
    """Check whether the submitted username is already in use.

    Raises:
        UserAlreadyExistsException: a user with ``username.username`` was found.

    Returns nothing when no such user is found.
    """
    async with uow:
        if await uow.user.find_one_or_none(username=username.username):
            raise UserAlreadyExistsException
|
|
|
|
|
|
@router.post(
    "/check_existing_email",
    response_model=None,
    status_code=status.HTTP_200_OK,
)
async def check_existing_email(email: SEmail, uow=Depends(UnitOfWork)):
    """Check whether the submitted e-mail address is already in use.

    Raises:
        UserAlreadyExistsException: a user with ``email.email`` was found.

    Returns nothing when no such user is found.
    """
    async with uow:
        if await uow.user.find_one_or_none(email=email.email):
            raise UserAlreadyExistsException
|
|
|
|
|
|
@router.post(
    "/register",
    status_code=status.HTTP_201_CREATED,
    response_model=SUserToken,
)
async def register_user(response: Response, user_data: SUserRegister, uow=Depends(UnitOfWork)):
    """Create a user, dispatch the confirmation e-mail and log the user in.

    Steps:
      1. Reject the request if the two password fields differ.
      2. Let ``AuthService.check_existing_user`` validate the submitted data.
      3. Store the new user with a hashed password and commit.
      4. Queue the confirmation e-mail task and store the value the task
         returns as the user's verification code in Redis.
      5. Authenticate the new user, set the access-token cookie and return
         the token.

    Raises:
        PasswordsMismatchException: ``password`` and ``password2`` differ.

    Returns:
        A dict with the single key ``"access_token"``.
    """
    if user_data.password != user_data.password2:
        raise PasswordsMismatchException
    await AuthService.check_existing_user(uow, user_data)

    hashed_password = get_password_hash(user_data.password)
    async with uow:
        user_id = await uow.user.add(
            email=user_data.email,
            hashed_password=hashed_password,
            username=user_data.username,
            date_of_birth=user_data.date_of_birth,
        )
        await uow.commit()

    result = send_registration_confirmation_email.delay(
        user_id=user_id, username=user_data.username, email_to=user_data.email, MODE=settings.MODE
    )
    # ``result.get()`` blocks until the worker has finished the task. Waiting
    # in a worker thread keeps the event loop free to serve other requests.
    user_code = await asyncio.to_thread(result.get)
    redis_session = get_redis_session()
    await RedisService.set_verification_code(redis=redis_session, user_id=user_id, verification_code=user_code)
    user = await AuthService.authenticate_user_by_email(uow, user_data.email, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
|
|
|
|
|
|
@router.get(
    "/email_verification/{user_code}",
    status_code=status.HTTP_200_OK,
    response_model=SEmailVerification,
)
async def email_verification(user_code: str, uow=Depends(UnitOfWork)):
    """Confirm a user's e-mail address from the token embedded in the link.

    ``user_code`` is a Fernet token that decrypts to a JSON object holding
    ``user_id`` and ``confirmation_code``. The confirmation code must match
    the one stored in Redis for that user; on success the user's role is
    changed to ``VERIFICATED_USER``.

    Raises:
        WrongCodeException: the token cannot be decrypted or parsed, lacks
            the expected fields, or its code differs from the stored one.

    Returns:
        ``{"email_verification": True}`` on success.
    """
    cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY)
    try:
        # The token comes straight from the URL, so it is untrusted input:
        # decryption, decoding and parsing may all fail.
        payload = json.loads(cipher_suite.decrypt(user_code.encode()).decode("utf-8"))
        user_id = payload["user_id"]
        confirmation_code = payload["confirmation_code"]
    except (InvalidToken, ValueError, KeyError, TypeError) as err:
        # Report a bad link as a wrong code instead of an unhandled error.
        raise WrongCodeException from err

    redis_session = get_redis_session()
    async with uow:
        verification_code = await RedisService.get_verification_code(redis=redis_session, user_id=user_id)
        if verification_code != confirmation_code:
            raise WrongCodeException

        await uow.user.change_data(user_id=user_id, role=VERIFICATED_USER)
        await uow.commit()
        return {"email_verification": True}
|
|
|
|
|
|
@router.post(
    "/login",
    status_code=status.HTTP_200_OK,
    response_model=SUserToken,
)
async def login_user(response: Response, user_data: SUserLogin, uow=Depends(UnitOfWork)):
    """Authenticate a user and issue an access token.

    The credentials (``email_or_username`` and ``password``) are checked by
    ``AuthService.authenticate_user``. The token is returned in the body
    and also set as the ``black_phoenix_access_token`` cookie.

    Returns:
        A dict with the single key ``"access_token"``.
    """
    user = await AuthService.authenticate_user(uow, user_data.email_or_username, user_data.password)
    access_token = create_access_token({"sub": str(user.id)})
    # Same cookie attributes as the registration endpoint: ``secure=True``
    # keeps the token from being sent over unencrypted connections.
    response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True)
    return {"access_token": access_token}
|
|
|
|
|
|
@router.post(
    "/logout",
    response_model=None,
    status_code=status.HTTP_200_OK,
)
async def logout_user(response: Response):
    """Log the user out by removing the access-token cookie from the response."""
    response.delete_cookie(key="black_phoenix_access_token")
|
|
|
|
|
|
@router.get(
    "/me",
    response_model=SUserResponse,
    status_code=status.HTTP_200_OK,
)
async def get_user(current_user: SUser = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency.

    FastAPI serializes the result as ``SUserResponse``.
    """
    return current_user
|
|
|
|
|
|
@router.get(
    "/avatars",
    response_model=SUserAvatars,
    status_code=status.HTTP_200_OK,
)
async def get_user_avatars_history(user=Depends(get_current_user), uow=Depends(UnitOfWork)):
    """Return the avatars recorded for the current user.

    The data comes from ``uow.user.get_user_avatars`` and is serialized as
    ``SUserAvatars``.
    """
    async with uow:
        avatars = await uow.user.get_user_avatars(user_id=user.id)
        return avatars
|
|
|
|
|
|
@router.post(
    "/send_confirmation_code",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def send_confirmation_code(user_data: SUserSendConfirmationCode, user: SUser = Depends(get_current_user)):
    """E-mail a data-change confirmation code to the current user.

    The caller must prove knowledge of the current password. On success the
    e-mail task is queued for ``user_data.email`` and the value the task
    returns is stored in Redis as the user's verification code.

    Raises:
        IncorrectAuthDataException: ``current_password`` does not match the
            user's stored password hash.
    """
    # Guard clause: only a failed password check is an error. A successful
    # send must not end in an exception.
    if not verify_password(user_data.current_password, user.hashed_password):
        raise IncorrectAuthDataException

    task_result = send_data_change_confirmation_email.delay(
        username=user.username, email_to=user_data.email, MODE=settings.MODE
    )
    # ``get()`` blocks until the worker has finished the task. Waiting in a
    # worker thread keeps the event loop free to serve other requests.
    verification_code = await asyncio.to_thread(task_result.get)
    redis_session = get_redis_session()
    await RedisService.set_verification_code(redis=redis_session, user_id=user.id, verification_code=verification_code)
|
|
|
|
|
|
@router.post(
    "/change_data",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def change_user_data(user_data: SUserChangeData, user=Depends(get_current_user), uow=Depends(UnitOfWork)):
    """Apply profile changes after checking the user's confirmation code.

    The submitted ``verification_code`` must match the code stored in Redis
    for the current user. The e-mail, username, avatar URL and password
    hash are then written, the avatar is recorded via ``add_user_avatar``,
    and the stored code is deleted after the commit.

    Raises:
        WrongCodeException: no code is stored for the user, or the submitted
            code differs from the stored one.
    """
    redis_session = get_redis_session()
    verification_code = await RedisService.get_verification_code(redis=redis_session, user_id=user.id)
    # A missing stored code must never validate, even if the request also
    # carries no code. NOTE(review): assumes the Redis helper returns None
    # when no code is stored - confirm against RedisService.
    if verification_code is None or verification_code != user_data.verification_code:
        raise WrongCodeException

    # Keep the current hash unless a new password was supplied.
    if user_data.new_password:
        hashed_password = get_password_hash(user_data.new_password)
    else:
        hashed_password = user.hashed_password

    async with uow:
        await uow.user.change_data(
            user_id=user.id,
            email=user_data.email,
            username=user_data.username,
            avatar_url=user_data.avatar_url,
            hashed_password=hashed_password,
        )
        await uow.user.add_user_avatar(user_id=user.id, avatar=user_data.avatar_url)
        await uow.commit()

    # Delete the code only after the change has been committed.
    await RedisService.delete_verification_code(redis=redis_session, user_id=user.id)
|
|
|
|
|