"""Celery tasks that build and send account-related emails over SMTP."""

import random
import secrets
import smtplib
import string
from urllib.parse import urljoin

from pydantic import EmailStr

from app.config import settings
from app.tasks.celery import celery
from app.tasks.email_templates import (
    create_registration_confirmation_template,
    create_data_change_confirmation_email,
    create_data_change_email,
)
from app.utils.auth import AuthService
from app.users.schemas import SConfirmationData

# Maps the ``type`` field of the confirmation payload to the template
# builder used to produce the outgoing message.
confirmation_mail_templates = {
    "registration": create_registration_confirmation_template,
    "data_change_confirmation": create_data_change_confirmation_email,
}


def generate_confirmation_code(length: int = 6) -> str:
    """Return a random code made of ASCII letters and digits.

    The code is drawn with ``secrets.choice`` rather than ``random.choice``
    because it is used to confirm user actions and must not be predictable.

    Args:
        length: Number of characters in the generated code.

    Returns:
        A string of ``length`` characters (empty when ``length`` is <= 0).
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def _send_message(msg_content) -> None:
    """Log in to the configured SMTP server over SSL and send one message.

    Host, port and credentials are read from ``settings``.
    NOTE(review): ``msg_content`` is whatever the template builders return;
    presumably an ``email.message.Message`` -- confirm against
    ``app.tasks.email_templates``.
    """
    with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
        server.login(settings.SMTP_USER, settings.SMTP_PASS)
        server.send_message(msg_content)


@celery.task
def send_confirmation_email(user_data: dict) -> None:
    """Send a confirmation email built from the given payload.

    The payload is validated into ``SConfirmationData``; its confirmation
    code is encoded into a token that is embedded in the confirmation link,
    and the template is chosen by the payload's ``type`` field.

    Args:
        user_data: Plain dict payload accepted by
            ``SConfirmationData.model_validate``.

    Raises:
        KeyError: If the payload's ``type`` has no entry in
            ``confirmation_mail_templates``.
    """
    # Keep the validated model under its own name instead of rebinding the
    # dict parameter to an object of a different type.
    data = SConfirmationData.model_validate(user_data)

    invitation_token = AuthService.encode_confirmation_token(data.confirmation_code)
    confirmation_link = urljoin(
        settings.INVITATION_LINK_HOST, f"/submit#code={invitation_token}"
    )

    build_message = confirmation_mail_templates[data.type]
    msg_content = build_message(
        data.username,
        data.email_to,
        confirmation_link,
        data.confirmation_code,
    )
    _send_message(msg_content)


@celery.task
def send_data_change_email(username: str, email_to: EmailStr) -> None:
    """Send the data-change email for ``username`` to ``email_to``.

    Args:
        username: Passed to ``create_data_change_email`` as ``username``.
        email_to: Passed to ``create_data_change_email`` as ``email_to``.
    """
    msg_content = create_data_change_email(username=username, email_to=email_to)
    _send_message(msg_content)