chat_back/app/users/auth.py

104 lines
3.4 KiB
Python

from datetime import datetime, timedelta, timezone

from jose import jwt
from passlib.context import CryptContext
from pydantic import EmailStr

from app.config import settings
from app.exceptions import UserDontHavePermissionException, IncorrectAuthDataException, UserAlreadyExistsException, \
UserNotFoundException, UserMustConfirmEmailException
from app.users.dao import UserDAO
from app.users.models import Users
from app.users.schemas import SUserRegister
# Shared passlib context used by get_password_hash / verify_password (bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Role value that validate_user_admin compares a user's role against.
ADMIN_USER = 100
# NOTE(review): presumably the id of a built-in admin account; not referenced in the visible code — confirm.
ADMIN_USER_ID = 3
# NOTE(review): presumably the role of a registered user who has not confirmed their e-mail — confirm.
REGISTRATED_USER = 0
# Minimum role that check_verificated_user treats as verified.
VERIFICATED_USER = 1
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied,
            so the caller's dict is not modified; any ``"exp"`` key it
            contains is overwritten.
        expires_delta: Token lifetime. When omitted, the token expires
            180 minutes after creation.

    Returns:
        The token encoded with ``settings.SECRET_KEY`` and
        ``settings.ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=180)
    claims = data.copy()
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to misinterpret.
    claims["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(claims, settings.SECRET_KEY, settings.ALGORITHM)
async def authenticate_user_by_email(email: EmailStr, password: str) -> Users | None:
    """Look up a user by e-mail and check the supplied password.

    Returns the user when a record with this e-mail exists and ``password``
    verifies against its ``hashed_password``; otherwise returns ``None``.
    """
    candidate = await UserDAO.find_one_or_none(email=email)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None
async def authenticate_user_by_username(username: str, password: str) -> Users | None:
    """Look up a user by username and check the supplied password.

    Returns the user when a record with this username exists and ``password``
    verifies against its ``hashed_password``; otherwise returns ``None``.
    """
    candidate = await UserDAO.find_one_or_none(username=username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None
async def authenticate_user(email_or_username: str, password: str) -> Users:
    """Authenticate by treating the identifier first as an e-mail, then as a username.

    Returns:
        The first user for which the e-mail or username lookup succeeds.

    Raises:
        IncorrectAuthDataException: If neither lookup yields a user.
    """
    matched_by_email = await authenticate_user_by_email(email_or_username, password)
    if matched_by_email:
        return matched_by_email
    # Only fall back to the username lookup when the e-mail attempt failed.
    matched_by_username = await authenticate_user_by_username(email_or_username, password)
    if matched_by_username:
        return matched_by_username
    raise IncorrectAuthDataException
async def check_existing_user(user_data: SUserRegister) -> None:
    """Ensure neither the e-mail nor the username in ``user_data`` is already taken.

    Raises:
        UserAlreadyExistsException: If a user with the same e-mail or the
            same username is found (the e-mail is checked first).
    """
    if await UserDAO.find_one_or_none(email=user_data.email):
        raise UserAlreadyExistsException
    if await UserDAO.find_one_or_none(username=user_data.username):
        raise UserAlreadyExistsException
async def check_verificated_user(user_id: int) -> bool:
    """Report whether the user's role is at least ``VERIFICATED_USER``.

    Raises:
        UserNotFoundException: If no user with ``user_id`` is found.
    """
    found = await UserDAO.find_one_or_none(id=user_id)
    if found:
        return found.role >= VERIFICATED_USER
    raise UserNotFoundException
async def check_verificated_user_with_exc(user_id: int) -> bool:
    """Require that the user is verified, raising instead of returning ``False``.

    Returns:
        ``True`` when ``check_verificated_user`` reports the user as verified.

    Raises:
        UserMustConfirmEmailException: If the user is not verified.
        UserNotFoundException: Propagated from ``check_verificated_user``
            when the user does not exist.
    """
    if not await check_verificated_user(user_id=user_id):
        raise UserMustConfirmEmailException
    # Honour the declared ``-> bool`` contract instead of falling through to None.
    return True
async def get_user_allowed_chats_id(user_id: int) -> list:
    """Return the ``chat_id`` of every chat the user is allowed to access.

    The ids are taken from the ``'chat_id'`` key of each record returned by
    ``UserDAO.get_user_allowed_chats``.

    Returns:
        A list of chat ids. A list is returned rather than a generator so the
        result can be iterated or membership-tested more than once; a
        generator would be silently exhausted after its first use.
    """
    allowed_chats = await UserDAO.get_user_allowed_chats(user_id)
    return [chat['chat_id'] for chat in allowed_chats]
async def validate_user_access_to_chat(user_id: int, chat_id: int) -> bool:
    """Verify that ``chat_id`` is among the chats available to the user.

    Returns:
        ``True`` when the chat id is found in the result of
        ``get_user_allowed_chats_id``.

    Raises:
        UserDontHavePermissionException: If the chat id is not found there.
    """
    allowed_chat_ids = await get_user_allowed_chats_id(user_id=user_id)
    if chat_id not in allowed_chat_ids:
        raise UserDontHavePermissionException
    return True
async def validate_user_admin(user_id: int):
    """Report whether the user's role equals ``ADMIN_USER``.

    The role is fetched through ``UserDAO.get_user_role``; the result is
    ``True`` for an exact match and ``False`` otherwise.
    """
    role = await UserDAO.get_user_role(user_id=user_id)
    return True if role == ADMIN_USER else False