126 lines
3.8 KiB
Python
126 lines
3.8 KiB
Python
from pydantic import HttpUrl
|
|
from sqlalchemy import update, select, insert, func
|
|
from sqlalchemy.exc import MultipleResultsFound, IntegrityError
|
|
|
|
from app.dao.base import BaseDAO
|
|
from app.database import engine # noqa
|
|
from app.exceptions import IncorrectDataException
|
|
from app.users.exceptions import UserAlreadyExistsException
|
|
from app.models.chat import Chat
|
|
from app.models.user_avatar import UserAvatar
|
|
from app.models.users import Users
|
|
from app.models.user_chat import UserChat
|
|
from app.users.schemas import SUser, SUserAvatars, SUsers
|
|
|
|
|
|
class UserDAO(BaseDAO):
|
|
model = Users
|
|
|
|
async def add(self, **data) -> int:
|
|
try:
|
|
stmt = insert(self.model).values(**data).returning(Users.id)
|
|
result = await self.session.execute(stmt)
|
|
return result.scalar()
|
|
except IntegrityError:
|
|
raise UserAlreadyExistsException
|
|
|
|
async def find_one_or_none(self, **filter_by) -> SUser | None:
|
|
try:
|
|
query = select(Users.__table__.columns).filter_by(**filter_by)
|
|
result = await self.session.execute(query)
|
|
result = result.mappings().one_or_none()
|
|
if result:
|
|
return SUser.model_validate(result)
|
|
except MultipleResultsFound:
|
|
raise IncorrectDataException
|
|
|
|
async def find_all(self) -> SUsers:
|
|
query = select(Users.__table__.columns).where(Users.role != 100)
|
|
result = await self.session.execute(query)
|
|
return SUsers.model_validate({"users": result.mappings().all()})
|
|
|
|
async def change_data(self, user_id: int, **data_to_change) -> str:
|
|
query = update(Users).where(Users.id == user_id).values(**data_to_change).returning(Users.username)
|
|
result = await self.session.execute(query)
|
|
return result.scalar()
|
|
|
|
async def get_user_role(self, user_id: int) -> int:
|
|
query = select(Users.role).where(Users.id == user_id)
|
|
result = await self.session.execute(query)
|
|
return result.scalar()
|
|
|
|
async def get_user_allowed_chats(self, user_id: int):
|
|
"""
|
|
WITH chats_with_descriptions AS (
|
|
SELECT *
|
|
FROM usersxchats
|
|
LEFT JOIN chats ON usersxchats.chat_id = chats.id
|
|
),
|
|
|
|
chats_with_avatars as (
|
|
SELECT *
|
|
FROM chats_with_descriptions
|
|
LEFT JOIN users ON chats_with_descriptions.user_id = users.id
|
|
)
|
|
|
|
SELECT chat_id, chat_for, chat_name, avatar_image
|
|
FROM chats_with_avatars
|
|
WHERE user_id = 1
|
|
"""
|
|
chats_with_descriptions = (
|
|
select(UserChat.__table__.columns, Chat.__table__.columns)
|
|
.select_from(UserChat)
|
|
.join(Chat, UserChat.chat_id == Chat.id)
|
|
).cte("chats_with_descriptions")
|
|
|
|
chats_with_avatars = (
|
|
select(
|
|
chats_with_descriptions.c.chat_id,
|
|
chats_with_descriptions.c.chat_for,
|
|
chats_with_descriptions.c.chat_name,
|
|
chats_with_descriptions.c.visibility,
|
|
Users.id,
|
|
Users.avatar_image,
|
|
)
|
|
.select_from(chats_with_descriptions)
|
|
.join(Users, Users.id == chats_with_descriptions.c.user_id)
|
|
.cte("chats_with_avatars")
|
|
)
|
|
query = (
|
|
select(
|
|
chats_with_avatars.c.chat_id,
|
|
chats_with_avatars.c.chat_for,
|
|
chats_with_avatars.c.chat_name,
|
|
chats_with_avatars.c.avatar_image,
|
|
)
|
|
.select_from(chats_with_avatars)
|
|
.where(chats_with_avatars.c.id == user_id, chats_with_avatars.c.visibility == True) # noqa: E712
|
|
)
|
|
|
|
result = await self.session.execute(query)
|
|
result = result.mappings().all()
|
|
return result
|
|
|
|
async def add_user_avatar(self, user_id: int, avatar: HttpUrl) -> bool:
|
|
query = insert(UserAvatar).values(user_id=user_id, avatar_image=avatar)
|
|
await self.session.execute(query)
|
|
await self.session.commit()
|
|
return True
|
|
|
|
async def get_user_avatars(self, user_id: int) -> SUserAvatars:
|
|
query = select(
|
|
func.json_build_object(
|
|
"user_avatars", select(
|
|
func.json_agg(
|
|
func.json_build_object(
|
|
"avatar_url", UserAvatar.avatar_image
|
|
)
|
|
)
|
|
)
|
|
.where(UserAvatar.user_id == user_id)
|
|
.scalar_subquery()
|
|
)
|
|
)
|
|
result = await self.session.execute(query)
|
|
result = result.scalar()
|
|
return SUserAvatars.model_validate(result)
|