link_shortener/app/service.py

48 lines
1.3 KiB
Python

import time
from urllib.parse import urljoin
import redis
from fastapi import HTTPException
from app.config import settings
from app.dao import URLsDAO
from app.shemas import SGetStats, SNewUrl
async def get_visit_times(list_key: str, r: redis.Redis) -> SGetStats:
    """Return visit statistics for the Redis list stored at ``list_key``.

    Args:
        list_key: Name of the Redis list holding the visit timestamps.
        r: Redis client whose commands are awaited.

    Returns:
        ``SGetStats`` whose ``timestamps`` are all elements of the list and
        whose ``visit_times`` is the number of those elements. A key that
        does not exist yields ``0`` and an empty list.
    """
    # LRANGE on a missing key already returns an empty list, so a preceding
    # EXISTS is unnecessary: one command saves a round trip and cannot
    # observe the key changing between the check and the read.
    timestamps = await r.lrange(list_key, 0, -1)
    return SGetStats(visit_times=len(timestamps), timestamps=timestamps)
async def add_visit(list_key: str, r: redis.Redis) -> None:
    """Record one visit by pushing the current Unix time onto a Redis list.

    Args:
        list_key: Name of the Redis list that collects visit timestamps.
        r: Redis client whose commands are awaited.
    """
    # The timestamp is truncated to whole seconds before being pushed.
    await r.lpush(list_key, int(time.time()))
async def get_original_url(url_hash: int, r: redis.Redis) -> str:
    """Resolve a short-link hash to its original URL, using Redis as a cache.

    The cache is consulted first; on a miss the URL is loaded through
    ``URLsDAO.find_by_hash`` and stored in Redis for 60 seconds.

    Args:
        url_hash: Hash identifying the shortened link; its string form is
            used as the cache key.
        r: Redis client whose commands are awaited.

    Returns:
        The original URL.

    Raises:
        HTTPException: With status 404 when ``URLsDAO.find_by_hash`` returns
            nothing for ``url_hash``.
    """
    cache_key = str(url_hash)

    # A single GET replaces the EXISTS + GET pair: the key has a short TTL
    # and could expire between the two commands, in which case GET returns
    # None and the subsequent decode would fail.
    cached = await r.get(cache_key)
    if cached is not None:
        # Decode only raw bytes; a client created with decode_responses=True
        # already returns str.
        return cached.decode("utf-8") if isinstance(cached, bytes) else cached

    record = await URLsDAO.find_by_hash(url_hash)
    if not record:
        raise HTTPException(status_code=404, detail="Этого адреса не существует")

    url = record.original_url
    # SET with ``ex`` stores the value and its 60 s TTL in one command, so a
    # failure can never leave a cached entry without an expiry.
    await r.set(cache_key, url, ex=60)
    return url
async def add_url(url: str) -> SNewUrl:
    """Store ``url`` and return the shortened link that points to it.

    Args:
        url: Original URL to shorten.

    Returns:
        ``SNewUrl`` whose ``new_url`` is ``settings.SHORTENER_HOST`` joined
        with the string form of the value returned by
        ``URLsDAO.add_new_url``.
    """
    stored_hash = await URLsDAO.add_new_url(original_url=url)
    return SNewUrl(new_url=urljoin(settings.SHORTENER_HOST, str(stored_hash)))