Добавлена смена пароля и восстановление пароля по коду из почты

This commit is contained in:
urec56 2024-02-24 20:58:26 +03:00
parent 826f6a957a
commit 46a7f90e86
10 changed files with 214 additions and 69 deletions

View file

@ -26,8 +26,8 @@ class IncorrectAuthDataException(BlackPhoenixException):
class IncorrectPasswordException(BlackPhoenixException):
status_code = status.HTTP_401_UNAUTHORIZED
detail = "Введён не верные пароль"
status_code = status.HTTP_409_CONFLICT
detail = "Введён не верный пароль"
class IncorrectTokenFormatException(BlackPhoenixException):
@ -59,11 +59,6 @@ class MessageNotFoundException(BlackPhoenixException):
detail = "Сообщение не найдено"
class IncorrectPasswordException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "Введён не верный пароль"
class PasswordsМismatchException(BlackPhoenixException):
status_code = status.HTTP_409_CONFLICT
detail = "Пароли не совпадают"

View file

@ -24,3 +24,46 @@ def create_registration_confirmation_template(
subtype="html"
)
return email
def create_password_change_confirmation_template(
username: str,
email_to: EmailStr,
):
email = EmailMessage()
email["Subject"] = "Смена пароля к аккаунту"
email["From"] = settings.SMTP_USER
email["To"] = email_to
email.set_content(
f"""
<h1>{username}, ты менял пароль?</h1>
<h2>Если нет, то пидора ответ</h2>
""",
subtype="html"
)
return email
def create_password_recover_template(
username: str,
email_to: EmailStr,
confirmation_code: str,
):
email = EmailMessage()
email["Subject"] = "Восстановление пароля"
email["From"] = settings.SMTP_USER
email["To"] = email_to
email.set_content(
f"""
<h1>{username}, ты тут хотел восстановить пароль?</h1>
<h2>{confirmation_code}</h2>
""",
subtype="html"
)
return email

View file

@ -8,7 +8,8 @@ from pydantic import EmailStr
from app.config import settings
from app.tasks.celery import celery
from app.tasks.email_templates import create_registration_confirmation_template
from app.tasks.email_templates import create_registration_confirmation_template, \
create_password_change_confirmation_template, create_password_recover_template
def generate_confirmation_code(length=6):
@ -34,3 +35,36 @@ def send_registration_confirmation_email(
return confirmation_code
@celery.task
def send_password_change_email(
username: str,
email_to: EmailStr,
):
msg_content = create_password_change_confirmation_template(
username=username, email_to=email_to
)
with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.login(settings.SMTP_USER, settings.SMTP_PASS)
server.send_message(msg_content)
return True
@celery.task
def send_password_recover_email(
username: str,
email_to: EmailStr,
):
confirmation_code = generate_confirmation_code()
msg_content = create_password_recover_template(
username=username, email_to=email_to, confirmation_code=confirmation_code
)
with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.login(settings.SMTP_USER, settings.SMTP_PASS)
server.send_message(msg_content)
return confirmation_code

View file

@ -48,6 +48,12 @@ async def test_login_user(email_or_username: str, password: str, status_code: in
assert "black_phoenix_access_token" in response.cookies
async def test_logout_user(ac: AsyncClient):
response = await ac.post("/users/logout")
assert response.status_code == 200
assert "black_phoenix_access_token" not in response.cookies
async def test_get_user(ac: AsyncClient):
await ac.post("/users/login", json={
"email_or_username": "urec@urec.com",
@ -59,7 +65,40 @@ async def test_get_user(ac: AsyncClient):
assert response.json()["black_phoenix"] == False
async def test_logout_user(ac: AsyncClient):
response = await ac.post("/users/logout")
assert response.status_code == 200
assert "black_phoenix_access_token" not in response.cookies
@pytest.mark.parametrize("username,password,statuscode", [
("urec", "12311231", 409),
("neurec", "12311231", 200),
("urec", "adw", 409),
("urec", "12311231", 200),
])
async def test_rename_user(username: str, password: str, statuscode: int, ac: AsyncClient):
await ac.post("/users/login", json={
"email_or_username": "urec@urec.com",
"password": "12311231"
})
response = await ac.patch("/users/rename", json={
"username": username,
"password": password
})
assert response.status_code == statuscode
if response.status_code == 200:
assert response.json() == username
@pytest.mark.parametrize("avatar_url,password,statuscode", [
("https://images.black-phoenix.ru/static/images/avatars/v6BtxTxfCFi2dBAl_avatar.png", "12311231", 200),
("https://images.black-phoenix.ru/static/images/%D1%82%D1%8B%20%D1%83%D0%B6%D0%B5%20%D0%BF%D0%B5%D1%88%D0%BA%D0%B0%20BP.png", "adw", 409),
("https://images.black-phoenix.ru/static/images/%D1%82%D1%8B%20%D1%83%D0%B6%D0%B5%20%D0%BF%D0%B5%D1%88%D0%BA%D0%B0%20BP.png", "12311231", 200),
])
async def test_change_avatar(avatar_url: str, password: str, statuscode: int, ac: AsyncClient):
await ac.post("/users/login", json={
"email_or_username": "urec@urec.com",
"password": "12311231"
})
response = await ac.patch("/users/change_avatar", json={
"new_avatar_image": avatar_url,
"password": password
})
assert response.status_code == statuscode
if response.status_code == 200:
assert response.json() == avatar_url

View file

@ -99,6 +99,4 @@ async def validate_user_admin(user_id: int):
return False
def get_user_codes_list(user_codes: list[dict], sort_description: str) -> list[str]:
user_codes_list = [user_code['code'] for user_code in user_codes if user_code['description'] == sort_description]
return user_codes_list

View file

@ -1,13 +1,7 @@
import json
from typing import Dict, List
from fastapi import WebSocket, Depends, WebSocketDisconnect
from pydantic import parse_obj_as
from fastapi import WebSocket, WebSocketDisconnect
from app.users.chat.dao import ChatDAO
from app.users.auth import validate_user_access_to_chat
from app.users.chat.shemas import SMessageSchema, SMessage
from app.users.models import Users
from app.users.chat.router import router

View file

@ -93,9 +93,9 @@ class UserCodesDAO(BaseDAO):
model = UsersVerificationCodes
@classmethod
async def set_user_codes(cls, user_id: int, code: str):
async def set_user_codes(cls, user_id: int, code: str, description: str):
query = (insert(UsersVerificationCodes)
.values(user_id=user_id, code=code, description="Код подтверждения почты")
.values(user_id=user_id, code=code, description=description)
.returning(cls.model.code))
async with async_session_maker() as session:
result = await session.execute(query)
@ -103,7 +103,7 @@ class UserCodesDAO(BaseDAO):
return result.scalar()
@classmethod
async def get_user_codes(cls, user_id: int) -> list[dict[str, str | int | datetime]]:
async def get_user_codes(cls, **filter_by) -> list[dict | None]:
"""
SELECT
usersverificationcodes.id,
@ -116,11 +116,10 @@ class UserCodesDAO(BaseDAO):
usersverificationcodes.user_id = 20
AND now() - usersverificationcodes.date_of_creation < INTERVAL '30 minutes'
"""
query = select(UsersVerificationCodes.__table__.columns).where(
and_(
UsersVerificationCodes.user_id == user_id,
(func.now() - UsersVerificationCodes.date_of_creation) < text("INTERVAL '30 minutes'")
))
query = (select(UsersVerificationCodes.__table__.columns)
.where((func.now() - UsersVerificationCodes.date_of_creation) < text("INTERVAL '30 minutes'"))
.filter_by(**filter_by))
async with async_session_maker() as session:
# print(query.compile(engine, compile_kwargs={"literal_binds": True})) # Проверка SQL запроса
result = await session.execute(query)

View file

@ -17,7 +17,8 @@ class Users(Base):
role: Mapped[int] = mapped_column(server_default=f'0')
black_phoenix: Mapped[bool] = mapped_column(server_default='false')
avatar_image: Mapped[Optional[str]] = mapped_column(
server_default='https://images.black-phoenix.ru/static/images/%D1%82%D1%8B%20%D1%83%D0%B6%D0%B5%20%D0%BF%D0%B5%D1%88%D0%BA%D0%B0%20BP.png')
server_default='https://images.black-phoenix.ru/static/images/%D1%82%D1%8B%20%D1%83%D0%B6%D0%B5%20%D0%BF%D0%B5%D1%88%D0%BA%D0%B0%20BP.png'
)
date_of_birth: Mapped[date]
date_of_registration: Mapped[date] = mapped_column(server_default=func.now())

View file

@ -1,21 +1,20 @@
from typing import Annotated
from fastapi import APIRouter, Response, Depends, UploadFile, File, Body, Form
from fastapi import APIRouter, Response, Depends
from fastapi.responses import RedirectResponse
from sqlalchemy.sql.functions import current_user
from starlette import status
from app.config import settings
from app.exceptions import UserAlreadyExistsException, UsernameAlreadyInUseException, \
from app.exceptions import UsernameAlreadyInUseException, \
IncorrectPasswordException, PasswordsМismatchException, WrongCodeException, UserNotFoundException
from app.images.router import upload_file, upload_file_returning_str
from app.users.auth import get_password_hash, authenticate_user_by_email, \
create_access_token, verify_password, REGISTRATED_USER, get_user_codes_list, VERIFICATED_USER, authenticate_user, \
create_access_token, verify_password, VERIFICATED_USER, authenticate_user, \
check_existing_user
from app.users.dao import UserDAO, UserCodesDAO
from app.users.dependencies import get_current_user
from app.users.models import Users
from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserPassword, SUserRename, SUserAvatar
from app.tasks.tasks import send_registration_confirmation_email
from app.users.schemas import SUserLogin, SUserRegister, SUser, SUserPassword, SUserRename, SUserAvatar, \
SUserPasswordRecover, SUserCode, SUserPasswordChange
from app.tasks.tasks import send_registration_confirmation_email, send_password_change_email, \
send_password_recover_email
router = APIRouter(
prefix="/users",
@ -48,21 +47,23 @@ async def register_user(response: Response, user_data: SUserRegister):
result = send_registration_confirmation_email.delay(username=user_data.username, email_to=user_data.email)
result = result.get()
if await UserCodesDAO.set_user_codes(user_id=user_id, code=result) == result:
if await UserCodesDAO.set_user_codes(
user_id=user_id, code=result, description="Код подтверждения почты"
) == result:
user = await authenticate_user_by_email(user_data.email, user_data.password)
access_token = create_access_token({"sub": str(user.id)})
response.set_cookie(key="black_phoenix_access_token", value=access_token, httponly=True)
return {"access_token": access_token}
@router.post('/email_verification', response_model=bool)
async def email_verification(user_code: str, user: Users = Depends(get_current_user)):
user_codes = await UserCodesDAO.get_user_codes(user_id=user.id)
user_codes = get_user_codes_list(user_codes=user_codes, sort_description="Код подтверждения почты")
if user_code in user_codes:
if await UserDAO.change_data(user_id=user.id, role=VERIFICATED_USER):
return True
@router.post('/email_verification', status_code=status.HTTP_200_OK, response_model=bool)
async def email_verification(user_code: SUserCode, user: Users = Depends(get_current_user)):
user_codes = await UserCodesDAO.get_user_codes(
user_id=user.id, description="Код подтверждения почты", code=user_code.user_code
)
if not user_codes or not await UserDAO.change_data(user_id=user.id, role=VERIFICATED_USER):
raise WrongCodeException
return True
@router.post("/login", response_model=dict[str, str])
@ -95,17 +96,12 @@ async def rename_user(new_username: SUserRename, current_user: Users = Depends(g
@router.patch("/change_avatar", response_model=str)
async def change_avatar(
new_avatar: Annotated[UploadFile, File()],
password: Annotated[SUserAvatar, Body()],
current_user: Users = Depends(get_current_user)
):
if not verify_password(password.password, current_user.hashed_password):
async def change_avatar(user_data: SUserAvatar, current_user: Users = Depends(get_current_user)):
if not verify_password(user_data.password, current_user.hashed_password):
raise IncorrectPasswordException
new_avatar_url = await upload_file_returning_str(new_avatar, "upload_avatar")
if await UserDAO.change_data(current_user.id, avatar_image=new_avatar_url):
return new_avatar_url
raise UserNotFoundException # Надо подумать
if await UserDAO.change_data(current_user.id, avatar_image=user_data.new_avatar_image):
return user_data.new_avatar_image
raise UserNotFoundException
@router.patch("/change_password", status_code=status.HTTP_200_OK)
@ -117,3 +113,41 @@ async def change_password(new_password: SUserPassword, current_user: Users = Dep
raise PasswordsМismatchException
hashed_password = get_password_hash(new_password.new_password)
await UserDAO.change_data(current_user.id, hashed_password=hashed_password)
send_password_change_email.delay(current_user.username, current_user.email)
@router.patch("/send_recovery_email", status_code=status.HTTP_200_OK)
async def send_recovery_email(email: SUserPasswordRecover):
existing_user = await UserDAO.find_one_or_none(email=email.email)
if not existing_user:
raise UserNotFoundException
result = send_password_recover_email.delay(existing_user.username, existing_user.email)
result = result.get()
if await UserCodesDAO.set_user_codes(
user_id=existing_user.user_id, code=result, description="Код восстановления пароля"
) == result:
return True
@router.post("/confirm_password_recovery", status_code=status.HTTP_200_OK, response_model=int)
async def confirm_password_recovery(user_code: SUserCode):
user_codes = await UserCodesDAO.get_user_codes(
description="Код восстановления пароля", code=user_code.user_code
)
if not user_codes:
raise WrongCodeException
return user_codes[0]['user_id']
@router.post("/password_recovery", status_code=status.HTTP_200_OK, response_model=str)
async def password_recovery(passwords: SUserPasswordChange):
if passwords.password1 != passwords.password2:
raise PasswordsМismatchException
hashed_password = get_password_hash(passwords.password1)
username = await UserDAO.change_data(passwords.user_id, hashed_password=hashed_password)
user = await UserDAO.find_one_or_none(username=username, id=passwords.user_id)
if not user:
raise UserNotFoundException
send_password_change_email.delay(user.username, user.email)
return username

View file

@ -49,16 +49,24 @@ class SUserRename(BaseModel):
class SUserAvatar(BaseModel):
password: str
@model_validator(mode="before")
@classmethod
def auto_loads_json_string(cls, data):
if isinstance(data, str) and data.startswith("{"):
data = json.loads(data)
return data
new_avatar_image: str
class SUserPassword(BaseModel):
password: str = Query(None, min_length=8)
new_password: str = Query(None, min_length=8)
new_password2: str = Query(None, min_length=8)
class SUserPasswordRecover(BaseModel):
email: EmailStr
class SUserCode(BaseModel):
user_code: str
class SUserPasswordChange(BaseModel):
user_id: int
password1: str
password2: str