chat_back/app/users/router.py

235 lines
6.9 KiB
Python

import asyncio
from random import randrange
from fastapi import APIRouter, Depends, status
from app.config import settings
from app.exceptions import UserNotFoundException, PasswordAlreadyInUseException
from app.users.exceptions import (
UserAlreadyExistsException,
PasswordsMismatchException,
WrongCodeException,
)
from app.services.redis_service import RedisService, get_redis_session
from app.unit_of_work import UnitOfWork
from app.utils.auth import (
get_password_hash,
create_access_token,
AuthService,
decode_confirmation_token
)
from app.users.dependencies import get_current_user
from app.users.schemas import (
SUserLogin,
SUserRegister,
SUserResponse,
SEmailVerification,
SUserAvatars,
SUserFilter,
SUser,
SUserChangeData,
SUserSendConfirmationCode,
STokenLogin,
SUsers,
SConfirmationData,
SUserPassword,
)
from app.tasks.tasks import (
send_registration_confirmation_email,
send_data_change_confirmation_email,
generate_confirmation_code,
send_data_change_email
)
# Router for the user endpoints; every route registered on it is prefixed with /users.
router = APIRouter(prefix="/users", tags=["Пользователи"])
@router.get("", status_code=status.HTTP_200_OK, response_model=SUsers)
async def get_all_users(uow=Depends(UnitOfWork)):
    """Return whatever the user repository's ``find_all()`` yields.

    The result is passed back unchanged and serialized through the
    ``SUsers`` response model.
    """
    async with uow:
        return await uow.user.find_all()
@router.post(
    "/check_existing_user",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def check_existing_user(user_filter: SUserFilter, uow=Depends(UnitOfWork)):
    """Reject the request when a user matching ``user_filter`` exists.

    Only the filter fields that are not ``None`` take part in the lookup.
    When the lookup raises ``UserNotFoundException`` the endpoint finishes
    normally with an empty body.

    Raises:
        UserAlreadyExistsException: the lookup returned without raising.
    """
    lookup = user_filter.model_dump(exclude_none=True)
    async with uow:
        try:
            await uow.user.find_one(**lookup)
        except UserNotFoundException:
            # Nothing matched the filter, so there is no conflict to report.
            return None
        raise UserAlreadyExistsException
@router.post(
    "/check_existing_password",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def check_existing_password(user_password: SUserPassword):
    """Randomly reject the request; the submitted password is never inspected.

    A number is drawn with ``randrange(10)`` and the request fails when the
    draw is greater than 5. ``user_password`` is only validated by the
    request schema and is otherwise unused.

    Raises:
        PasswordAlreadyInUseException: the drawn number is greater than 5.
    """
    # NOTE(review): looks like a stub standing in for a real check - confirm.
    if randrange(10) > 5:
        raise PasswordAlreadyInUseException
@router.post(
    "/register",
    status_code=status.HTTP_201_CREATED,
    response_model=STokenLogin,
)
async def register_user(user_data: SUserRegister, uow=Depends(UnitOfWork)):
    """Create a user, start e-mail confirmation and return an access token.

    The new user and its initial avatar record are written in one unit of
    work. Afterwards a confirmation code is generated, stored in Redis under
    the user's id and handed to the registration e-mail task.

    Returns:
        A dict with an ``authorization`` key holding ``"Bearer <token>"``,
        where the token's ``sub`` claim is the new user's id.

    Raises:
        PasswordsMismatchException: ``password`` and ``password2`` differ.
    """
    if user_data.password != user_data.password2:
        raise PasswordsMismatchException

    hashed_password = get_password_hash(user_data.password)
    async with uow:
        user = await uow.user.add(
            email=user_data.email,
            hashed_password=hashed_password,
            username=user_data.username,
            date_of_birth=user_data.date_of_birth,
        )
        # Read the id once while the unit of work is still open; everything
        # after the commit only needs this plain value.
        user_id = user.id
        await uow.user.add_user_avatar(user_id=user_id, avatar=str(user.avatar_image))
        await uow.commit()

    user_code = generate_confirmation_code()
    user_mail_data = SConfirmationData.model_validate(
        {
            "user_id": user_id,
            "username": user_data.username,
            "email_to": user_data.email,
            "confirmation_code": user_code,
        }
    )

    # Store the code BEFORE dispatching the e-mail task: if the Redis write
    # fails, no mail goes out carrying a code that can never be verified, and
    # the task cannot run ahead of the stored value.
    redis_session = get_redis_session()
    await RedisService.set_verification_code(redis=redis_session, user_id=user_id, verification_code=user_code)
    send_registration_confirmation_email.delay(user_mail_data.model_dump())

    access_token = create_access_token({"sub": str(user_id)})
    return {"authorization": f"Bearer {access_token}"}
@router.post(
    "/resend_email_verification",
    status_code=status.HTTP_200_OK,
)
async def resend_email_verification(user: SUser = Depends(get_current_user)):
    """Issue a fresh confirmation code for the current user and mail it again.

    The previously stored code is deleted, the new one is stored under the
    user's id, and the registration e-mail task is then dispatched with the
    user's id, username, e-mail address and the new code.
    """
    user_code = generate_confirmation_code()
    user_mail_data = SConfirmationData.model_validate(
        {
            "user_id": user.id,
            "username": user.username,
            "email_to": user.email,
            "confirmation_code": user_code,
        }
    )

    # Replace the stored code BEFORE dispatching the e-mail task, so a mail is
    # never sent for a code that failed to be stored or is not stored yet.
    redis_session = get_redis_session()
    await RedisService.delete_verification_code(redis=redis_session, user_id=user.id)
    await RedisService.set_verification_code(redis=redis_session, user_id=user.id, verification_code=user_code)

    send_registration_confirmation_email.delay(user_mail_data.model_dump())
@router.get(
    "/email_verification/{user_code}",
    status_code=status.HTTP_200_OK,
    response_model=SEmailVerification,
)
async def email_verification(user_code: str, uow=Depends(UnitOfWork)):
    """Confirm a user's e-mail from the token given in the URL path.

    ``user_code`` is decoded with ``decode_confirmation_token`` into a user
    id and a confirmation code. The code is compared with the one stored in
    Redis for that user; on a match the user's role is set to
    ``settings.VERIFICATED_USER`` and the change is committed.

    Returns:
        ``{"email_verification": True}`` on success.

    Raises:
        WrongCodeException: the stored code differs from the decoded one.
    """
    confirmation = decode_confirmation_token(user_code)
    redis_session = get_redis_session()
    async with uow:
        stored_code = await RedisService.get_verification_code(
            redis=redis_session,
            user_id=confirmation.user_id,
        )
        if stored_code != confirmation.confirmation_code:
            raise WrongCodeException
        # NOTE(review): the stored code is not removed after a successful
        # verification - confirm whether it is meant to be single-use.
        await uow.user.change_data(user_id=confirmation.user_id, role=settings.VERIFICATED_USER)
        await uow.commit()
    return {"email_verification": True}
@router.post("/login", status_code=status.HTTP_200_OK, response_model=STokenLogin)
async def login_user(user_data: SUserLogin, uow=Depends(UnitOfWork)):
    """Authenticate by e-mail or username plus password and return a token.

    Authentication is delegated to ``AuthService.authenticate_user``; any
    exception it raises propagates unchanged.

    Returns:
        A dict with an ``authorization`` key holding ``"Bearer <token>"``,
        where the token's ``sub`` claim is the authenticated user's id.
    """
    user = await AuthService.authenticate_user(
        uow,
        user_data.email_or_username,
        user_data.password,
    )
    claims = {"sub": str(user.id)}
    access_token = create_access_token(claims)
    return {"authorization": f"Bearer {access_token}"}
@router.get(
    "/me",
    status_code=status.HTTP_200_OK,
    response_model=SUserResponse,
)
async def get_user(current_user: SUser = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency.

    The value is returned unchanged and serialized through the
    ``SUserResponse`` response model.
    """
    # The former unconditional ``asyncio.sleep(10)`` was removed: it only
    # added ten seconds of latency to every request for this endpoint.
    return current_user
@router.get("/avatars", status_code=status.HTTP_200_OK, response_model=SUserAvatars)
async def get_user_avatars_history(user=Depends(get_current_user), uow=Depends(UnitOfWork)):
    """Return the avatar records the repository holds for the current user.

    The result of ``uow.user.get_user_avatars`` is passed back unchanged and
    serialized through the ``SUserAvatars`` response model.
    """
    async with uow:
        return await uow.user.get_user_avatars(user_id=user.id)
@router.post(
    "/send_confirmation_code",
    status_code=status.HTTP_200_OK,
    response_model=None,
)
async def send_confirmation_code(user_data: SUserSendConfirmationCode, user: SUser = Depends(get_current_user)):
    """Generate a confirmation code for a data change and mail it.

    The code is stored in Redis under the current user's id and then handed
    to the data-change confirmation e-mail task. The mail is addressed to
    ``user_data.email`` (the address from the request body), not to the
    address on the current user record.
    """
    user_code = generate_confirmation_code()
    user_mail_data = SConfirmationData.model_validate(
        {
            "user_id": user.id,
            "username": user.username,
            "email_to": user_data.email,
            "confirmation_code": user_code,
        }
    )

    # Store the code BEFORE dispatching the e-mail task, so a mail is never
    # sent for a code that failed to be stored or is not stored yet.
    redis_session = get_redis_session()
    await RedisService.set_verification_code(redis=redis_session, user_id=user.id, verification_code=user_code)

    send_data_change_confirmation_email.delay(user_mail_data.model_dump())
@router.post("/change_data", status_code=status.HTTP_200_OK, response_model=None)
async def change_user_data(user_data: SUserChangeData, user=Depends(get_current_user), uow=Depends(UnitOfWork)):
    """Apply a confirmed change to the current user's account data.

    The submitted ``verification_code`` must equal the code stored in Redis
    for the user. E-mail, username and password hash are always passed to
    ``change_data``; the hash is recomputed only when ``new_password`` is
    truthy, otherwise the user's current hash is reused. When ``avatar_url``
    is truthy the avatar is updated too and recorded via ``add_user_avatar``.
    After the commit the data-change e-mail task is dispatched and the stored
    verification code is deleted.

    Raises:
        WrongCodeException: the stored code differs from the submitted one.
    """
    redis_session = get_redis_session()
    stored_code = await RedisService.get_verification_code(redis=redis_session, user_id=user.id)
    if stored_code != user_data.verification_code:
        raise WrongCodeException

    hashed_password = (
        get_password_hash(user_data.new_password)
        if user_data.new_password
        else user.hashed_password
    )

    # Build the update once; insertion order mirrors the keyword order of the
    # two original call variants.
    changes = {"email": user_data.email, "username": user_data.username}
    new_avatar = str(user_data.avatar_url) if user_data.avatar_url else None
    if new_avatar is not None:
        changes["avatar_image"] = new_avatar
    changes["hashed_password"] = hashed_password

    async with uow:
        await uow.user.change_data(user_id=user.id, **changes)
        if new_avatar is not None:
            await uow.user.add_user_avatar(user_id=user.id, avatar=new_avatar)
        await uow.commit()

    send_data_change_email.delay(user_data.username, user_data.email)
    await RedisService.delete_verification_code(redis=redis_session, user_id=user.id)