From 52662c66d8cca59d55c712b4a8a0736e7f309287 Mon Sep 17 00:00:00 2001 From: urec56 Date: Tue, 16 Jul 2024 19:11:11 +0400 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D0=B4=D0=B5=D0=BB?= =?UTF-8?q?=D0=B0=D0=BB=20=D0=B0=D1=83=D1=84=D1=81=D0=B5=D1=80=D0=B2=D0=B8?= =?UTF-8?q?=D1=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/chat_service.py | 6 +- app/services/user_service.py | 6 +- app/tasks/tasks.py | 4 +- app/users/router.py | 15 ++--- app/utils/auth.py | 105 ++++++++++++++++++----------------- 5 files changed, 68 insertions(+), 68 deletions(-) diff --git a/app/services/chat_service.py b/app/services/chat_service.py index ed16520..686a6bb 100644 --- a/app/services/chat_service.py +++ b/app/services/chat_service.py @@ -6,7 +6,7 @@ from app.config import settings from app.services.user_service import UserService from app.utils.unit_of_work import UnitOfWork from app.users.schemas import SInvitationData -from app.utils.auth import encode_invitation_token, decode_invitation_token +from app.utils.auth import AuthService class ChatService: @@ -53,13 +53,13 @@ class ChatService: @staticmethod def create_invitation_link(chat_id: int) -> str: invitation_data = SInvitationData.model_validate({"chat_id": chat_id}) - invitation_token = encode_invitation_token(invitation_data) + invitation_token = AuthService.encode_invitation_token(invitation_data) invitation_link = urljoin(settings.INVITATION_LINK_HOST, f"/submit#code={invitation_token}") return invitation_link @classmethod async def invite_to_chat(cls, uow: UnitOfWork, user_id: int, invitation_token: str) -> None: - invitation_data = decode_invitation_token(invitation_token) + invitation_data = AuthService.decode_invitation_token(invitation_token) chat = await cls.find_chat(uow=uow, chat_id=invitation_data.chat_id, user_id=user_id) if user_id == chat.chat_for: raise UserCanNotReadThisChatException diff --git a/app/services/user_service.py b/app/services/user_service.py index 7cab0c0..2269121 100644 --- a/app/services/user_service.py +++ b/app/services/user_service.py @@ -4,7 +4,7 @@ from app.tasks.tasks import generate_confirmation_code, send_confirmation_email, from app.utils.unit_of_work import UnitOfWork from app.users.exceptions import PasswordsMismatchException, WrongCodeException from app.users.schemas import SUser, SUsers, SUserRegister, SConfirmationData, SUserAvatars, SUserChangeData -from app.utils.auth import get_password_hash +from app.utils.auth import AuthService class UserService: @@ -25,7 +25,7 @@ class UserService: if user_data.password != user_data.password2: raise PasswordsMismatchException - hashed_password = get_password_hash(user_data.password) + hashed_password = AuthService.get_password_hash(user_data.password) async with uow: user = await uow.user.add( email=user_data.email, @@ -78,7 +78,7 @@ class UserService: verification_code = await RedisService.get_verification_code(redis=redis_session, user_id=user.id) if verification_code != user_data.verification_code: raise WrongCodeException - hashed_password = get_password_hash(user_data.new_password) if user_data.new_password else user.hashed_password + hashed_password = AuthService.get_password_hash(user_data.new_password) if user_data.new_password else user.hashed_password async with uow: await uow.user.change_data( user_id=user.id, diff --git a/app/tasks/tasks.py b/app/tasks/tasks.py index 14482af..dfb21a6 100644 --- a/app/tasks/tasks.py +++ b/app/tasks/tasks.py @@ -13,7 +13,7 @@ from app.tasks.email_templates import ( create_data_change_confirmation_email, create_data_change_email, ) -from app.utils.auth import encode_confirmation_token +from app.utils.auth import AuthService from app.users.schemas import SConfirmationData @@ -32,7 +32,7 @@ def generate_confirmation_code(length=6) -> str: @celery.task def send_confirmation_email(user_data: dict) -> None: user_data = SConfirmationData.model_validate(user_data) - invitation_token = encode_confirmation_token(user_data.confirmation_code) + invitation_token = AuthService.encode_confirmation_token(user_data.confirmation_code) confirmation_link = urljoin(settings.INVITATION_LINK_HOST, f"/submit#code={invitation_token}") diff --git a/app/users/router.py b/app/users/router.py index 402e50c..a8b5287 100644 --- a/app/users/router.py +++ b/app/users/router.py @@ -5,11 +5,7 @@ from fastapi import APIRouter, Depends, status from app.services.user_service import UserService from app.users.exceptions import UserAlreadyExistsException, PasswordAlreadyInUseException, UserNotFoundException from app.utils.unit_of_work import UnitOfWork -from app.utils.auth import ( - create_access_token, - AuthService, - get_confirmation_code -) +from app.utils.auth import AuthService from app.users.dependencies import get_current_user from app.users.schemas import ( SUserLogin, @@ -24,7 +20,8 @@ from app.users.schemas import ( SUsers, SUserPassword, SGetUsersFilter, - SUserCode, Responses, + SUserCode, + Responses, ) router = APIRouter(prefix="/users", tags=["Пользователи"]) @@ -109,7 +106,7 @@ async def register_user(user_data: SUserRegister, uow=Depends(UnitOfWork)): user = await UserService.add_user(uow=uow, user_data=user_data) await UserService.send_confirmation_email(user=user, mail_type="registration") - access_token = create_access_token({"sub": str(user.id)}) + access_token = AuthService.create_access_token({"sub": str(user.id)}) return {"authorization": f"Bearer {access_token}"} @@ -172,7 +169,7 @@ async def resend_email_verification(user: SUser = Depends(get_current_user)): }, ) async def email_verification(user_code: SUserCode, user: SUser = Depends(get_current_user), uow=Depends(UnitOfWork)): - confirmation_code = get_confirmation_code(user_code.user_code) + confirmation_code = AuthService.get_confirmation_code(user_code.user_code) await UserService.verificate_user(uow=uow, user=user, confirmation_code=confirmation_code) @@ -199,7 +196,7 @@ async def login_user(user_data: SUserLogin, uow=Depends(UnitOfWork)): user = await AuthService.authenticate_user( uow=uow, email_or_username=user_data.email_or_username, password=user_data.password ) - access_token = create_access_token({"sub": str(user.id)}) + access_token = AuthService.create_access_token({"sub": str(user.id)}) return {"authorization": f"Bearer {access_token}"} diff --git a/app/utils/auth.py b/app/utils/auth.py index be16c47..029574a 100644 --- a/app/utils/auth.py +++ b/app/utils/auth.py @@ -5,8 +5,12 @@ from jose import jwt from passlib.context import CryptContext from app.config import settings -from app.users.exceptions import IncorrectAuthDataException, UserNotFoundException, UserMustConfirmEmailException, \ - WrongCodeException +from app.users.exceptions import ( + IncorrectAuthDataException, + UserNotFoundException, + UserMustConfirmEmailException, + WrongCodeException, +) from app.chat.exceptions import ChatNotFoundException, UserCanNotReadThisChatException from app.utils.unit_of_work import UnitOfWork from app.users.schemas import SUser, SInvitationData @@ -16,62 +20,61 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY) -def get_password_hash(password: str) -> str: - return pwd_context.hash(password) - - -def verify_password(plain_password: str, hashed_password: str) -> bool: - return pwd_context.verify(plain_password, hashed_password) - - -def create_access_token(data: dict[str, str | datetime]) -> str: - to_encode = data.copy() - expire = datetime.now(UTC) + timedelta(days=30) - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, settings.ALGORITHM) - return encoded_jwt - - -def encode_invitation_token(user_data: SInvitationData) -> str: - invitation_token = cipher_suite.encrypt(user_data.model_dump_json().encode()) - return invitation_token.decode() - - -def decode_invitation_token(invitation_token: str) -> SInvitationData: - try: - user_code = invitation_token.encode() - user_data = cipher_suite.decrypt(user_code) - return SInvitationData.model_validate_json(user_data) - except InvalidToken: - raise WrongCodeException - - -def encode_confirmation_token(confirmation_code: str) -> str: - invitation_token = cipher_suite.encrypt(confirmation_code.encode()) - return invitation_token.decode() - - -def decode_confirmation_token(invitation_token: str) -> str: - try: - user_code = invitation_token.encode() - confirmation_code = cipher_suite.decrypt(user_code).decode() - return confirmation_code - except InvalidToken: - raise WrongCodeException - - -def get_confirmation_code(user_code: str) -> str: - return user_code if len(user_code) == 6 else decode_confirmation_token(user_code) - - class AuthService: + @staticmethod + def get_password_hash(password: str) -> str: + return pwd_context.hash(password) + + @staticmethod + def verify_password(plain_password: str, hashed_password: str) -> bool: + return pwd_context.verify(plain_password, hashed_password) + + @staticmethod + def create_access_token(data: dict[str, str | datetime]) -> str: + to_encode = data.copy() + expire = datetime.now(UTC) + timedelta(days=30) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, settings.ALGORITHM) + return encoded_jwt + + @staticmethod + def encode_invitation_token(user_data: SInvitationData) -> str: + invitation_token = cipher_suite.encrypt(user_data.model_dump_json().encode()) + return invitation_token.decode() + + @staticmethod + def decode_invitation_token(invitation_token: str) -> SInvitationData: + try: + user_code = invitation_token.encode() + user_data = cipher_suite.decrypt(user_code) + return SInvitationData.model_validate_json(user_data) + except InvalidToken: + raise WrongCodeException + + @staticmethod + def encode_confirmation_token(confirmation_code: str) -> str: + invitation_token = cipher_suite.encrypt(confirmation_code.encode()) + return invitation_token.decode() + + @staticmethod + def decode_confirmation_token(invitation_token: str) -> str: + try: + user_code = invitation_token.encode() + confirmation_code = cipher_suite.decrypt(user_code).decode() + return confirmation_code + except InvalidToken: + raise WrongCodeException + + @classmethod + def get_confirmation_code(cls, user_code: str) -> str: + return user_code if len(user_code) == 6 else cls.decode_confirmation_token(user_code) @classmethod async def authenticate_user(cls, uow: UnitOfWork, email_or_username: str, password: str) -> SUser: try: async with uow: user = await uow.user.find_one_by_username_or_email(username=email_or_username, email=email_or_username) - if not verify_password(password, user.hashed_password): + if not cls.verify_password(password, user.hashed_password): raise IncorrectAuthDataException return user except UserNotFoundException: