chat_back/app/services/auth_service.py
2025-03-16 12:04:09 +03:00

85 lines
3.1 KiB
Python

from datetime import UTC, datetime, timedelta
from cryptography.fernet import Fernet, InvalidToken
from jose import jwt
from passlib.context import CryptContext
from app.chat.exceptions import ChatNotFoundException, UserCanNotReadThisChatException
from app.config import settings
from app.users.exceptions import (
IncorrectAuthDataException,
UserMustConfirmEmailException,
UserNotFoundException,
WrongCodeException,
)
from app.users.schemas import SInvitationData, SUser
from app.utils.unit_of_work import UnitOfWork
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# bcrypt cannot hash more than 72 bytes, so the password helpers in this
# module truncate the encoded password to 72 bytes.
# Fernet cipher keyed by settings.INVITATION_LINK_TOKEN_KEY; used for both
# invitation tokens and confirmation tokens.
cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY)
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context``.

    The password is UTF-8 encoded and cut to its first 72 bytes before
    hashing, so bytes past that limit do not influence the result.
    """
    encoded = password.encode("utf-8")
    truncated = encoded[:72]
    return pwd_context.hash(truncated)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password``.

    The plain password is UTF-8 encoded and cut to its first 72 bytes,
    mirroring the truncation applied when the hash was produced.
    """
    encoded = plain_password.encode("utf-8")
    candidate = encoded[:72]
    return pwd_context.verify(candidate, hashed_password)
def create_access_token(data: dict[str, str | datetime]) -> str:
    """Encode ``data`` as a JWT that expires 30 days from now.

    The input mapping is not mutated: the claims are a copy of ``data``
    with an ``exp`` entry added (replacing any ``exp`` already present).
    Encoding uses ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.
    """
    expires_at = datetime.now(UTC) + timedelta(days=30)
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, settings.SECRET_KEY, settings.ALGORITHM)
def encode_invitation_token(user_data: SInvitationData) -> str:
    """Serialize ``user_data`` to JSON and encrypt it into a text token."""
    payload = user_data.model_dump_json().encode()
    encrypted = cipher_suite.encrypt(payload)
    return encrypted.decode()
def decode_invitation_token(invitation_token: str) -> SInvitationData:
    """Decrypt an invitation token and parse it into ``SInvitationData``.

    Args:
        invitation_token: Text token as produced by ``encode_invitation_token``.

    Returns:
        The invitation data carried by the token.

    Raises:
        WrongCodeException: If the token cannot be decrypted, or if it
            decrypts to something that is not valid invitation data.
    """
    try:
        payload = cipher_suite.decrypt(invitation_token.encode())
        # Confirmation tokens are encrypted with the same cipher, so a token
        # may decrypt fine and still not be invitation JSON. Assuming
        # SInvitationData is a pydantic v2 model, its validation error
        # derives from ValueError and is reported as a wrong code as well.
        return SInvitationData.model_validate_json(payload)
    except (InvalidToken, ValueError) as err:
        raise WrongCodeException from err
def encode_confirmation_token(confirmation_code: str) -> str:
    """Encrypt ``confirmation_code`` and return the resulting token as text."""
    code_bytes = confirmation_code.encode()
    encrypted = cipher_suite.encrypt(code_bytes)
    return encrypted.decode()
def decode_confirmation_token(invitation_token: str) -> str:
    """Decrypt a confirmation token back into the plain confirmation code.

    Raises:
        WrongCodeException: If the token cannot be decrypted.
    """
    token_bytes = invitation_token.encode()
    try:
        decrypted = cipher_suite.decrypt(token_bytes)
    except InvalidToken:
        raise WrongCodeException
    return decrypted.decode()
def get_confirmation_code(user_code: str) -> str:
    """Return the plain confirmation code for ``user_code``.

    A value of exactly 6 characters is returned unchanged; any other
    length is handed to ``decode_confirmation_token`` for decryption.
    """
    if len(user_code) != 6:
        return decode_confirmation_token(user_code)
    return user_code
async def authenticate_user(uow: UnitOfWork, email_or_username: str, password: str) -> SUser:
    """Find a user by username or email and verify the supplied password.

    The same ``email_or_username`` value is used for both the username and
    the email lookup.

    Raises:
        IncorrectAuthDataException: If the lookup raises
            ``UserNotFoundException`` or the password does not verify;
            both cases surface as the same error.
    """
    try:
        async with uow:
            account = await uow.user.find_one_by_username_or_email(
                username=email_or_username,
                email=email_or_username,
            )
            password_ok = verify_password(password, account.hashed_password)
            if password_ok:
                return account
            raise IncorrectAuthDataException
    except UserNotFoundException:
        raise IncorrectAuthDataException
async def check_verificated_user(uow: UnitOfWork, user_id: int) -> None:
    """Require that the user's role is at least ``settings.VERIFICATED_USER``.

    Raises:
        UserMustConfirmEmailException: If the user's role is below
            ``settings.VERIFICATED_USER``.
    """
    async with uow:
        account = await uow.user.find_one(id=user_id)
        below_threshold = account.role < settings.VERIFICATED_USER
        if below_threshold:
            raise UserMustConfirmEmailException
async def validate_user_access_to_chat(uow: UnitOfWork, user_id: int, chat_id: int) -> None:
    """Check that a chat lookup for this user and chat succeeds.

    The lookup result itself is discarded; only success or failure matters.

    Raises:
        UserCanNotReadThisChatException: If the lookup raises
            ``ChatNotFoundException``.
    """
    try:
        async with uow:
            await uow.chat.find_chat(
                user_id=user_id,
                chat_id=chat_id,
            )
    except ChatNotFoundException:
        raise UserCanNotReadThisChatException