import asyncio import os from datetime import datetime from pathlib import Path from urllib.parse import urlparse from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.database import get_session router = APIRouter() def _dir_stats(path: Path) -> dict: files = [f for f in path.rglob("*") if f.is_file() and not f.name.startswith(".")] return {"count": len(files), "size_bytes": sum(f.stat().st_size for f in files)} @router.get("/stats") async def get_stats(session: AsyncSession = Depends(get_session)): # Taille de la BDD db_size = (await session.execute(text("SELECT pg_database_size(current_database())"))).scalar() # Compteurs notes_count = (await session.execute(text("SELECT COUNT(*) FROM notes.items"))).scalar() todos_count = (await session.execute(text("SELECT COUNT(*) FROM todos.items"))).scalar() lists_count = (await session.execute(text("SELECT COUNT(*) FROM shopping.lists"))).scalar() # Médias sur le disque uploads = settings.upload_path photos = _dir_stats(uploads / "images" / "originals") if (uploads / "images" / "originals").exists() else {"count": 0, "size_bytes": 0} audio = _dir_stats(uploads / "audio") if (uploads / "audio").exists() else {"count": 0, "size_bytes": 0} return { "db_size_bytes": db_size, "media": { "photos": photos, "audio": audio, }, "counts": { "notes": notes_count, "todos": todos_count, "shopping_lists": lists_count, }, } def _pg_env() -> dict: url = urlparse(settings.database_url.replace("+asyncpg", "")) env = os.environ.copy() env["PGPASSWORD"] = url.password or "" return env, url @router.post("/backup") async def create_backup(): backup_dir = settings.backup_path backup_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y-%m-%d_%H%M") filename = f"homehub_{timestamp}.dump" filepath = backup_dir / filename env, url = _pg_env() proc = await asyncio.create_subprocess_exec( "pg_dump", "-Fc", "-h", url.hostname or "db", "-p", str(url.port or 5432), "-U", url.username or "homehub", "-d", (url.path or "/homehub").lstrip("/"), "-f", str(filepath), env=env, stderr=asyncio.subprocess.PIPE, ) _, stderr = await proc.communicate() if proc.returncode != 0: raise HTTPException(500, f"Échec du backup : {stderr.decode()}") stat = filepath.stat() return { "filename": filename, "size": stat.st_size, "created_at": datetime.now().isoformat(), } @router.get("/backups") async def list_backups(): backup_dir = settings.backup_path if not backup_dir.exists(): return [] files = sorted(backup_dir.glob("*.dump"), key=lambda f: f.stat().st_mtime, reverse=True) return [ { "filename": f.name, "size": f.stat().st_size, "created_at": datetime.fromtimestamp(f.stat().st_mtime).isoformat(), } for f in files ] @router.post("/restore/{filename}") async def restore_backup(filename: str): # Sécurité : interdit les chemins relatifs if "/" in filename or ".." in filename or not filename.endswith(".dump"): raise HTTPException(400, "Nom de fichier invalide") filepath = settings.backup_path / filename if not filepath.exists(): raise HTTPException(404, "Fichier introuvable") env, url = _pg_env() proc = await asyncio.create_subprocess_exec( "pg_restore", "--clean", "--if-exists", "--no-owner", "--no-privileges", "-h", url.hostname or "db", "-p", str(url.port or 5432), "-U", url.username or "homehub", "-d", (url.path or "/homehub").lstrip("/"), str(filepath), env=env, stderr=asyncio.subprocess.PIPE, ) _, stderr = await proc.communicate() # pg_restore retourne 1 pour des warnings non-fatals (DROP IF EXISTS sur objets manquants) if proc.returncode not in (0, 1): raise HTTPException(500, f"Échec de la restauration : {stderr.decode()}") return {"message": "Restauration réussie"}