chat_back/app/dao/user.py
2024-06-12 17:35:16 +05:00

117 lines
3.5 KiB
Python

from pydantic import HttpUrl
from sqlalchemy import update, select, insert, func
from sqlalchemy.exc import MultipleResultsFound, IntegrityError, NoResultFound
from app.chat.shemas import SAllowedChats
from app.dao.base import BaseDAO
from app.database import engine # noqa
from app.exceptions import IncorrectDataException, UserNotFoundException
from app.users.exceptions import UserAlreadyExistsException
from app.models.chat import Chat
from app.models.user_avatar import UserAvatar
from app.models.users import Users
from app.models.user_chat import UserChat
from app.users.schemas import SUser, SUserAvatars, SUsers
class UserDAO(BaseDAO):
    """Data-access object for ``Users`` rows and related chat/avatar data.

    Every coroutine executes its statement through ``self.session``, which is
    not defined in this class (presumably supplied by ``BaseDAO`` — confirm).
    None of the methods commit, so the caller owns the transaction.
    """

    model = Users

    @staticmethod
    def check_query_compile(query):
        """Print *query* compiled against ``engine`` with bound values inlined.

        Debugging aid for inspecting the SQL a statement produces.
        """
        # SQL query check: literal_binds renders parameter values in place.
        print(query.compile(engine, compile_kwargs={"literal_binds": True}))

    async def add(self, **data) -> SUser:
        """Insert a new user and return the stored row as an ``SUser``.

        Args:
            **data: Column values for the new ``Users`` row.

        Raises:
            UserAlreadyExistsException: If the insert raises
                ``IntegrityError``; the original error is kept as the cause.
        """
        stmt = insert(Users).values(**data).returning(Users.__table__.columns)
        try:
            result = await self.session.execute(stmt)
            row = result.mappings().one()
        except IntegrityError as err:
            # Chain the cause so the database error stays in the traceback.
            raise UserAlreadyExistsException from err
        return SUser.model_validate(row)

    async def find_one(self, **filter_by) -> SUser:
        """Return the single user matching the given column filters.

        Args:
            **filter_by: ``column=value`` pairs passed to ``filter_by``.

        Raises:
            IncorrectDataException: If more than one row matches.
            UserNotFoundException: If no row matches.
        """
        query = select(Users.__table__.columns).filter_by(**filter_by)
        result = await self.session.execute(query)
        try:
            # Only .one() raises the cardinality errors handled below.
            row = result.mappings().one()
        except MultipleResultsFound as err:
            raise IncorrectDataException from err
        except NoResultFound as err:
            raise UserNotFoundException from err
        return SUser.model_validate(row)

    async def find_all(self) -> SUsers:
        """Return all users whose ``role`` is not 100 as an ``SUsers`` object.

        The aggregation is done in the database: a single JSON object of the
        form ``{"users": [...]}`` is selected and validated.

        NOTE(review): ``json_agg`` yields NULL when no rows match, so "users"
        may be null — confirm that ``SUsers`` accepts that.
        NOTE(review): role 100 is presumably a privileged/service role —
        confirm against the ``Users`` model.
        """
        user_json = func.json_build_object(
            "id", Users.id,
            "username", Users.username,
            "email", Users.email,
            "black_phoenix", Users.black_phoenix,
            "avatar_image", Users.avatar_image,
            "date_of_birth", Users.date_of_birth,
            "date_of_registration", Users.date_of_registration,
        )
        query = (
            select(func.json_build_object("users", func.json_agg(user_json)))
            .select_from(Users)
            .where(Users.role != 100)
        )
        result = await self.session.execute(query)
        return SUsers.model_validate(result.scalar_one())

    async def change_data(self, user_id: int, **data_to_change) -> None:
        """Update columns of the user with id *user_id*.

        Args:
            user_id: Primary key of the user to update.
            **data_to_change: ``column=value`` pairs to write. When empty the
                call is a no-op and no statement is sent.
        """
        if not data_to_change:
            # Nothing to set: an UPDATE without values is not meaningful.
            return
        stmt = update(Users).where(Users.id == user_id).values(**data_to_change)
        await self.session.execute(stmt)

    async def get_user_allowed_chats(self, user_id: int) -> SAllowedChats:
        """Return the visible chats that *user_id* is a member of.

        Joins ``UserChat`` to ``Chat`` and ``Users`` and selects a single JSON
        object of the form ``{"allowed_chats": [...]}``.

        NOTE(review): "avatar_image" is taken from ``Users``, i.e. from the
        row of the requesting user, not from the chat — confirm this is
        intended.
        NOTE(review): ``json_agg`` yields NULL when no rows match, so
        "allowed_chats" may be null — confirm ``SAllowedChats`` accepts that.
        """
        chat_json = func.json_build_object(
            "chat_id", Chat.id,
            "chat_for", Chat.chat_for,
            "chat_name", Chat.chat_name,
            "avatar_image", Users.avatar_image,
        )
        query = (
            select(
                func.json_build_object("allowed_chats", func.json_agg(chat_json))
            )
            .select_from(UserChat)
            .join(Chat, Chat.id == UserChat.chat_id)
            .join(Users, Users.id == UserChat.user_id)
            .where(UserChat.user_id == user_id, Chat.visibility == True)  # noqa: E712
        )
        result = await self.session.execute(query)
        return SAllowedChats.model_validate(result.scalar_one())

    async def add_user_avatar(self, user_id: int, avatar: HttpUrl) -> None:
        """Insert a ``UserAvatar`` row linking *avatar* to *user_id*."""
        stmt = insert(UserAvatar).values(user_id=user_id, avatar_image=avatar)
        await self.session.execute(stmt)

    async def get_user_avatars(self, user_id: int) -> SUserAvatars:
        """Return every avatar stored for *user_id* as an ``SUserAvatars``.

        The avatar list is built by a scalar subquery, so the outer SELECT
        always yields one ``{"user_avatars": ...}`` object.

        NOTE(review): the value is null when the user has no avatars
        (``json_agg`` over zero rows) — confirm ``SUserAvatars`` accepts that.
        """
        avatars_subquery = (
            select(
                func.json_agg(
                    func.json_build_object("avatar_url", UserAvatar.avatar_image)
                )
            )
            .where(UserAvatar.user_id == user_id)
            .scalar_subquery()
        )
        query = select(func.json_build_object("user_avatars", avatars_subquery))
        result = await self.session.execute(query)
        return SUserAvatars.model_validate(result.scalar())