chat_back/app/users/auth.py

114 lines
3.8 KiB
Python

from datetime import datetime, timedelta, UTC
from cryptography.fernet import Fernet
from jose import jwt
from passlib.context import CryptContext
from pydantic import EmailStr
from app.config import settings
from app.exceptions import (
UserDontHavePermissionException,
IncorrectAuthDataException,
UserNotFoundException,
UserMustConfirmEmailException,
)
from app.unit_of_work import UnitOfWork
from app.users.schemas import SUser, SInvitationData
# Shared passlib context used by get_password_hash / verify_password;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Role value that AuthService.validate_user_admin tests for with ``==``.
ADMIN_USER = 100
# NOTE(review): not referenced in the visible part of this module; presumably the
# id of one specific admin account — confirm against callers.
ADMIN_USER_ID = 3
# NOTE(review): not referenced in the visible part of this module; presumably the
# role of a user who has registered but not yet confirmed their e-mail — confirm.
REGISTRATED_USER = 0
# Minimum role accepted by AuthService.check_verificated_user (role >= this value).
VERIFICATED_USER = 1
def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against *hashed_password* via ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(
    data: dict[str, str | datetime],
    expires_delta: timedelta = timedelta(days=30),
) -> str:
    """Encode *data* as a JWT that carries an ``exp`` claim.

    Args:
        data: Claims to embed. The mapping itself is not modified; an
            ``"exp"`` key already present in it is overwritten.
        expires_delta: Token lifetime, counted from the current UTC time.
            Defaults to 30 days, the value this function used to hard-code,
            so existing single-argument callers are unaffected.

    Returns:
        The string returned by ``jwt.encode`` for the claims, using
        ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.
    """
    expires_at = datetime.now(UTC) + expires_delta
    # Build a new dict instead of mutating the caller's mapping.
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, settings.SECRET_KEY, settings.ALGORITHM)
def encode_invitation_token(user_data: SInvitationData) -> str:
    """Serialize *user_data* to JSON and encrypt it into a text token.

    The Fernet key is read from ``settings.INVITATION_LINK_TOKEN_KEY``; the
    encrypted bytes are decoded to ``str`` before being returned.
    """
    fernet = Fernet(settings.INVITATION_LINK_TOKEN_KEY)
    payload = user_data.model_dump_json().encode()
    return fernet.encrypt(payload).decode()
def decode_invitation_token(invitation_token: str) -> SInvitationData:
    """Decrypt *invitation_token* and parse it back into ``SInvitationData``.

    Uses the Fernet key from ``settings.INVITATION_LINK_TOKEN_KEY``. Errors
    raised by decryption or by JSON validation are not caught here and
    propagate to the caller.
    """
    token_bytes = invitation_token.encode()
    decrypted_json = Fernet(settings.INVITATION_LINK_TOKEN_KEY).decrypt(token_bytes)
    return SInvitationData.model_validate_json(decrypted_json)
class AuthService:
    """Authentication and permission checks backed by a ``UnitOfWork``.

    The unit of work is always passed in by the caller. Methods that query
    the user repository open it with ``async with``; the remaining methods
    delegate to those.
    """

    @staticmethod
    async def authenticate_user_by_email(uow: UnitOfWork, email: EmailStr, password: str) -> SUser | None:
        """Find a user by e-mail and verify the password.

        Returns the user when one is found and *password* verifies against
        its ``hashed_password``; otherwise ``None``.
        """
        async with uow:
            candidate = await uow.user.find_one_or_none(email=email)
            if candidate and verify_password(password, candidate.hashed_password):
                return candidate
            return None

    @staticmethod
    async def authenticate_user_by_username(uow: UnitOfWork, username: str, password: str) -> SUser | None:
        """Find a user by username and verify the password.

        Returns the user when one is found and *password* verifies against
        its ``hashed_password``; otherwise ``None``.
        """
        async with uow:
            candidate = await uow.user.find_one_or_none(username=username)
            if candidate and verify_password(password, candidate.hashed_password):
                return candidate
            return None

    @classmethod
    async def authenticate_user(cls, uow: UnitOfWork, email_or_username: str, password: str) -> SUser:
        """Authenticate with an identifier that may be an e-mail or a username.

        The e-mail lookup is tried first; the username lookup runs only when
        the first attempt yields nothing.

        Raises:
            IncorrectAuthDataException: neither attempt produced a user.
        """
        matched = (
            await cls.authenticate_user_by_email(uow, email_or_username, password)
            or await cls.authenticate_user_by_username(uow, email_or_username, password)
        )
        if matched:
            return matched
        raise IncorrectAuthDataException

    @staticmethod
    async def check_verificated_user(uow: UnitOfWork, user_id: int) -> bool:
        """Report whether the user's role is at least ``VERIFICATED_USER``.

        Raises:
            UserNotFoundException: no user with *user_id* was found.
        """
        async with uow:
            record = await uow.user.find_one_or_none(id=user_id)
            if record:
                return record.role >= VERIFICATED_USER
            raise UserNotFoundException

    @classmethod
    async def check_verificated_user_with_exc(cls, uow: UnitOfWork, user_id: int):
        """Raising variant of ``check_verificated_user``.

        Raises:
            UserMustConfirmEmailException: the check returned a falsy result.
            UserNotFoundException: propagated from ``check_verificated_user``.
        """
        is_verified = await cls.check_verificated_user(uow=uow, user_id=user_id)
        if not is_verified:
            raise UserMustConfirmEmailException

    @staticmethod
    async def get_user_allowed_chats_id(uow: UnitOfWork, user_id: int) -> list[int]:
        """Return the ``"chat_id"`` of every entry from ``get_user_allowed_chats``."""
        async with uow:
            allowed_rows = await uow.user.get_user_allowed_chats(user_id)
            return [row["chat_id"] for row in allowed_rows]

    @classmethod
    async def validate_user_access_to_chat(cls, uow: UnitOfWork, user_id: int, chat_id: int) -> bool:
        """Return ``True`` when *chat_id* is among the user's allowed chats.

        Raises:
            UserDontHavePermissionException: *chat_id* is not in the list.
        """
        permitted_ids = await cls.get_user_allowed_chats_id(uow=uow, user_id=user_id)
        if chat_id in permitted_ids:
            return True
        raise UserDontHavePermissionException

    @staticmethod
    async def validate_user_admin(uow: UnitOfWork, user_id: int) -> bool:
        """Report whether the role stored for *user_id* equals ``ADMIN_USER``."""
        async with uow:
            role = await uow.user.get_user_role(user_id=user_id)
            return role == ADMIN_USER