57 lines
1.7 KiB
Python
57 lines
1.7 KiB
Python
import random
import secrets
import smtplib
import string
from urllib.parse import urljoin

from pydantic import EmailStr

from app.config import settings
from app.tasks.celery import celery
from app.tasks.email_templates import (
    create_registration_confirmation_template,
    create_data_change_confirmation_email,
    create_data_change_email,
)
from app.services.auth_service import AuthService
from app.users.schemas import SConfirmationData
|
|
|
|
|
|
# Dispatch table used by send_confirmation_email: the confirmation request's
# ``type`` value selects the function that renders the outgoing message.
confirmation_mail_templates = {
    # Rendered with create_registration_confirmation_template.
    "registration": create_registration_confirmation_template,
    # Rendered with create_data_change_confirmation_email.
    "data_change_confirmation": create_data_change_confirmation_email,
}
|
|
|
|
|
|
def generate_confirmation_code(length: int = 6) -> str:
    """Return a random alphanumeric confirmation code.

    Characters are drawn from ASCII letters (both cases) and digits using
    the ``secrets`` module, so the code comes from the operating system's
    CSPRNG and cannot be predicted or reproduced by seeding ``random``.

    Args:
        length: Number of characters in the code. Values of zero or less
            yield an empty string.

    Returns:
        A string of ``length`` characters from ``[A-Za-z0-9]``.
    """
    alphabet = string.ascii_letters + string.digits
    # secrets.choice, not random.choice: this value gates account actions.
    return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
@celery.task
def send_confirmation_email(user_data: dict) -> None:
    """Build a confirmation e-mail from ``user_data`` and send it over SMTP.

    The raw dict is validated into ``SConfirmationData``. Its confirmation
    code is encoded into a token by ``AuthService.encode_confirmation_token``
    and embedded in a link rooted at ``settings.INVITATION_LINK_HOST``. The
    message is rendered by the function registered in
    ``confirmation_mail_templates`` under the payload's ``type`` and sent
    through an SSL SMTP connection using the credentials from ``settings``.

    Args:
        user_data: Plain dict accepted by ``SConfirmationData.model_validate``.

    Raises:
        KeyError: If the payload's ``type`` has no registered template.
    """
    payload = SConfirmationData.model_validate(user_data)
    invitation_token = AuthService.encode_confirmation_token(
        payload.confirmation_code
    )

    # The token travels in the URL fragment of the confirmation page.
    link_base = settings.INVITATION_LINK_HOST
    link_target = f"/submitEmail#code={invitation_token}"
    confirmation_link = urljoin(link_base, link_target)

    render_message = confirmation_mail_templates[payload.type]
    message = render_message(
        payload.username,
        payload.email_to,
        confirmation_link,
        payload.confirmation_code,
    )

    smtp_connection = smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT)
    with smtp_connection as mailer:
        mailer.login(settings.SMTP_USER, settings.SMTP_PASS)
        mailer.send_message(message)
|
|
|
|
|
|
@celery.task
def send_data_change_email(username: str, email_to: EmailStr) -> None:
    """Send the data-change e-mail for ``username`` to ``email_to``.

    The message is rendered by ``create_data_change_email`` and sent through
    an SSL SMTP connection using the host, port and credentials from
    ``settings``.

    Args:
        username: Passed to the template as ``username``.
        email_to: Passed to the template as ``email_to``.
    """
    message = create_data_change_email(username=username, email_to=email_to)

    smtp_connection = smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT)
    with smtp_connection as mailer:
        mailer.login(settings.SMTP_USER, settings.SMTP_PASS)
        mailer.send_message(message)
|