chat_back/app/tasks/tasks.py

88 lines
2.6 KiB
Python

import json
import smtplib
import random
import string
from cryptography.fernet import Fernet
from pydantic import EmailStr
from app.config import settings
from app.tasks.celery import celery
from app.tasks.email_templates import create_registration_confirmation_template, \
create_password_change_confirmation_template, create_password_recover_template
def generate_confirmation_code(length=6):
characters = string.ascii_letters + string.digits
confirmation_code = ''.join(random.choice(characters) for _ in range(length))
return confirmation_code
@celery.task
def send_registration_confirmation_email(
user_id: int,
username: str,
email_to: EmailStr,
MODE: str
):
confirmation_code = generate_confirmation_code()
if MODE == 'TEST':
return confirmation_code
load = {"user_id": user_id, "username": username, "email": email_to, "confirmation_code": confirmation_code}
str_load = json.dumps(load)
cipher_suite = Fernet(settings.INVITATION_LINK_TOKEN_KEY)
invitation_token = cipher_suite.encrypt(str_load.encode())
confirmation_link = settings.INVITATION_LINK_HOST + '/api/users/email_verification/' + invitation_token.decode()
msg_content = create_registration_confirmation_template(
username=username, email_to=email_to, confirmation_link=confirmation_link
)
with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.login(settings.SMTP_USER, settings.SMTP_PASS)
server.send_message(msg_content)
return confirmation_code
@celery.task
def send_password_change_email(
username: str,
email_to: EmailStr,
MODE: str
):
if MODE == 'TEST':
return True
msg_content = create_password_change_confirmation_template(
username=username, email_to=email_to
)
with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.login(settings.SMTP_USER, settings.SMTP_PASS)
server.send_message(msg_content)
return True
@celery.task
def send_password_recover_email(
username: str,
email_to: EmailStr,
MODE: str
):
confirmation_code = generate_confirmation_code()
if MODE == 'TEST':
return confirmation_code
msg_content = create_password_recover_template(
username=username, email_to=email_to, confirmation_code=confirmation_code
)
with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.login(settings.SMTP_USER, settings.SMTP_PASS)
server.send_message(msg_content)
return confirmation_code