127 lines
4.7 KiB
Python
127 lines
4.7 KiB
Python
from datetime import datetime
|
|
|
|
from sqlalchemy import update, select, insert, and_, func, text
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app.dao.base import BaseDAO
|
|
from app.database import async_session_maker, engine
|
|
from app.users.chat.models import UsersXChats, Chats
|
|
from app.users.models import Users, UsersVerificationCodes
|
|
|
|
|
|
class UserDAO(BaseDAO):
|
|
model = Users
|
|
|
|
@staticmethod
|
|
async def change_data(user_id: int, **data_to_change):
|
|
query = update(Users).where(Users.id == user_id).values(**data_to_change)
|
|
async with async_session_maker() as session:
|
|
await session.execute(query)
|
|
await session.commit()
|
|
query = select(Users.username).where(Users.id == user_id)
|
|
result = await session.execute(query)
|
|
return result.scalar()
|
|
|
|
@staticmethod
|
|
async def get_user_role(user_id: int):
|
|
query = select(Users.role).where(Users.id == user_id)
|
|
async with async_session_maker() as session:
|
|
result = await session.execute(query)
|
|
return result.scalar()
|
|
|
|
@staticmethod
|
|
async def get_user_allowed_chats(user_id: int):
|
|
"""
|
|
WITH chats_with_descriptions AS (
|
|
SELECT *
|
|
FROM usersxchats
|
|
LEFT JOIN chats ON usersxchats.chat_id = chats.id
|
|
),
|
|
|
|
chats_with_avatars as (
|
|
SELECT *
|
|
FROM chats_with_descriptions
|
|
LEFT JOIN users ON chats_with_descriptions.user_id = users.id
|
|
)
|
|
|
|
SELECT chat_id, chat_for, chat_name, avatar_image
|
|
FROM chats_with_avatars
|
|
WHERE user_id = 1
|
|
"""
|
|
chats_with_descriptions = (select(UsersXChats.__table__.columns, Chats.__table__.columns)
|
|
.select_from(UsersXChats)
|
|
.join(Chats, UsersXChats.chat_id == Chats.id)
|
|
).cte('chats_with_descriptions')
|
|
|
|
chats_with_avatars = (select(
|
|
chats_with_descriptions.c.chat_id,
|
|
chats_with_descriptions.c.chat_for,
|
|
chats_with_descriptions.c.chat_name,
|
|
chats_with_descriptions.c.visibility,
|
|
Users.id,
|
|
Users.avatar_image,
|
|
)
|
|
.select_from(chats_with_descriptions)
|
|
.join(Users, Users.id == chats_with_descriptions.c.user_id)
|
|
.cte('chats_with_avatars'))
|
|
query = (select(
|
|
chats_with_avatars.c.chat_id,
|
|
chats_with_avatars.c.chat_for,
|
|
chats_with_avatars.c.chat_name,
|
|
chats_with_avatars.c.avatar_image
|
|
)
|
|
.select_from(chats_with_avatars)
|
|
.where(
|
|
and_(
|
|
chats_with_avatars.c.id == user_id,
|
|
chats_with_avatars.c.visibility == True
|
|
)))
|
|
async with async_session_maker() as session:
|
|
result = await session.execute(query)
|
|
result = result.mappings().all()
|
|
return result
|
|
|
|
@staticmethod
|
|
async def get_user_avatar(user_id: int):
|
|
query = select(Users.avatar_image).where(Users.id == user_id)
|
|
async with async_session_maker() as session:
|
|
result = await session.execute(query)
|
|
return result.scalar()
|
|
|
|
|
|
class UserCodesDAO(BaseDAO):
|
|
model = UsersVerificationCodes
|
|
|
|
@classmethod
|
|
async def set_user_codes(cls, user_id: int, code: str, description: str):
|
|
query = (insert(UsersVerificationCodes)
|
|
.values(user_id=user_id, code=code, description=description)
|
|
.returning(cls.model.code))
|
|
async with async_session_maker() as session:
|
|
result = await session.execute(query)
|
|
await session.commit()
|
|
return result.scalar()
|
|
|
|
@staticmethod
|
|
async def get_user_codes(**filter_by) -> list[dict | None]:
|
|
"""
|
|
SELECT
|
|
usersverificationcodes.id,
|
|
usersverificationcodes.user_id,
|
|
usersverificationcodes.code,
|
|
usersverificationcodes.description,
|
|
usersverificationcodes.date_of_creation
|
|
FROM usersverificationcodes
|
|
WHERE
|
|
usersverificationcodes.user_id = 20
|
|
AND now() - usersverificationcodes.date_of_creation < INTERVAL '30 minutes'
|
|
"""
|
|
query = (select(UsersVerificationCodes.__table__.columns)
|
|
.where((func.now() - UsersVerificationCodes.date_of_creation) < text("INTERVAL '30 minutes'"))
|
|
.filter_by(**filter_by))
|
|
|
|
async with async_session_maker() as session:
|
|
# print(query.compile(engine, compile_kwargs={"literal_binds": True})) # Проверка SQL запроса
|
|
result = await session.execute(query)
|
|
result = result.mappings().all()
|
|
return result
|