chat_back/app/dao/user.py

128 lines
3.9 KiB
Python

from pydantic import EmailStr
from sqlalchemy import update, select, insert, func, or_
from sqlalchemy.exc import MultipleResultsFound, IntegrityError, NoResultFound
from app.chat.shemas import SAllowedChats
from app.dao.base import BaseDAO
from app.exceptions import IncorrectDataException
from app.users.exceptions import UserAlreadyExistsException, UserNotFoundException
from app.models.chat import Chat
from app.models.user_avatar import UserAvatar
from app.models.users import Users
from app.models.user_chat import UserChat
from app.users.schemas import SUser, SUserAvatars, SUsers
class UserDAO(BaseDAO):
    """Data-access object for ``Users`` rows and their related chats/avatars.

    Every method executes its statement through ``self.session`` (expected to
    be supplied by ``BaseDAO`` - confirm there) and returns pydantic schemas
    instead of ORM objects. None of the methods below commits; transaction
    control is left to the caller.
    """

    model = Users

    async def _fetch_single_user(self, query) -> SUser:
        """Execute *query* and validate its single row as an ``SUser``.

        Raises:
            IncorrectDataException: the query matched more than one row.
            UserNotFoundException: the query matched no rows.
        """
        result = await self.session.execute(query)
        # Only ``.one()`` can raise the two cardinality errors, so it is the
        # sole statement guarded by the ``try``.
        try:
            row = result.mappings().one()
        except MultipleResultsFound as err:
            raise IncorrectDataException from err
        except NoResultFound as err:
            raise UserNotFoundException from err
        return SUser.model_validate(row)

    async def add(self, **data) -> SUser:
        """Insert a user built from *data* and return the stored row.

        Args:
            **data: Column values forwarded to the ``INSERT``.

        Returns:
            The inserted row (via ``RETURNING``) validated as ``SUser``.

        Raises:
            UserAlreadyExistsException: the insert raised an
                ``IntegrityError``.
        """
        stmt = insert(Users).values(**data).returning(Users.__table__.columns)
        try:
            result = await self.session.execute(stmt)
        except IntegrityError as err:
            # Keep the database error attached as the cause for debugging.
            raise UserAlreadyExistsException from err
        return SUser.model_validate(result.mappings().one())

    async def find_one(self, **filter_by) -> SUser:
        """Return the single user whose columns equal the given keyword values.

        Raises:
            IncorrectDataException: more than one user matched.
            UserNotFoundException: no user matched.
        """
        query = select(Users.__table__.columns).filter_by(**filter_by)
        return await self._fetch_single_user(query)

    async def find_one_by_username_or_email(self, username: str, email: EmailStr) -> SUser:
        """Return the single user matching *username* OR *email*.

        Raises:
            IncorrectDataException: more than one row matched (for example the
                username belongs to one user and the email to another).
            UserNotFoundException: no user matched either value.
        """
        query = select(Users.__table__.columns).where(
            or_(Users.username == username, Users.email == email)
        )
        return await self._fetch_single_user(query)

    async def find_all(self, username: str) -> SUsers:
        """Return users whose username matches *username* via ``ILIKE``.

        *username* is passed to ``ILIKE`` unchanged, so any ``%``/``_``
        wildcards must already be present in the argument. Rows whose ``role``
        equals 100 are excluded.

        Returns:
            ``SUsers`` validated from a ``{"users": [...]}`` JSON object that
            the database assembles.
        """
        user_json = func.json_build_object(
            "id", Users.id,
            "username", Users.username,
            "email", Users.email,
            "black_phoenix", Users.black_phoenix,
            "avatar_image", Users.avatar_image,
            "date_of_birth", Users.date_of_birth,
            "date_of_registration", Users.date_of_registration,
        )
        # NOTE(review): an aggregate over zero rows is SQL NULL, so "users" is
        # presumably null when nothing matches - confirm SUsers accepts that.
        payload = func.json_build_object("users", func.json_agg(user_json))
        query = (
            select(payload)
            .select_from(Users)
            # NOTE(review): role 100 looks like a special role hidden from
            # search results - confirm its meaning against the Users model.
            .filter(Users.role != 100, Users.username.ilike(username))
        )
        result = await self.session.execute(query)
        return SUsers.model_validate(result.scalar_one())

    async def change_data(self, user_id: int, **data_to_change) -> None:
        """Update the columns in *data_to_change* on the user with *user_id*.

        Raises:
            UserAlreadyExistsException: the update raised an
                ``IntegrityError``.
        """
        stmt = update(Users).where(Users.id == user_id).values(**data_to_change)
        try:
            await self.session.execute(stmt)
        except IntegrityError as err:
            raise UserAlreadyExistsException from err

    async def get_user_allowed_chats(self, user_id: int) -> SAllowedChats:
        """Return the visible chats that *user_id* is linked to via ``UserChat``.

        Returns:
            ``SAllowedChats`` validated from an ``{"allowed_chats": [...]}``
            JSON object that the database assembles.
        """
        chat_json = func.json_build_object(
            "chat_id", Chat.id,
            "chat_for", Chat.chat_for,
            "chat_name", Chat.chat_name,
            "created_by", Chat.created_by,
            "avatar_image", Chat.avatar_image,
        )
        # NOTE(review): as in find_all, "allowed_chats" is presumably null when
        # the user has no visible chats - confirm SAllowedChats accepts that.
        payload = func.json_build_object("allowed_chats", func.json_agg(chat_json))
        query = (
            select(payload)
            .select_from(UserChat)
            .join(Chat, Chat.id == UserChat.chat_id)
            .where(UserChat.user_id == user_id, Chat.visibility == True)  # noqa: E712
        )
        result = await self.session.execute(query)
        return SAllowedChats.model_validate(result.scalar_one())

    async def add_user_avatar(self, user_id: int, avatar: str) -> None:
        """Insert a ``UserAvatar`` row storing *avatar* for *user_id*."""
        stmt = insert(UserAvatar).values(user_id=user_id, avatar_image=avatar)
        await self.session.execute(stmt)

    async def get_user_avatars(self, user_id: int) -> SUserAvatars:
        """Return every avatar stored for *user_id*.

        Returns:
            ``SUserAvatars`` validated from a ``{"user_avatars": [...]}`` JSON
            object; each element has the form ``{"avatar_url": ...}``.
        """
        # The aggregation lives in a scalar subquery, so the outer SELECT has
        # no FROM clause of its own.
        avatars_subquery = (
            select(
                func.json_agg(
                    func.json_build_object("avatar_url", UserAvatar.avatar_image)
                )
            )
            .where(UserAvatar.user_id == user_id)
            .scalar_subquery()
        )
        query = select(func.json_build_object("user_avatars", avatars_subquery))
        result = await self.session.execute(query)
        return SUserAvatars.model_validate(result.scalar())