123 lines
4.6 KiB
Python
123 lines
4.6 KiB
Python
from datetime import datetime, UTC, timedelta
|
|
|
|
from app.chat.shemas import SMessage, SSendMessage, SPinnedMessages, SMessageList, SMessageRaw, SMessageRawList
|
|
from app.services.user_service import UserService
|
|
from app.users.schemas import SUser
|
|
from app.utils.unit_of_work import UnitOfWork
|
|
|
|
|
|
class MessageService:
    """Service-layer operations on chat messages.

    Every method receives the ``UnitOfWork`` it should run against. Raw
    messages read through it are enriched with the author's ``username`` and
    ``avatar_image`` before being returned as ``SMessage`` objects.
    """

    # Class-level user cache shared by all callers:
    # user_id -> (user, moment after which the entry counts as stale).
    users: dict[int, tuple[SUser, datetime]] = {}

    @classmethod
    async def _get_cached_user(cls, uow: UnitOfWork, user_id: int) -> SUser:
        """Return the user with ``user_id``, reusing a non-stale cache entry.

        On a miss, or when the cached entry has expired, the user is loaded
        through ``UserService.find_user`` and cached for five minutes.
        """
        cached = cls.users.get(user_id)
        if cached is not None:
            cached_user, expires_at = cached
            if expires_at > datetime.now(UTC):
                return cached_user

        fresh_user = await UserService.find_user(uow=uow, id=user_id)
        cls.users[user_id] = (fresh_user, datetime.now(UTC) + timedelta(minutes=5))
        return fresh_user

    @classmethod
    async def add_avatar_image_and_username_to_message(cls, uow: UnitOfWork, message: SMessageRaw) -> SMessage:
        """Build an ``SMessage`` from ``message`` plus its author's details.

        The author is looked up by ``message.user_id``; the avatar image is
        stored as its ``str()`` form and the username is copied as-is.
        """
        author = await cls._get_cached_user(uow=uow, user_id=message.user_id)
        payload = {
            **message.model_dump(),
            "avatar_image": str(author.avatar_image),
            "username": author.username,
        }
        return SMessage.model_validate(payload)

    @classmethod
    async def add_avatar_image_and_username_to_message_list(
        cls, uow: UnitOfWork, messages: SMessageRawList
    ) -> SMessageList:
        """Enrich every message of ``messages`` and wrap them in an ``SMessageList``.

        When ``messages.message_raw_list`` is empty or ``None`` the resulting
        list model is validated with ``messages`` set to ``None``.
        """
        raw_items = messages.message_raw_list
        if not raw_items:
            return SMessageList.model_validate({"messages": None})

        # Enrich one message at a time, in the original order.
        enriched = []
        for raw_item in raw_items:
            enriched.append(await cls.add_avatar_image_and_username_to_message(uow=uow, message=raw_item))
        return SMessageList.model_validate({"messages": enriched})

    @classmethod
    async def send_message(
        cls, uow: UnitOfWork, user_id: int, chat_id: int, message: SSendMessage, image_url: str | None = None
    ) -> SMessage:
        """Store a new message and return it enriched with author details.

        When ``message.answer`` is truthy, the referenced message is loaded
        and its id, text and ``answer_image_url`` are stored alongside the
        new message as reply information.
        """
        async with uow:
            reply_fields = {}
            if message.answer:
                replied_to = await uow.message.get_message_by_id(message_id=message.answer)
                reply_fields = {
                    "answer_id": replied_to.id,
                    "answer_message": replied_to.message,
                    # NOTE(review): this forwards the replied-to message's own
                    # ``answer_image_url``; confirm its ``image_url`` was not intended.
                    "answer_image_url": replied_to.answer_image_url,
                }

            message_id = await uow.message.send_message(
                user_id=user_id,
                chat_id=chat_id,
                message=message.message,
                image_url=image_url,
                **reply_fields,
            )

            # Re-read the stored message so the result reflects what was saved.
            stored = await uow.message.get_message_by_id(message_id=message_id)
            return await cls.add_avatar_image_and_username_to_message(uow=uow, message=stored)

    @staticmethod
    async def delete_message(uow: UnitOfWork, message_id: int) -> None:
        """Delete the message with ``message_id`` through the message repository."""
        async with uow:
            await uow.message.delete_message(message_id=message_id)

    @staticmethod
    async def edit_message(uow: UnitOfWork, message_id: int, new_message: str, new_image_url: str) -> None:
        """Replace the text and image URL of the message with ``message_id``."""
        async with uow:
            await uow.message.edit_message(
                message_id=message_id,
                new_message=new_message,
                new_image_url=new_image_url,
            )

    @classmethod
    async def pin_message(cls, uow: UnitOfWork, chat_id: int, user_id: int, message_id: int) -> SMessage:
        """Pin ``message_id`` in ``chat_id``, commit, and return the pinned message.

        The returned message is re-read after the commit and enriched with
        its author's details.
        """
        async with uow:
            await uow.chat.pin_message(chat_id=chat_id, message_id=message_id, user_id=user_id)
            await uow.commit()

            pinned_raw = await uow.message.get_message_by_id(message_id=message_id)
            return await cls.add_avatar_image_and_username_to_message(uow=uow, message=pinned_raw)

    @staticmethod
    async def unpin_message(uow: UnitOfWork, chat_id: int, message_id: int) -> None:
        """Unpin ``message_id`` in ``chat_id`` and commit the change."""
        async with uow:
            await uow.chat.unpin_message(chat_id=chat_id, message_id=message_id)
            await uow.commit()

    @classmethod
    async def get_message_by_id(cls, uow: UnitOfWork, message_id: int) -> SMessage:
        """Load the message with ``message_id`` and return it with author details."""
        async with uow:
            found = await uow.message.get_message_by_id(message_id=message_id)
            return await cls.add_avatar_image_and_username_to_message(uow=uow, message=found)

    @classmethod
    async def get_pinned_messages(cls, uow: UnitOfWork, chat_id: int) -> SPinnedMessages:
        """Return the pinned messages of ``chat_id``, enriched with author details.

        The pinned ids come from the chat repository; the messages themselves
        are then fetched by those ids from the message repository.
        """
        async with uow:
            pinned_ids = await uow.chat.get_pinned_messages_ids(chat_id=chat_id)
            pinned_raw = await uow.message.get_messages_from_ids(messages_ids=pinned_ids)
            enriched = await cls.add_avatar_image_and_username_to_message_list(uow=uow, messages=pinned_raw)
            return SPinnedMessages.model_validate({"pinned_messages": enriched.messages})

    @classmethod
    async def get_some_messages(
        cls, uow: UnitOfWork, chat_id: int, message_number_from: int, messages_to_get: int
    ) -> SMessageList:
        """Fetch a slice of messages from ``chat_id`` and enrich them.

        ``message_number_from`` and ``messages_to_get`` are passed unchanged
        to the message repository, which decides how the slice is selected.
        """
        async with uow:
            raw_page = await uow.message.get_some_messages(
                chat_id=chat_id,
                message_number_from=message_number_from,
                messages_to_get=messages_to_get,
            )
            return await cls.add_avatar_image_and_username_to_message_list(uow=uow, messages=raw_page)
|