75 lines
2.2 KiB
Python
75 lines
2.2 KiB
Python
from fastapi import Depends, Request, Response, WebSocket
|
|
from jose import JWTError, jwt, ExpiredSignatureError
|
|
|
|
from app.config import settings
|
|
from app.exceptions import (
|
|
IncorrectTokenFormatException,
|
|
TokenAbsentException,
|
|
TokenExpiredException,
|
|
UserIsNotPresentException,
|
|
UserMustConfirmEmailException,
|
|
)
|
|
from app.services.user_service import UserService
|
|
from app.unit_of_work import UnitOfWork
|
|
from app.users.auth import create_access_token, VERIFICATED_USER
|
|
from app.users.schemas import SUser
|
|
|
|
|
|
def get_token(request: Request) -> str:
|
|
token = request.cookies.get("black_phoenix_access_token")
|
|
if not token:
|
|
raise TokenAbsentException
|
|
return token
|
|
|
|
|
|
async def get_current_user(response: Response, token: str = Depends(get_token), uow=Depends(UnitOfWork)) -> SUser:
|
|
try:
|
|
payload = jwt.decode(token, settings.SECRET_KEY, settings.ALGORITHM)
|
|
except ExpiredSignatureError:
|
|
raise TokenExpiredException
|
|
except JWTError:
|
|
raise IncorrectTokenFormatException
|
|
|
|
user_id: str = payload.get("sub")
|
|
if not user_id:
|
|
raise UserIsNotPresentException
|
|
|
|
user = await UserService.find_one_or_none(uow=uow, user_id=int(user_id))
|
|
if not user:
|
|
raise UserIsNotPresentException
|
|
|
|
access_token = create_access_token({"sub": str(user.id)})
|
|
response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True, secure=True, samesite="none")
|
|
return user
|
|
|
|
|
|
async def check_verificated_user_with_exc(user: SUser = Depends(get_current_user)) -> SUser:
|
|
if not user.role >= VERIFICATED_USER:
|
|
raise UserMustConfirmEmailException
|
|
return user
|
|
|
|
|
|
def get_token_ws(websocket: WebSocket) -> str:
|
|
token = websocket.cookies.get("black_phoenix_access_token")
|
|
if not token:
|
|
raise TokenAbsentException
|
|
return token
|
|
|
|
|
|
async def get_current_user_ws(token: str = Depends(get_token_ws), uow=Depends(UnitOfWork)):
|
|
try:
|
|
payload = jwt.decode(token, settings.SECRET_KEY, settings.ALGORITHM)
|
|
except ExpiredSignatureError:
|
|
raise TokenExpiredException
|
|
except JWTError:
|
|
raise IncorrectTokenFormatException
|
|
|
|
user_id: str = payload.get("sub")
|
|
if not user_id:
|
|
raise UserIsNotPresentException
|
|
|
|
user = await UserService.find_one_or_none(uow=uow, user_id=int(user_id))
|
|
if not user:
|
|
raise UserIsNotPresentException
|
|
|
|
return user
|