from sqlalchemy import update, select, insert, and_, func, text, delete from app.dao.base import BaseDAO from app.database import async_session_maker, engine # noqa from app.models.chat import Chats from app.models.user_avatar import UserAvatar from app.models.users import Users from app.models.user_verification_code import UserVerificationCode from app.models.user_chat import UserChat from app.users.schemas import SUser, SUserAvatars class UserDAO(BaseDAO): model = Users async def find_one_or_none(self, **filter_by) -> SUser | None: async with async_session_maker() as session: query = select(Users).filter_by(**filter_by) result = await session.execute(query) result = result.scalar_one_or_none() if result: return SUser.model_validate(result, from_attributes=True) async def find_all(self, **filter_by): async with async_session_maker() as session: query = select(Users.__table__.columns).filter_by(**filter_by).where(Users.role != 100) result = await session.execute(query) return result.mappings().all() async def change_data(self, user_id: int, **data_to_change) -> str: query = update(Users).where(Users.id == user_id).values(**data_to_change) async with async_session_maker() as session: await session.execute(query) await session.commit() query = select(Users.username).where(Users.id == user_id) result = await session.execute(query) return result.scalar() async def get_user_role(self, user_id: int) -> int: query = select(Users.role).where(Users.id == user_id) async with async_session_maker() as session: result = await session.execute(query) return result.scalar() async def get_user_allowed_chats(self, user_id: int): """ WITH chats_with_descriptions AS ( SELECT * FROM usersxchats LEFT JOIN chats ON usersxchats.chat_id = chats.id ), chats_with_avatars as ( SELECT * FROM chats_with_descriptions LEFT JOIN users ON chats_with_descriptions.user_id = users.id ) SELECT chat_id, chat_for, chat_name, avatar_image FROM chats_with_avatars WHERE user_id = 1 """ chats_with_descriptions = ( select(UserChat.__table__.columns, Chats.__table__.columns) .select_from(UserChat) .join(Chats, UserChat.chat_id == Chats.id) ).cte("chats_with_descriptions") chats_with_avatars = ( select( chats_with_descriptions.c.chat_id, chats_with_descriptions.c.chat_for, chats_with_descriptions.c.chat_name, chats_with_descriptions.c.visibility, Users.id, Users.avatar_image, Users.avatar_hex, ) .select_from(chats_with_descriptions) .join(Users, Users.id == chats_with_descriptions.c.user_id) .cte("chats_with_avatars") ) query = ( select( chats_with_avatars.c.chat_id, chats_with_avatars.c.chat_for, chats_with_avatars.c.chat_name, chats_with_avatars.c.avatar_image, chats_with_avatars.c.avatar_hex, ) .select_from(chats_with_avatars) .where(and_(chats_with_avatars.c.id == user_id, chats_with_avatars.c.visibility == True)) # noqa: E712 ) async with async_session_maker() as session: result = await session.execute(query) result = result.mappings().all() return result async def get_user_avatar(self, user_id: int) -> str: query = select(Users.avatar_image).where(Users.id == user_id) async with async_session_maker() as session: result = await session.execute(query) return result.scalar() async def add_user_avatar(self, user_id: int, avatar: str) -> bool: query = insert(UserAvatar).values(user_id=user_id, avatar_image=avatar) async with async_session_maker() as session: await session.execute(query) await session.commit() return True async def get_user_avatars(self, user_id: int) -> SUserAvatars: query = select( func.json_build_object( "user_avatars", select( func.json_agg( func.json_build_object( "avatar_url", UserAvatar.avatar_image ) ) ) .where(UserAvatar.user_id == user_id) .scalar_subquery() ) ) async with async_session_maker() as session: result = await session.execute(query) result = result.scalar() return SUserAvatars.model_validate(result) async def delete_user_avatar(self, avatar_id: int, user_id: int) -> bool: query = delete(UserAvatar).where(and_(UserAvatar.id == avatar_id, UserAvatar.user_id == user_id)) async with async_session_maker() as session: await session.execute(query) await session.commit() return True class UserCodesDAO(BaseDAO): model = UserVerificationCode async def set_user_codes(self, cls, user_id: int, code: str, description: str): query = ( insert(UserVerificationCode) .values(user_id=user_id, code=code, description=description) .returning(cls.model.code) ) async with async_session_maker() as session: result = await session.execute(query) await session.commit() return result.scalar() async def get_user_codes(self, **filter_by) -> list[dict | None]: """ SELECT usersverificationcodes.id, usersverificationcodes.user_id, usersverificationcodes.code, usersverificationcodes.description, usersverificationcodes.date_of_creation FROM usersverificationcodes WHERE usersverificationcodes.user_id = 20 AND now() - usersverificationcodes.date_of_creation < INTERVAL '30 minutes' """ query = ( select(UserVerificationCode.__table__.columns) .where((func.now() - UserVerificationCode.date_of_creation) < text("INTERVAL '10 minutes'")) .filter_by(**filter_by) ) async with async_session_maker() as session: # print(query.compile(engine, compile_kwargs={"literal_binds": True})) # Проверка SQL запроса result = await session.execute(query) result = result.mappings().all() return result