import asyncio import io import uuid from pathlib import Path from fastapi import HTTPException, UploadFile from PIL import Image from app.core.config import settings UPLOAD_DIR: Path = settings.upload_path ALLOWED_IMAGE_TYPES = { "image/jpeg", "image/jpg", "image/png", "image/webp", "image/svg+xml", "image/heic", "image/heif", } ALLOWED_AUDIO_PREFIXES = {"audio/webm", "audio/mp4", "audio/ogg", "audio/x-m4a", "audio/aac"} ALLOWED_VIDEO_TYPES = {"video/mp4", "video/webm", "video/quicktime", "video/x-m4v", "video/3gpp"} MAX_ORIG_SIZE = (500, 500) THUMBNAIL_SIZES = { "product": (150, 150), "note": (300, 300), "attachment": (400, 300), } async def save_image(file: UploadFile, context: str = "note") -> dict: content_type = (file.content_type or "").lower() if content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=400, detail=f"Format non supporté : {content_type}") file_id = str(uuid.uuid4()) content = await file.read() orig_dir = UPLOAD_DIR / "images" / "originals" orig_dir.mkdir(parents=True, exist_ok=True) if content_type == "image/svg+xml": orig_path = orig_dir / f"{file_id}.svg" orig_path.write_bytes(content) return { "file_id": file_id, "file_path": str(orig_path.relative_to(UPLOAD_DIR)), "thumbnail_path": None, "file_type": "image", } orig_path = orig_dir / f"{file_id}.webp" img = Image.open(io.BytesIO(content)).convert("RGB") img.thumbnail(MAX_ORIG_SIZE, Image.LANCZOS) img.save(orig_path, "WEBP", quality=85) thumb_dir = UPLOAD_DIR / "images" / "thumbnails" thumb_dir.mkdir(parents=True, exist_ok=True) thumb_path = thumb_dir / f"{file_id}_thumb.webp" thumb_size = THUMBNAIL_SIZES.get(context, (300, 300)) img_thumb = img.copy() img_thumb.thumbnail(thumb_size, Image.LANCZOS) img_thumb.save(thumb_path, "WEBP", quality=80) return { "file_id": file_id, "file_path": str(orig_path.relative_to(UPLOAD_DIR)), "thumbnail_path": str(thumb_path.relative_to(UPLOAD_DIR)), "file_type": "image", } async def save_audio(file: UploadFile) -> dict: ct = (file.content_type or "").lower().split(";")[0].strip() if ct not in ALLOWED_AUDIO_PREFIXES: raise HTTPException(status_code=400, detail=f"Format audio non supporté : {file.content_type}") file_id = str(uuid.uuid4()) audio_dir = UPLOAD_DIR / "audio" audio_dir.mkdir(parents=True, exist_ok=True) raw_ext = ".ogg" if "ogg" in ct else (".webm" if "webm" in ct else ".m4a") raw_path = audio_dir / f"{file_id}_raw{raw_ext}" raw_path.write_bytes(await file.read()) # Transcode vers AAC/mp4 pour lecture universelle (Safari iOS, Chrome, Firefox) out_path = audio_dir / f"{file_id}.m4a" proc = await asyncio.create_subprocess_exec( "ffmpeg", "-i", str(raw_path), "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", "-y", str(out_path), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await proc.communicate() if out_path.exists(): raw_path.unlink(missing_ok=True) final_path = out_path else: # ffmpeg indisponible ou échec — conserver le fichier brut final_path = audio_dir / f"{file_id}{raw_ext}" raw_path.rename(final_path) return { "file_id": file_id, "file_path": str(final_path.relative_to(UPLOAD_DIR)), "thumbnail_path": None, "file_type": "audio", } async def save_video(file: UploadFile) -> dict: ct = (file.content_type or "").lower().split(";")[0].strip() if ct not in ALLOWED_VIDEO_TYPES: raise HTTPException(status_code=400, detail=f"Format vidéo non supporté : {file.content_type}") file_id = str(uuid.uuid4()) video_dir = UPLOAD_DIR / "videos" video_dir.mkdir(parents=True, exist_ok=True) content = await file.read() if "webm" in ct: # Transcode webm → H.264/mp4 pour Safari iOS raw_path = video_dir / f"{file_id}_raw.webm" raw_path.write_bytes(content) out_path = video_dir / f"{file_id}.mp4" proc = await asyncio.create_subprocess_exec( "ffmpeg", "-i", str(raw_path), "-c:v", "libx264", "-crf", "28", "-preset", "fast", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", "-y", str(out_path), stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await proc.communicate() if out_path.exists(): raw_path.unlink(missing_ok=True) final_path = out_path else: final_path = video_dir / f"{file_id}.webm" raw_path.rename(final_path) else: # mp4/quicktime : déjà compatible, stockage direct final_path = video_dir / f"{file_id}.mp4" final_path.write_bytes(content) return { "file_id": file_id, "file_path": str(final_path.relative_to(UPLOAD_DIR)), "thumbnail_path": None, "file_type": "video", } def delete_media(file_id: str, file_path: str, thumbnail_path: str | None = None) -> None: (UPLOAD_DIR / file_path).unlink(missing_ok=True) if thumbnail_path: (UPLOAD_DIR / thumbnail_path).unlink(missing_ok=True)