image_processor/image_service.py
2025-03-07 16:53:32 +03:00

44 lines
1.2 KiB
Python

import io
from urllib.parse import urljoin
import aioboto3
from PIL import Image
import pillow_avif
from pydantic import HttpUrl
from schemas import SImageWithFormat
SUPPORTED_FORMATS = {"avif", "png", "webp"}
def process_image(img_bytes: bytes, formats: set[str]) -> list[SImageWithFormat]:
images = []
image = Image.open(io.BytesIO(img_bytes))
for image_format in formats:
buffer = io.BytesIO()
image.save(buffer, format=image_format)
images.append(SImageWithFormat.model_validate({"image": buffer.getvalue(), "format": image_format}))
return images
async def upload_images(imgs: list[SImageWithFormat], name: str, upload_url: str, bucket_name: str) -> list[HttpUrl]:
urls = []
s3_session = aioboto3.Session()
async with s3_session.client(
"s3",
endpoint_url=upload_url,
) as s3_client:
for img in imgs:
object_key = f"{name}.{img.format}"
# await s3_client.put_object(
# Bucket=bucket_name,
# Key=object_key,
# Body=img.image,
# ContentType=f"image/{img.format}"
# )
urls.append(HttpUrl(urljoin(upload_url, f"{bucket_name}/{object_key}")))
return urls