69 lines
2.2 KiB
Python
69 lines
2.2 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from jose import jwt
|
|
from passlib.context import CryptContext
|
|
from pydantic import EmailStr
|
|
|
|
from app.config import settings
|
|
from app.exceptions import UserDontHavePermissionException
|
|
from app.users.dao import UserDAO
|
|
from app.users.models import Users
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
ADMIN_ROLE = 100
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
|
|
# Функция создания JWT токена
|
|
def create_access_token(data: dict) -> str:
|
|
to_encode = data.copy()
|
|
expire = datetime.utcnow() + timedelta(minutes=30)
|
|
to_encode.update({"exp": expire})
|
|
encoded_jwt = jwt.encode(
|
|
to_encode, settings.SECRET_KEY, settings.ALGORITHM
|
|
)
|
|
return encoded_jwt
|
|
|
|
|
|
# Функция проверки наличия юзера по мейлу
|
|
async def authenticate_user_by_email(email: EmailStr, password: str) -> Users | None:
|
|
user = await UserDAO.find_one_or_none(email=email)
|
|
if not user or not verify_password(password, user.hashed_password):
|
|
return None
|
|
return user
|
|
|
|
|
|
# Функция проверки наличия юзера по нику
|
|
async def authenticate_user_by_username(username: str, password: str) -> Users | None:
|
|
user = await UserDAO.find_one_or_none(username=username)
|
|
if not user or not verify_password(password, user.hashed_password):
|
|
return None
|
|
return user
|
|
|
|
|
|
async def get_user_allowed_chats_id(user_id: int):
|
|
user_allowed_chats = await UserDAO.get_user_allowed_chats(user_id)
|
|
user_allowed_chats_id = (chat['chat_id'] for chat in user_allowed_chats)
|
|
return user_allowed_chats_id
|
|
|
|
|
|
async def validate_user_access_to_chat(user_id: int, chat_id: int):
|
|
user_allowed_chats = await get_user_allowed_chats_id(user_id=user_id)
|
|
if not chat_id in user_allowed_chats:
|
|
raise UserDontHavePermissionException
|
|
return True
|
|
|
|
|
|
async def validate_user_admin(user_id: int):
|
|
user_role = await UserDAO.get_user_role(user_id=user_id)
|
|
if user_role == ADMIN_ROLE:
|
|
return True
|
|
return False
|