40 lines
1.2 KiB
Python
40 lines
1.2 KiB
Python
import logging
|
|
from datetime import timedelta
|
|
|
|
from pydantic import BaseModel
|
|
from redis.asyncio.client import Redis
|
|
from redis.exceptions import RedisError
|
|
|
|
from app.config import settings
|
|
from app.exceptions import BlackPhoenixException
|
|
|
|
|
|
def get_redis_session() -> Redis:
|
|
return Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB)
|
|
|
|
|
|
class RedisService:
|
|
def __init__(self, is_raise=None):
|
|
self.redis_session_factory = get_redis_session
|
|
self.is_raise = is_raise
|
|
|
|
async def __aenter__(self):
|
|
self.redis_session = self.redis_session_factory()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc, tb):
|
|
if isinstance(exc, RedisError):
|
|
if self.is_raise:
|
|
raise BlackPhoenixException
|
|
return True
|
|
|
|
async def set_key(self, key: str, expire_time: timedelta, value: str) -> None:
|
|
await self.redis_session.setex(key, expire_time, value)
|
|
|
|
async def get_value[T: type[BaseModel]](self, key: str, model: T | None = None) -> T | str:
|
|
value = await self.redis_session.get(key)
|
|
logging.warning(value)
|
|
return model.model_validate_json(value) if model else value.decode()
|
|
|
|
async def delete_key(self, key: str) -> None:
|
|
await self.redis_session.delete(key)
|