from sqlalchemy import insert, select, update, and_, delete

from app.dao.base import BaseDAO
from app.database import async_session_maker, engine  # noqa
from app.exceptions import UserAlreadyInChatException, UserAlreadyPinnedChatException
from app.users.models import Users
from app.users.chat.models import Chats, Messages, UsersXChats, PinnedChats, PinnedMessages, Answers


class ChatDAO(BaseDAO):
    """Data-access helpers for chats, chat members, messages, answers and pins.

    Every method opens its own session through ``async_session_maker``; the
    methods that modify data commit before returning.
    """

    model = Chats

    @staticmethod
    async def create(user_id: int, chat_name: str, created_by: int) -> int:
        """Insert a new chat row and return its id.

        Args:
            user_id: Value stored in the ``chat_for`` column.
            chat_name: Value stored in the ``chat_name`` column.
            created_by: Value stored in the ``created_by`` column.

        Returns:
            The ``Chats.id`` produced by the INSERT.
        """
        query = (
            insert(Chats)
            .values(chat_for=user_id, chat_name=chat_name, created_by=created_by)
            .returning(Chats.id)
        )
        async with async_session_maker() as session:
            result = await session.execute(query)
            await session.commit()
            return result.scalar()

    @staticmethod
    async def add_user_to_chat(user_id: int, chat_id: int) -> bool:
        """Add a user to a chat.

        Returns:
            ``True`` once the membership row has been inserted and committed.

        Raises:
            UserAlreadyInChatException: A ``UsersXChats`` row for this
                user/chat pair already exists.
        """
        # Ask the database for this exact pair instead of loading every member
        # id of the chat and searching the list in Python.
        membership_query = (
            select(UsersXChats.user_id)
            .where(
                and_(
                    UsersXChats.chat_id == chat_id,
                    UsersXChats.user_id == user_id,
                )
            )
            .limit(1)
        )
        insert_query = insert(UsersXChats).values(user_id=user_id, chat_id=chat_id)
        async with async_session_maker() as session:
            existing = await session.execute(membership_query)
            if existing.first() is not None:
                raise UserAlreadyInChatException
            await session.execute(insert_query)
            await session.commit()
        return True

    @staticmethod
    async def send_message(
        user_id: int,
        chat_id: int,
        message: str,
        image_url: str | None = None,
    ) -> list[dict]:
        """Insert a message and return it joined with its author's profile.

        The INSERT runs as a CTE so that the new row can be joined with
        ``Users`` (and outer-joined with ``Answers``) in a single statement.

        Returns:
            Row mappings with the keys ``id``, ``message``, ``image_url``,
            ``chat_id``, ``user_id``, ``created_at``, ``avatar_image``,
            ``username``, ``avatar_hex``, ``answer_id`` and ``message_id``.
        """
        inserted_message = (
            insert(Messages)
            .values(chat_id=chat_id, user_id=user_id, message=message, image_url=image_url)
            .returning(
                Messages.id,
                Messages.message,
                Messages.image_url,
                Messages.chat_id,
                Messages.user_id,
                Messages.created_at,
            )
            .cte("inserted_image")
        )
        # NOTE(review): the outer join to Answers presumably only exists to give
        # the row the same keys as the other message queries - confirm.
        query = (
            select(
                inserted_message.c.id,
                inserted_message.c.message,
                inserted_message.c.image_url,
                inserted_message.c.chat_id,
                inserted_message.c.user_id,
                inserted_message.c.created_at,
                Users.avatar_image,
                Users.username,
                Users.avatar_hex,
                Answers.answer_id,
                Answers.message_id,
            )
            .select_from(inserted_message)
            .join(Users, Users.id == inserted_message.c.user_id)
            .join(Answers, Answers.answer_id == inserted_message.c.id, isouter=True)
        )
        async with async_session_maker() as session:
            result = await session.execute(query)
            await session.commit()
            return result.mappings().all()

    @staticmethod
    async def get_message_by_id(message_id: int):
        """Fetch one visible message together with its author's profile.

        Returns:
            The first matching row mapping, or ``None`` when no message with
            this id has ``visibility`` set to true.
        """
        query = (
            select(
                Messages.id,
                Messages.message,
                Messages.image_url,
                Messages.chat_id,
                Messages.user_id,
                Messages.created_at,
                Users.avatar_image,
                Users.username,
                Users.avatar_hex,
                Answers.answer_id,
                Answers.message_id,
            )
            .select_from(Messages)
            .join(Users, Users.id == Messages.user_id)
            .join(Answers, Answers.answer_id == Messages.id, isouter=True)
            .where(
                and_(
                    Messages.id == message_id,
                    Messages.visibility == True,  # noqa: E712 - SQL expression
                )
            )
        )
        async with async_session_maker() as session:
            result = await session.execute(query)
            return result.mappings().first()

    @staticmethod
    async def delete_message(message_id: int) -> bool:
        """Hide a message by setting its ``visibility`` to ``False``.

        The row itself is kept. Returns ``True`` unconditionally, whether or
        not a row matched ``message_id``.
        """
        query = update(Messages).where(Messages.id == message_id).values(visibility=False)
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def get_some_messages(
        chat_id: int,
        message_number_from: int,
        messages_to_get: int,
    ) -> list[dict]:
        """Return one page of visible messages of a chat, newest first.

        Args:
            chat_id: Chat whose messages are listed.
            message_number_from: OFFSET applied to the ordered result.
            messages_to_get: LIMIT applied to the ordered result.

        Returns:
            A list of plain dicts, one per message; empty when nothing matches.

        Approximate SQL::

            WITH messages_with_users AS (
                SELECT messages.*, users.*, answers.*
                FROM messages
                JOIN users ON messages.user_id = users.id
                LEFT JOIN answers ON answers.answer_id = messages.id
            )
            SELECT id, message, image_url, chat_id, user_id, created_at,
                   avatar_image, username, avatar_hex, answer_id, message_id
            FROM messages_with_users
            WHERE chat_id = :chat_id AND visibility = true
            ORDER BY created_at DESC
            LIMIT :messages_to_get OFFSET :message_number_from;
        """
        messages_with_users = (
            select(
                Messages.__table__.columns,
                Users.__table__.columns,
                Answers.__table__.columns,
            )
            .select_from(Messages)
            .join(Users, Messages.user_id == Users.id)
            .join(Answers, Answers.answer_id == Messages.id, isouter=True)
        ).cte('messages_with_users')

        page_query = (
            select(
                messages_with_users.c.id,
                messages_with_users.c.message,
                messages_with_users.c.image_url,
                messages_with_users.c.chat_id,
                messages_with_users.c.user_id,
                messages_with_users.c.created_at,
                messages_with_users.c.avatar_image,
                messages_with_users.c.username,
                messages_with_users.c.avatar_hex,
                messages_with_users.c.answer_id,
                messages_with_users.c.message_id,
            )
            .where(
                and_(
                    messages_with_users.c.chat_id == chat_id,
                    messages_with_users.c.visibility == True,  # noqa: E712 - SQL expression
                )
            )
            .order_by(messages_with_users.c.created_at.desc())
            .limit(messages_to_get)
            .offset(message_number_from)
        )
        async with async_session_maker() as session:
            result = await session.execute(page_query)
            return [dict(row) for row in result.mappings().all()]

    @staticmethod
    async def edit_message(message_id: int, new_message: str, new_image_url: str) -> bool:
        """Overwrite the text and image URL of a message.

        Returns ``True`` unconditionally, whether or not a row matched
        ``message_id``.
        """
        query = (
            update(Messages)
            .where(Messages.id == message_id)
            .values(message=new_message, image_url=new_image_url)
        )
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def add_answer(message_id: int, answer_id: int) -> list[dict]:
        """Record an ``Answers`` row and return the message identified by ``answer_id``.

        The INSERT into ``Answers`` runs as a CTE; the surrounding SELECT reads
        the message whose id equals ``answer_id`` together with its author's
        profile and the freshly inserted answer link.

        Returns:
            Row mappings with the same keys as ``send_message`` produces.
        """
        answer = (
            insert(Answers)
            .values(message_id=message_id, answer_id=answer_id)
            .returning(Answers.answer_id, Answers.message_id)
            .cte("answer")
        )
        query = (
            select(
                Messages.id,
                Messages.message,
                Messages.image_url,
                Messages.chat_id,
                Messages.user_id,
                Messages.created_at,
                Users.avatar_image,
                Users.username,
                Users.avatar_hex,
                answer.c.answer_id,
                answer.c.message_id,
            )
            .select_from(Messages)
            .join(Users, Users.id == Messages.user_id)
            .join(answer, answer.c.answer_id == Messages.id, isouter=True)
            .where(Messages.id == answer_id)
        )
        async with async_session_maker() as session:
            result = await session.execute(query)
            await session.commit()
            return result.mappings().all()

    @staticmethod
    async def delete_chat(chat_id: int) -> bool:
        """Hide a chat by setting its ``visibility`` to ``False``.

        The row itself is kept. Returns ``True`` unconditionally.
        """
        query = update(Chats).where(Chats.id == chat_id).values(visibility=False)
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def delete_user(chat_id: int, user_id: int) -> bool:
        """Remove a user's membership row from a chat.

        Returns ``True`` unconditionally, whether or not a row was deleted.
        """
        query = delete(UsersXChats).where(
            and_(
                UsersXChats.chat_id == chat_id,
                UsersXChats.user_id == user_id,
            )
        )
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def pinn_chat(chat_id: int, user_id: int) -> bool:
        """Pin a chat for a user.

        Returns:
            ``True`` once the pin row has been inserted and committed.

        Raises:
            UserAlreadyPinnedChatException: A ``PinnedChats`` row for this
                user/chat pair already exists.
        """
        # Look up this exact pair rather than fetching every chat the user has
        # pinned and searching the list in Python.
        pinned_query = (
            select(PinnedChats.chat_id)
            .where(
                and_(
                    PinnedChats.user_id == user_id,
                    PinnedChats.chat_id == chat_id,
                )
            )
            .limit(1)
        )
        insert_query = insert(PinnedChats).values(chat_id=chat_id, user_id=user_id)
        async with async_session_maker() as session:
            existing = await session.execute(pinned_query)
            if existing.first() is not None:
                raise UserAlreadyPinnedChatException
            await session.execute(insert_query)
            await session.commit()
        return True

    @staticmethod
    async def unpinn_chat(chat_id: int, user_id: int) -> bool:
        """Remove a user's pin from a chat.

        Returns ``True`` unconditionally, whether or not a row was deleted.
        """
        query = delete(PinnedChats).where(
            PinnedChats.chat_id == chat_id,
            PinnedChats.user_id == user_id,
        )
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def get_pinned_chats(user_id: int):
        """List the pinned, visible chats in which ``user_id`` is a member.

        Returns:
            Distinct row mappings with the keys ``chat_id``, ``chat_for``,
            ``chat_name``, ``avatar_image`` and ``avatar_hex``.
        """
        # Membership rows joined with the chat they point at.
        chats_with_descriptions = (
            select(UsersXChats.__table__.columns, Chats.__table__.columns)
            .select_from(UsersXChats)
            .join(Chats, UsersXChats.chat_id == Chats.id)
        ).cte('chats_with_descriptions')

        # The same rows extended with the member's id and avatar columns.
        chats_with_avatars = (
            select(
                chats_with_descriptions.c.chat_id,
                chats_with_descriptions.c.chat_for,
                chats_with_descriptions.c.chat_name,
                chats_with_descriptions.c.visibility,
                Users.id,
                Users.avatar_image,
                Users.avatar_hex,
            )
            .select_from(chats_with_descriptions)
            .join(Users, Users.id == chats_with_descriptions.c.user_id)
            .cte('chats_with_avatars')
        )

        query = (
            select(
                chats_with_avatars.c.chat_id,
                chats_with_avatars.c.chat_for,
                chats_with_avatars.c.chat_name,
                chats_with_avatars.c.avatar_image,
                chats_with_avatars.c.avatar_hex,
            )
            .distinct()
            .select_from(PinnedChats)
            .join(chats_with_avatars, PinnedChats.chat_id == chats_with_avatars.c.chat_id)
            .where(
                and_(
                    chats_with_avatars.c.id == user_id,
                    chats_with_avatars.c.visibility == True,  # noqa: E712 - SQL expression
                )
            )
        )
        # Debugging aid - prints the compiled SQL for inspection:
        # print(query.compile(engine, compile_kwargs={"literal_binds": True}))
        async with async_session_maker() as session:
            result = await session.execute(query)
            return result.mappings().all()

    @staticmethod
    async def pinn_message(chat_id: int, message_id: int, user_id: int) -> bool:
        """Pin a message in a chat, recording ``user_id`` on the pin row.

        No check for an existing pin is made here. Returns ``True`` after the
        commit.
        """
        query = insert(PinnedMessages).values(
            chat_id=chat_id,
            message_id=message_id,
            user_id=user_id,
        )
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def get_message_pinner(chat_id: int, message_id: int) -> int | None:
        """Return the ``user_id`` stored on the pin of a message.

        Returns:
            ``PinnedMessages.user_id`` of the first matching row, or ``None``
            when the message is not pinned in this chat.
        """
        query = select(PinnedMessages.user_id).where(
            PinnedMessages.chat_id == chat_id,
            PinnedMessages.message_id == message_id,
        )
        async with async_session_maker() as session:
            result = await session.execute(query)
            return result.scalar()

    @staticmethod
    async def unpinn_message(chat_id: int, message_id: int) -> bool:
        """Remove the pin of a message in a chat.

        Returns ``True`` unconditionally, whether or not a row was deleted.
        """
        query = delete(PinnedMessages).where(
            PinnedMessages.chat_id == chat_id,
            PinnedMessages.message_id == message_id,
        )
        async with async_session_maker() as session:
            await session.execute(query)
            await session.commit()
        return True

    @staticmethod
    async def get_pinned_messages(chat_id: int) -> list[dict]:
        """Return the visible pinned messages of a chat, newest first.

        Returns:
            A list of plain dicts with the keys ``id``, ``message``,
            ``image_url``, ``chat_id``, ``user_id``, ``created_at``,
            ``avatar_image`` and ``username``; empty when nothing is pinned.
        """
        # NOTE(review): Users is joined on the pinner (PinnedMessages.user_id)
        # while user_id is taken from Messages; the other message queries in
        # this class join Users on Messages.user_id. Confirm which user's
        # avatar/username is intended here.
        query = (
            select(
                Messages.id,
                Messages.message,
                Messages.image_url,
                Messages.chat_id,
                Messages.user_id,
                Messages.created_at,
                Users.avatar_image,
                Users.username,
            )
            .select_from(PinnedMessages)
            .join(Messages, PinnedMessages.message_id == Messages.id, isouter=True)
            .join(Users, PinnedMessages.user_id == Users.id, isouter=True)
            .where(
                and_(
                    PinnedMessages.chat_id == chat_id,
                    Messages.visibility == True,  # noqa: E712 - SQL expression
                )
            )
            .order_by(Messages.created_at.desc())
        )
        async with async_session_maker() as session:
            result = await session.execute(query)
            return [dict(row) for row in result.mappings().all()]