from datetime import timedelta from pydantic import BaseModel from redis.asyncio.client import Redis from redis.exceptions import RedisError from app.config import settings from app.exceptions import BlackPhoenixException def get_redis_session() -> Redis: return Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB) class RedisService: def __init__(self, is_raise=None): self.redis_session_factory = get_redis_session self.is_raise = is_raise async def __aenter__(self): self.redis_session = self.redis_session_factory() return self async def __aexit__(self, exc_type, exc, tb): if isinstance(exc, RedisError): if self.is_raise: raise BlackPhoenixException return True async def set_key(self, key: str, expire_time: timedelta, value: str) -> None: await self.redis_session.setex(key, expire_time, value) async def get_value[T: type[BaseModel]](self, key: str, model: T | None = None) -> T | str: value = await self.redis_session.get(key) return model.model_validate_json(value) if model else value.decode() async def delete_key(self, key: str) -> None: await self.redis_session.delete(key)