63 lines
2.1 KiB
Python
63 lines
2.1 KiB
Python
import smtplib
|
|
import random
|
|
import string
|
|
|
|
from pydantic import EmailStr
|
|
|
|
|
|
from app.config import settings
|
|
from app.tasks.celery import celery
|
|
from app.tasks.email_templates import (
|
|
create_registration_confirmation_template,
|
|
create_data_change_confirmation_email,
|
|
create_data_change_email,
|
|
)
|
|
from app.utils.auth import encode_confirmation_token
|
|
from app.users.schemas import SConfirmationData
|
|
|
|
|
|
def generate_confirmation_code(length: int = 6) -> str:
    """Return a random alphanumeric confirmation code.

    The code is drawn from ASCII letters (both cases) and digits using the
    operating system's cryptographically secure random source, because the
    code is used to confirm ownership of an email address and must not be
    predictable.

    Args:
        length: Number of characters to generate. A value of zero or less
            yields an empty string.

    Returns:
        A string of ``length`` characters from ``[A-Za-z0-9]``.
    """
    # Function-scope import keeps this block self-contained; ``secrets`` is
    # the stdlib module intended for security-sensitive random values, unlike
    # the Mersenne Twister generator behind ``random.choice``.
    import secrets

    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
@celery.task
def send_registration_confirmation_email(user_data: dict) -> None:
    """Celery task: email a registration confirmation link and code.

    ``user_data`` is validated into ``SConfirmationData``; the fields read
    here are ``username``, ``email_to`` and ``confirmation_code``. The
    confirmation code is passed through ``encode_confirmation_token`` and the
    resulting token is appended to the email-verification endpoint path under
    ``settings.INVITATION_LINK_HOST`` to form the link placed in the email.

    Args:
        user_data: Plain dict accepted by ``SConfirmationData.model_validate``.
    """
    payload = SConfirmationData.model_validate(user_data)
    code = payload.confirmation_code

    # The link carries the encoded token; the raw code is also handed to the
    # template separately.
    token = encode_confirmation_token(code)
    link = "".join(
        (
            settings.INVITATION_LINK_HOST,
            "/api/users/email_verification/link/",
            token,
        )
    )

    # NOTE(review): the template helper is defined in app.tasks.email_templates;
    # presumably it returns an email message object suitable for send_message.
    message = create_registration_confirmation_template(
        username=payload.username,
        email_to=payload.email_to,
        confirmation_link=link,
        confirmation_code=code,
    )

    # Implicit-TLS SMTP session; the context manager closes the connection.
    with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as smtp:
        smtp.login(settings.SMTP_USER, settings.SMTP_PASS)
        smtp.send_message(message)
|
|
|
|
|
|
@celery.task
def send_data_change_confirmation_email(user_data: dict) -> None:
    """Celery task: email a confirmation code for a data change.

    ``user_data`` is validated into ``SConfirmationData``; its ``username``,
    ``email_to`` and ``confirmation_code`` fields are forwarded to
    ``create_data_change_confirmation_email`` and the resulting message is
    sent through the SMTP server configured in ``settings``.

    Args:
        user_data: Plain dict accepted by ``SConfirmationData.model_validate``.
    """
    payload = SConfirmationData.model_validate(user_data)

    # NOTE(review): the template helper is defined in app.tasks.email_templates;
    # presumably it returns an email message object suitable for send_message.
    message = create_data_change_confirmation_email(
        username=payload.username,
        email_to=payload.email_to,
        confirmation_code=payload.confirmation_code,
    )

    # Implicit-TLS SMTP session; the context manager closes the connection.
    with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as smtp:
        smtp.login(settings.SMTP_USER, settings.SMTP_PASS)
        smtp.send_message(message)
|
|
|
|
|
|
@celery.task
def send_data_change_email(username: str, email_to: EmailStr) -> None:
    """Celery task: email a data-change message to a user.

    Builds the message with ``create_data_change_email`` and sends it through
    the SMTP server configured in ``settings``.

    Args:
        username: Passed to the template helper as ``username``.
        email_to: Passed to the template helper as ``email_to``.
    """
    # NOTE(review): the template helper is defined in app.tasks.email_templates;
    # presumably it returns an email message object suitable for send_message.
    message = create_data_change_email(
        username=username,
        email_to=email_to,
    )

    # Implicit-TLS SMTP session; the context manager closes the connection.
    with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as smtp:
        smtp.login(settings.SMTP_USER, settings.SMTP_PASS)
        smtp.send_message(message)
|