From d269ca8bc9ed977de8cafbbe27171de9a4700750 Mon Sep 17 00:00:00 2001 From: Gilles Soulier Date: Thu, 28 May 2026 20:07:20 +0200 Subject: [PATCH] =?UTF-8?q?docs:=20plan=20d'impl=C3=A9mentation=20complet?= =?UTF-8?q?=20(10=20t=C3=A2ches)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Script Python stdlib, FastAPI + SQLite + Alembic, Docker Compose nginx, routes dashboard + agents IA, tests pytest par tâche. Co-Authored-By: Claude Sonnet 4.6 --- .../plans/2026-05-28-inventaire-hdd.md | 2040 +++++++++++++++++ 1 file changed, 2040 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-28-inventaire-hdd.md diff --git a/docs/superpowers/plans/2026-05-28-inventaire-hdd.md b/docs/superpowers/plans/2026-05-28-inventaire-hdd.md new file mode 100644 index 0000000..2ebe785 --- /dev/null +++ b/docs/superpowers/plans/2026-05-28-inventaire-hdd.md @@ -0,0 +1,2040 @@ +# Inventaire HDD — Plan d'implémentation + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Construire le système complet d'inventaire disques : script client Python stdlib only, backend FastAPI + SQLite avec Alembic, déployé en Docker Compose avec nginx. + +**Architecture:** Script `inventaire.py` one-shot (root, stdlib only) POSTe un payload JSON vers `POST /api/ingest`. Backend FastAPI stocke dans SQLite via SQLAlchemy 2. nginx sur port 8088 proxifie `/api/*` vers FastAPI et sert le frontend statique. Alembic applique les migrations au démarrage. + +**Tech Stack:** Python 3.11, FastAPI 0.111, SQLAlchemy 2.0, Alembic 1.13, Pydantic v2, pytest + httpx, SQLite, Docker + nginx:alpine + +--- + +## Structure des fichiers + +``` +mes_hdd/ +├── inventaire.py # Script client stdlib only +├── api/ +│ ├── Dockerfile +│ ├── requirements.txt +│ ├── main.py # App FastAPI + startup +│ ├── database.py # Engine SQLAlchemy + get_db +│ ├── models.py # ORM : Machine, Disk, Snapshot +│ ├── schemas.py # Pydantic : IngestPayload + réponses +│ ├── routes/ +│ │ ├── __init__.py +│ │ ├── ingest.py # POST /api/ingest +│ │ ├── disks.py # GET /api/disks/* +│ │ ├── machines.py # GET /api/machines/* +│ │ └── ai.py # GET /api/ai/* +│ ├── services/ +│ │ ├── __init__.py +│ │ └── ingest_service.py # Logique upsert + détection mouvement +│ └── migrations/ +│ ├── env.py +│ ├── script.py.mako +│ └── versions/ +│ └── 0001_initial.py +├── frontend/ +│ └── index.html # Placeholder étape 2 +├── tests/ +│ ├── conftest.py # Fixtures pytest (DB in-memory, client) +│ ├── test_ingest.py +│ ├── test_disks_api.py +│ ├── test_machines_api.py +│ ├── test_ai_api.py +│ └── test_inventaire.py # Tests script client (mock subprocess) +├── alembic.ini +├── docker-compose.yml +└── nginx.conf +``` + +--- + +## Task 1 : Infrastructure Docker + nginx + +**Files:** +- Create: `docker-compose.yml` +- Create: `nginx.conf` +- Create: `api/Dockerfile` +- Create: `api/requirements.txt` +- Create: `frontend/index.html` + +- [ ] **Step 1: Créer `api/requirements.txt`** + +``` +fastapi==0.111.0 +uvicorn[standard]==0.29.0 +sqlalchemy==2.0.30 +alembic==1.13.1 +pydantic==2.7.1 +httpx==0.27.0 +pytest==8.2.0 +pytest-asyncio==0.23.6 +``` + +- [ ] **Step 2: Créer `api/Dockerfile`** + +```dockerfile +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY . . +CMD ["sh", "-c", "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000"] +``` + +- [ ] **Step 3: Créer `nginx.conf`** + +```nginx +server { + listen 80; + + location /api/ { + proxy_pass http://api:8000/api/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + } + + location / { + root /usr/share/nginx/html; + try_files $uri $uri/ /index.html; + } +} +``` + +- [ ] **Step 4: Créer `docker-compose.yml`** + +```yaml +services: + api: + build: ./api + restart: unless-stopped + volumes: + - mes_hdd_db:/data + environment: + DB_PATH: /data/mes_hdd.db + expose: + - "8000" + + web: + image: nginx:alpine + restart: unless-stopped + ports: + - "8088:80" + volumes: + - ./frontend:/usr/share/nginx/html:ro + - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro + depends_on: + - api + +volumes: + mes_hdd_db: +``` + +- [ ] **Step 5: Créer `frontend/index.html` (placeholder)** + +```html + + +Inventaire HDD +

Frontend — étape 2

+ +``` + +- [ ] **Step 6: Commit** + +```bash +git add docker-compose.yml nginx.conf api/Dockerfile api/requirements.txt frontend/ +git commit -m "feat: infrastructure Docker + nginx" +``` + +--- + +## Task 2 : Modèles SQLAlchemy + +**Files:** +- Create: `api/database.py` +- Create: `api/models.py` + +- [ ] **Step 1: Écrire le test** + +```python +# tests/test_models.py +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from api.models import Base, Machine, Disk, Snapshot + +def test_tables_créées(): + engine = create_engine("sqlite:///:memory:") + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine) + db = Session() + + db.add(Machine(hostname="pve1", ip="10.0.0.1", os="proxmox", + os_version="8.2", last_seen="2026-05-28T10:00:00")) + db.add(Disk(serial="SN123", model="Test Disk", type="HDD", + capacity_bytes=1000000000, capacity_human="1.0 Go", + first_seen_host="pve1", first_seen_at="2026-05-28T10:00:00", + last_seen_host="pve1", last_seen_at="2026-05-28T10:00:00", + smart_status="ok")) + db.add(Snapshot(serial="SN123", hostname="pve1", device="sda", + smart_status="ok", smart_label="Bon état", + smart_detail="100h · 30°C", smart_raw_json="{}", + partitions_json="[]", collected_at="2026-05-28T10:00:00")) + db.commit() + + assert db.query(Machine).count() == 1 + assert db.query(Disk).count() == 1 + assert db.query(Snapshot).count() == 1 +``` + +- [ ] **Step 2: Vérifier que le test échoue** + +```bash +cd api && python -m pytest ../tests/test_models.py -v +``` +Attendu : `ImportError` (modules non créés) + +- [ ] **Step 3: Créer `api/database.py`** + +```python +import os +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +DB_PATH = os.environ.get("DB_PATH", "/data/mes_hdd.db") +engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() +``` + +- [ ] **Step 4: Créer `api/models.py`** + +```python +from sqlalchemy import Column, String, Integer, Text +from sqlalchemy.orm import DeclarativeBase + +class Base(DeclarativeBase): + pass + +class Machine(Base): + __tablename__ = "machines" + hostname = Column(String, primary_key=True) + ip = Column(String) + os = Column(String) + os_version = Column(String) + last_seen = Column(String) + +class Disk(Base): + __tablename__ = "disks" + serial = Column(String, primary_key=True) + model = Column(String) + type = Column(String) + capacity_bytes = Column(Integer) + capacity_human = Column(String) + first_seen_host = Column(String) + first_seen_at = Column(String) + last_seen_host = Column(String) + last_seen_at = Column(String) + smart_status = Column(String) + +class Snapshot(Base): + __tablename__ = "snapshots" + id = Column(Integer, primary_key=True, autoincrement=True) + serial = Column(String, index=True) + hostname = Column(String) + device = Column(String) + smart_status = Column(String) + smart_label = Column(String) + smart_detail = Column(String) + smart_raw_json = Column(Text) + partitions_json = Column(Text) + collected_at = Column(String) +``` + +- [ ] **Step 5: Vérifier que le test passe** + +```bash +cd api && python -m pytest ../tests/test_models.py -v +``` +Attendu : `PASSED` + +- [ ] **Step 6: Commit** + +```bash +git add api/database.py api/models.py tests/test_models.py +git commit -m "feat: modèles SQLAlchemy Machine/Disk/Snapshot" +``` + +--- + +## Task 3 : Alembic — migration initiale + +**Files:** +- Create: `alembic.ini` +- Create: `api/migrations/env.py` +- Create: `api/migrations/script.py.mako` +- Create: `api/migrations/versions/0001_initial.py` + +- [ ] **Step 1: Créer `alembic.ini` (à la racine de `api/`)** + +```ini +[alembic] +script_location = migrations +sqlalchemy.url = sqlite:////data/mes_hdd.db + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S +``` + +- [ ] **Step 2: Créer `api/migrations/env.py`** + +```python +import os +from logging.config import fileConfig +from sqlalchemy import engine_from_config, pool +from alembic import context + +config = context.config + +if config.config_file_name: + fileConfig(config.config_file_name) + +# Override URL depuis env si présent +db_path = os.environ.get("DB_PATH", "/data/mes_hdd.db") +config.set_main_option("sqlalchemy.url", f"sqlite:///{db_path}") + +from api.models import Base +target_metadata = Base.metadata + +def run_migrations_offline(): + url = config.get_main_option("sqlalchemy.url") + context.configure(url=url, target_metadata=target_metadata, + literal_binds=True, dialect_opts={"paramstyle": "named"}) + with context.begin_transaction(): + context.run_migrations() + +def run_migrations_online(): + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + with context.begin_transaction(): + context.run_migrations() + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() +``` + +- [ ] **Step 3: Créer `api/migrations/script.py.mako`** + +``` +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + +def upgrade(): + ${upgrades if upgrades else "pass"} + +def downgrade(): + ${downgrades if downgrades else "pass"} +``` + +- [ ] **Step 4: Créer `api/migrations/versions/0001_initial.py`** + +```python +"""Initial schema + +Revision ID: 0001 +Revises: +Create Date: 2026-05-28 +""" +from alembic import op +import sqlalchemy as sa + +revision = "0001" +down_revision = None +branch_labels = None +depends_on = None + +def upgrade(): + op.create_table("machines", + sa.Column("hostname", sa.String(), primary_key=True), + sa.Column("ip", sa.String()), + sa.Column("os", sa.String()), + sa.Column("os_version", sa.String()), + sa.Column("last_seen", sa.String()), + ) + op.create_table("disks", + sa.Column("serial", sa.String(), primary_key=True), + sa.Column("model", sa.String()), + sa.Column("type", sa.String()), + sa.Column("capacity_bytes", sa.Integer()), + sa.Column("capacity_human", sa.String()), + sa.Column("first_seen_host", sa.String()), + sa.Column("first_seen_at", sa.String()), + sa.Column("last_seen_host", sa.String()), + sa.Column("last_seen_at", sa.String()), + sa.Column("smart_status", sa.String()), + ) + op.create_table("snapshots", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("serial", sa.String(), index=True), + sa.Column("hostname", sa.String()), + sa.Column("device", sa.String()), + sa.Column("smart_status", sa.String()), + sa.Column("smart_label", sa.String()), + sa.Column("smart_detail", sa.String()), + sa.Column("smart_raw_json", sa.Text()), + sa.Column("partitions_json", sa.Text()), + sa.Column("collected_at", sa.String()), + ) + +def downgrade(): + op.drop_table("snapshots") + op.drop_table("disks") + op.drop_table("machines") +``` + +- [ ] **Step 5: Vérifier que la migration s'applique** + +```bash +cd api && DB_PATH=/tmp/test_mes_hdd.db alembic upgrade head +``` +Attendu : `Running upgrade -> 0001, Initial schema` + +- [ ] **Step 6: Nettoyer et committer** + +```bash +rm -f /tmp/test_mes_hdd.db +git add alembic.ini api/migrations/ +git commit -m "feat: Alembic migrations initiales (3 tables)" +``` + +--- + +## Task 4 : Schémas Pydantic + +**Files:** +- Create: `api/schemas.py` + +- [ ] **Step 1: Écrire le test de validation** + +```python +# tests/test_schemas.py +from api.schemas import IngestPayload + +def _payload(): + return { + "hostname": "pve1", "ip": "10.0.0.1", + "os": "proxmox", "os_version": "8.2", + "collected_at": "2026-05-28T15:00:00+02:00", + "disks": [{ + "device": "sda", "path": "/dev/sda", + "by_id": "ata-SN123", "model": "Test Disk", + "serial": "SN123", "type": "HDD", + "capacity_bytes": 1000000000, "capacity_human": "1.0 Go", + "bus": "sata", + "smart": { + "status": "ok", "label": "Bon état", + "detail": "100h · 30°C · aucun secteur défectueux", + "temperature_c": 30, "power_on_hours": 100, + "reallocated_sectors": 0, "pending_sectors": 0, + "uncorrectable_sectors": 0, + }, + "partitions": [{ + "name": "sda1", "uuid": "abc-123", + "fstype": "ext4", "size_bytes": 500000000, + "size_human": "500 Mo", "used_bytes": 100000000, + "used_human": "100 Mo", "free_bytes": 400000000, + "free_human": "400 Mo", "used_percent": 20, + "mountpoint": "/", "home_users": None, "lvm": None, + }], + }], + } + +def test_ingest_payload_valide(): + p = IngestPayload.model_validate(_payload()) + assert p.hostname == "pve1" + assert len(p.disks) == 1 + assert p.disks[0].serial == "SN123" + assert p.disks[0].smart.status == "ok" + assert p.disks[0].partitions[0].uuid == "abc-123" + +def test_partition_lvm(): + data = _payload() + data["disks"][0]["partitions"][0]["lvm"] = { + "vg_name": "vg_data", + "logical_volumes": [{ + "lv_name": "lv_home", "size_human": "300 Go", + "used_human": "50 Go", "free_human": "250 Go", + "used_percent": 17, "fstype": "ext4", "mountpoint": "/home", + }], + } + data["disks"][0]["partitions"][0]["fstype"] = "LVM2_member" + p = IngestPayload.model_validate(data) + lvm = p.disks[0].partitions[0].lvm + assert lvm.vg_name == "vg_data" + assert lvm.logical_volumes[0].lv_name == "lv_home" +``` + +- [ ] **Step 2: Vérifier que le test échoue** + +```bash +cd api && python -m pytest ../tests/test_schemas.py -v +``` +Attendu : `ImportError` + +- [ ] **Step 3: Créer `api/schemas.py`** + +```python +from __future__ import annotations +from pydantic import BaseModel +from typing import Optional, List + +class SmartData(BaseModel): + status: str + label: str + detail: str + temperature_c: Optional[int] = None + power_on_hours: Optional[int] = None + reallocated_sectors: Optional[int] = None + pending_sectors: Optional[int] = None + uncorrectable_sectors: Optional[int] = None + +class HomeUser(BaseModel): + user: str + size_bytes: int + size_human: str + +class LogicalVolume(BaseModel): + lv_name: str + size_human: str + used_human: Optional[str] = None + free_human: Optional[str] = None + used_percent: Optional[int] = None + fstype: Optional[str] = None + mountpoint: Optional[str] = None + +class LvmInfo(BaseModel): + vg_name: str + logical_volumes: List[LogicalVolume] + +class Partition(BaseModel): + name: str + uuid: Optional[str] = None + fstype: Optional[str] = None + size_bytes: Optional[int] = None + size_human: Optional[str] = None + used_bytes: Optional[int] = None + used_human: Optional[str] = None + free_bytes: Optional[int] = None + free_human: Optional[str] = None + used_percent: Optional[int] = None + mountpoint: Optional[str] = None + home_users: Optional[List[HomeUser]] = None + lvm: Optional[LvmInfo] = None + +class DiskPayload(BaseModel): + device: str + path: str + by_id: Optional[str] = None + model: str + serial: str + type: str + capacity_bytes: int + capacity_human: str + bus: str + smart: SmartData + partitions: List[Partition] = [] + proxmox_role: Optional[str] = None + +class IngestPayload(BaseModel): + hostname: str + ip: str + os: str + os_version: str + collected_at: str + disks: List[DiskPayload] + +class IngestResponse(BaseModel): + accepted: int + hostname: str + +class DiskSummary(BaseModel): + serial: str + model: str + type: str + capacity_human: str + last_seen_host: str + last_seen_at: str + smart_status: str + moved: bool + +class DiskDetail(BaseModel): + serial: str + model: str + type: str + capacity_human: str + first_seen_host: str + first_seen_at: str + last_seen_host: str + last_seen_at: str + smart_status: str + moved: bool + snapshots: List[dict] + +class MachineSummary(BaseModel): + hostname: str + ip: str + os: str + os_version: str + last_seen: str + disk_count: int + +class MachineDetail(BaseModel): + hostname: str + ip: str + os: str + os_version: str + last_seen: str + disks: List[DiskSummary] +``` + +- [ ] **Step 4: Vérifier que les tests passent** + +```bash +cd api && python -m pytest ../tests/test_schemas.py -v +``` +Attendu : `2 passed` + +- [ ] **Step 5: Commit** + +```bash +git add api/schemas.py tests/test_schemas.py +git commit -m "feat: schémas Pydantic IngestPayload + réponses API" +``` + +--- + +## Task 5 : Service d'ingest + route POST /api/ingest + +**Files:** +- Create: `api/services/__init__.py` +- Create: `api/services/ingest_service.py` +- Create: `api/routes/__init__.py` +- Create: `api/routes/ingest.py` +- Create: `tests/conftest.py` +- Create: `tests/test_ingest.py` + +- [ ] **Step 1: Créer `tests/conftest.py`** + +```python +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from api.models import Base +from api.database import get_db + +@pytest.fixture(scope="function") +def db_session(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(engine) + +@pytest.fixture(scope="function") +def client(db_session): + from api.main import app + def override(): + yield db_session + app.dependency_overrides[get_db] = override + yield TestClient(app) + app.dependency_overrides.clear() +``` + +- [ ] **Step 2: Écrire `tests/test_ingest.py`** + +```python +def _disk(serial="SN123"): + return { + "device": "sda", "path": "/dev/sda", "by_id": None, + "model": "Test Disk", "serial": serial, "type": "HDD", + "capacity_bytes": 1_000_000_000, "capacity_human": "1.0 Go", + "bus": "sata", + "smart": {"status": "ok", "label": "Bon état", + "detail": "100h · 30°C", "temperature_c": 30, + "power_on_hours": 100, "reallocated_sectors": 0, + "pending_sectors": 0, "uncorrectable_sectors": 0}, + "partitions": [], + } + +def _payload(**kwargs): + base = {"hostname": "pve1", "ip": "10.0.0.1", + "os": "proxmox", "os_version": "8.2", + "collected_at": "2026-05-28T15:00:00+02:00", + "disks": [_disk()]} + base.update(kwargs) + return base + +def test_ingest_crée_disque_et_machine(client): + r = client.post("/api/ingest", json=_payload()) + assert r.status_code == 200 + assert r.json() == {"accepted": 1, "hostname": "pve1"} + +def test_ingest_upsert_machine(client): + client.post("/api/ingest", json=_payload(ip="10.0.0.1")) + client.post("/api/ingest", json=_payload(ip="10.0.0.99")) + r = client.get("/api/machines/pve1") + assert r.json()["ip"] == "10.0.0.99" + +def test_ingest_détecte_mouvement_disque(client): + client.post("/api/ingest", json=_payload(hostname="pve1")) + client.post("/api/ingest", json={**_payload(hostname="pve2"), + "disks": [_disk("SN123")]}) + r = client.get("/api/disks/SN123") + data = r.json() + assert data["first_seen_host"] == "pve1" + assert data["last_seen_host"] == "pve2" + assert data["moved"] is True + +def test_ingest_crée_snapshot_par_envoi(client): + client.post("/api/ingest", json=_payload()) + client.post("/api/ingest", json=_payload()) + r = client.get("/api/disks/SN123") + assert len(r.json()["snapshots"]) == 2 +``` + +- [ ] **Step 3: Vérifier que les tests échouent** + +```bash +cd api && python -m pytest ../tests/test_ingest.py -v +``` +Attendu : `ImportError` ou `404` + +- [ ] **Step 4: Créer `api/services/__init__.py`** (vide) + +```bash +touch api/services/__init__.py api/routes/__init__.py +``` + +- [ ] **Step 5: Créer `api/services/ingest_service.py`** + +```python +import json +from sqlalchemy.orm import Session +from api.models import Machine, Disk, Snapshot +from api.schemas import IngestPayload + +def process_ingest(payload: IngestPayload, db: Session) -> int: + ts = payload.collected_at + + machine = db.get(Machine, payload.hostname) + if machine is None: + machine = Machine(hostname=payload.hostname) + machine.ip = payload.ip + machine.os = payload.os + machine.os_version = payload.os_version + machine.last_seen = ts + db.merge(machine) + + count = 0 + for d in payload.disks: + disk = db.get(Disk, d.serial) + if disk is None: + disk = Disk(serial=d.serial, + first_seen_host=payload.hostname, + first_seen_at=ts) + disk.model = d.model + disk.type = d.type + disk.capacity_bytes = d.capacity_bytes + disk.capacity_human = d.capacity_human + disk.last_seen_host = payload.hostname + disk.last_seen_at = ts + disk.smart_status = d.smart.status + db.merge(disk) + + snap = Snapshot( + serial=d.serial, + hostname=payload.hostname, + device=d.device, + smart_status=d.smart.status, + smart_label=d.smart.label, + smart_detail=d.smart.detail, + smart_raw_json=json.dumps({ + "temperature_c": d.smart.temperature_c, + "power_on_hours": d.smart.power_on_hours, + "reallocated_sectors": d.smart.reallocated_sectors, + "pending_sectors": d.smart.pending_sectors, + "uncorrectable_sectors": d.smart.uncorrectable_sectors, + }), + partitions_json=json.dumps([p.model_dump() for p in d.partitions]), + collected_at=ts, + ) + db.add(snap) + count += 1 + + db.commit() + return count +``` + +- [ ] **Step 6: Créer `api/routes/ingest.py`** + +```python +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session +from api.database import get_db +from api.schemas import IngestPayload, IngestResponse +from api.services.ingest_service import process_ingest + +router = APIRouter() + +@router.post("/ingest", response_model=IngestResponse) +def ingest(payload: IngestPayload, db: Session = Depends(get_db)): + count = process_ingest(payload, db) + return IngestResponse(accepted=count, hostname=payload.hostname) +``` + +- [ ] **Step 7: Créer `api/main.py` minimal pour que les tests tournent** + +```python +from fastapi import FastAPI +from api.routes.ingest import router as ingest_router + +app = FastAPI(title="Inventaire HDD") +app.include_router(ingest_router, prefix="/api") +``` + +- [ ] **Step 8: Vérifier que les tests passent** + +```bash +cd api && python -m pytest ../tests/test_ingest.py -v +``` +Attendu : `4 passed` + +- [ ] **Step 9: Commit** + +```bash +git add api/services/ api/routes/ingest.py api/routes/__init__.py api/main.py tests/conftest.py tests/test_ingest.py +git commit -m "feat: service ingest + POST /api/ingest" +``` + +--- + +## Task 6 : Routes lecture disques + machines + +**Files:** +- Create: `api/routes/disks.py` +- Create: `api/routes/machines.py` +- Create: `tests/test_disks_api.py` +- Create: `tests/test_machines_api.py` + +- [ ] **Step 1: Écrire `tests/test_disks_api.py`** + +```python +def _ingest(client, hostname="pve1", serial="SN123", smart_status="ok"): + client.post("/api/ingest", json={ + "hostname": hostname, "ip": "10.0.0.1", + "os": "proxmox", "os_version": "8.2", + "collected_at": "2026-05-28T15:00:00+02:00", + "disks": [{"device": "sda", "path": "/dev/sda", "by_id": None, + "model": "Test", "serial": serial, "type": "HDD", + "capacity_bytes": 1000000, "capacity_human": "1 Mo", + "bus": "sata", + "smart": {"status": smart_status, "label": "Bon état", + "detail": "", "temperature_c": None, + "power_on_hours": None, "reallocated_sectors": None, + "pending_sectors": None, "uncorrectable_sectors": None}, + "partitions": []}], + }) + +def test_liste_disques(client): + _ingest(client) + r = client.get("/api/disks") + assert r.status_code == 200 + assert len(r.json()) == 1 + assert r.json()[0]["serial"] == "SN123" + +def test_détail_disque(client): + _ingest(client, hostname="pve1") + r = client.get("/api/disks/SN123") + assert r.status_code == 200 + data = r.json() + assert data["serial"] == "SN123" + assert data["moved"] is False + assert len(data["snapshots"]) == 1 + +def test_disque_inexistant(client): + r = client.get("/api/disks/INCONNU") + assert r.status_code == 404 +``` + +- [ ] **Step 2: Écrire `tests/test_machines_api.py`** + +```python +def _ingest(client, hostname="pve1"): + client.post("/api/ingest", json={ + "hostname": hostname, "ip": "10.0.0.5", + "os": "debian", "os_version": "12", + "collected_at": "2026-05-28T15:00:00+02:00", + "disks": [{"device": "sda", "path": "/dev/sda", "by_id": None, + "model": "M", "serial": f"SN-{hostname}", "type": "SSD", + "capacity_bytes": 500000, "capacity_human": "500 Ko", + "bus": "sata", + "smart": {"status": "ok", "label": "Bon état", "detail": "", + "temperature_c": None, "power_on_hours": None, + "reallocated_sectors": None, "pending_sectors": None, + "uncorrectable_sectors": None}, + "partitions": []}], + }) + +def test_liste_machines(client): + _ingest(client, "pve1") + _ingest(client, "pve2") + r = client.get("/api/machines") + assert r.status_code == 200 + assert len(r.json()) == 2 + +def test_détail_machine(client): + _ingest(client, "pve1") + r = client.get("/api/machines/pve1") + assert r.status_code == 200 + data = r.json() + assert data["hostname"] == "pve1" + assert data["disk_count"] == 1 + +def test_disques_machine(client): + _ingest(client, "pve1") + r = client.get("/api/machines/pve1/disks") + assert r.status_code == 200 + assert len(r.json()) == 1 + +def test_machine_inexistante(client): + r = client.get("/api/machines/INCONNU") + assert r.status_code == 404 +``` + +- [ ] **Step 3: Créer `api/routes/disks.py`** + +```python +import json +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from api.database import get_db +from api.models import Disk, Snapshot +from api.schemas import DiskSummary, DiskDetail + +router = APIRouter() + +def _to_summary(disk: Disk) -> DiskSummary: + return DiskSummary( + serial=disk.serial, model=disk.model, type=disk.type, + capacity_human=disk.capacity_human, + last_seen_host=disk.last_seen_host, last_seen_at=disk.last_seen_at, + smart_status=disk.smart_status, + moved=disk.first_seen_host != disk.last_seen_host, + ) + +@router.get("/disks", response_model=list[DiskSummary]) +def list_disks(db: Session = Depends(get_db)): + return [_to_summary(d) for d in db.query(Disk).all()] + +@router.get("/disks/{serial}", response_model=DiskDetail) +def get_disk(serial: str, db: Session = Depends(get_db)): + disk = db.get(Disk, serial) + if disk is None: + raise HTTPException(status_code=404, detail="Disque introuvable") + snaps = (db.query(Snapshot) + .filter(Snapshot.serial == serial) + .order_by(Snapshot.collected_at.desc()) + .all()) + return DiskDetail( + serial=disk.serial, model=disk.model, type=disk.type, + capacity_human=disk.capacity_human, + first_seen_host=disk.first_seen_host, first_seen_at=disk.first_seen_at, + last_seen_host=disk.last_seen_host, last_seen_at=disk.last_seen_at, + smart_status=disk.smart_status, + moved=disk.first_seen_host != disk.last_seen_host, + snapshots=[{ + "hostname": s.hostname, "device": s.device, + "smart_status": s.smart_status, "smart_label": s.smart_label, + "smart_detail": s.smart_detail, + "smart_raw": json.loads(s.smart_raw_json), + "partitions": json.loads(s.partitions_json), + "collected_at": s.collected_at, + } for s in snaps], + ) +``` + +- [ ] **Step 4: Créer `api/routes/machines.py`** + +```python +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from api.database import get_db +from api.models import Machine, Disk +from api.schemas import MachineSummary, MachineDetail, DiskSummary + +router = APIRouter() + +def _disk_summary(disk: Disk) -> DiskSummary: + return DiskSummary( + serial=disk.serial, model=disk.model, type=disk.type, + capacity_human=disk.capacity_human, + last_seen_host=disk.last_seen_host, last_seen_at=disk.last_seen_at, + smart_status=disk.smart_status, + moved=disk.first_seen_host != disk.last_seen_host, + ) + +@router.get("/machines", response_model=list[MachineSummary]) +def list_machines(db: Session = Depends(get_db)): + result = [] + for m in db.query(Machine).all(): + count = db.query(Disk).filter(Disk.last_seen_host == m.hostname).count() + result.append(MachineSummary( + hostname=m.hostname, ip=m.ip, os=m.os, os_version=m.os_version, + last_seen=m.last_seen, disk_count=count, + )) + return result + +@router.get("/machines/{hostname}", response_model=MachineDetail) +def get_machine(hostname: str, db: Session = Depends(get_db)): + m = db.get(Machine, hostname) + if m is None: + raise HTTPException(status_code=404, detail="Machine introuvable") + disks = db.query(Disk).filter(Disk.last_seen_host == hostname).all() + return MachineDetail( + hostname=m.hostname, ip=m.ip, os=m.os, os_version=m.os_version, + last_seen=m.last_seen, disk_count=len(disks), + disks=[_disk_summary(d) for d in disks], + ) + +@router.get("/machines/{hostname}/disks", response_model=list[DiskSummary]) +def get_machine_disks(hostname: str, db: Session = Depends(get_db)): + m = db.get(Machine, hostname) + if m is None: + raise HTTPException(status_code=404, detail="Machine introuvable") + disks = db.query(Disk).filter(Disk.last_seen_host == hostname).all() + return [_disk_summary(d) for d in disks] +``` + +- [ ] **Step 5: Mettre à jour `api/main.py` pour inclure les nouvelles routes** + +```python +from fastapi import FastAPI +from api.routes.ingest import router as ingest_router +from api.routes.disks import router as disks_router +from api.routes.machines import router as machines_router + +app = FastAPI(title="Inventaire HDD") +app.include_router(ingest_router, prefix="/api") +app.include_router(disks_router, prefix="/api") +app.include_router(machines_router, prefix="/api") +``` + +- [ ] **Step 6: Vérifier que tous les tests passent** + +```bash +cd api && python -m pytest ../tests/test_disks_api.py ../tests/test_machines_api.py ../tests/test_ingest.py -v +``` +Attendu : `11 passed` + +- [ ] **Step 7: Commit** + +```bash +git add api/routes/disks.py api/routes/machines.py api/main.py tests/test_disks_api.py tests/test_machines_api.py +git commit -m "feat: routes GET /api/disks/* et /api/machines/*" +``` + +--- + +## Task 7 : Routes agents IA + +**Files:** +- Create: `api/routes/ai.py` +- Create: `tests/test_ai_api.py` + +- [ ] **Step 1: Écrire `tests/test_ai_api.py`** + +```python +def _ingest(client, hostname, serial, smart_status="ok", host2=None): + payload = { + "hostname": hostname, "ip": "10.0.0.1", + "os": "debian", "os_version": "12", + "collected_at": "2026-05-28T15:00:00+02:00", + "disks": [{"device": "sda", "path": "/dev/sda", "by_id": None, + "model": "TestDisk", "serial": serial, "type": "HDD", + "capacity_bytes": 1000000, "capacity_human": "1 Mo", + "bus": "sata", + "smart": {"status": smart_status, "label": "Bon état", + "detail": "", "temperature_c": None, + "power_on_hours": None, "reallocated_sectors": None, + "pending_sectors": None, "uncorrectable_sectors": None}, + "partitions": [{ + "name": "sda1", "uuid": None, "fstype": "ext4", + "size_bytes": 1000000, "size_human": "1 Mo", + "used_bytes": 500000, "used_human": "500 Ko", + "free_bytes": 500000, "free_human": "500 Ko", + "used_percent": 50, "mountpoint": "/home", + "home_users": [{"user": "gilles", "size_bytes": 400000, + "size_human": "400 Ko"}], + "lvm": None, + }]}], + } + client.post("/api/ingest", json=payload) + +def test_ai_summary(client): + _ingest(client, "pve1", "SN1") + r = client.get("/api/ai/summary") + assert r.status_code == 200 + data = r.json() + assert data["total_machines"] == 1 + assert data["total_disks"] == 1 + assert "disks_by_status" in data + +def test_ai_at_risk_vide(client): + _ingest(client, "pve1", "SN1", smart_status="ok") + r = client.get("/api/ai/at-risk") + assert r.status_code == 200 + assert r.json() == [] + +def test_ai_at_risk_avec_warn(client): + _ingest(client, "pve1", "SN1", smart_status="warn") + r = client.get("/api/ai/at-risk") + assert len(r.json()) == 1 + assert r.json()[0]["serial"] == "SN1" + +def test_ai_moved_disks(client): + _ingest(client, "pve1", "SN1") + _ingest(client, "pve2", "SN1") + r = client.get("/api/ai/moved-disks") + assert len(r.json()) == 1 + assert r.json()[0]["serial"] == "SN1" + assert r.json()[0]["from_host"] == "pve1" + assert r.json()[0]["to_host"] == "pve2" + +def test_ai_backup_needed(client): + _ingest(client, "pve1", "SN1") + r = client.get("/api/ai/backup-needed") + assert r.status_code == 200 + result = r.json() + assert len(result) == 1 + assert result[0]["serial"] == "SN1" + assert result[0]["home_users"][0]["user"] == "gilles" + +def test_ai_machine_detail(client): + _ingest(client, "pve1", "SN1") + r = client.get("/api/ai/machines/pve1") + assert r.status_code == 200 + assert r.json()["hostname"] == "pve1" + assert len(r.json()["disks"]) == 1 +``` + +- [ ] **Step 2: Créer `api/routes/ai.py`** + +```python +import json +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from api.database import get_db +from api.models import Disk, Machine, Snapshot + +router = APIRouter() + +@router.get("/ai/summary") +def ai_summary(db: Session = Depends(get_db)): + disks = db.query(Disk).all() + machines = db.query(Machine).all() + by_status: dict[str, int] = {} + for d in disks: + by_status[d.smart_status] = by_status.get(d.smart_status, 0) + 1 + return { + "total_machines": len(machines), + "total_disks": len(disks), + "disks_by_status": by_status, + "machines": [{"hostname": m.hostname, "os": m.os, "last_seen": m.last_seen} + for m in machines], + "disks": [{"serial": d.serial, "model": d.model, "type": d.type, + "host": d.last_seen_host, "smart": d.smart_status, + "capacity": d.capacity_human, "moved": d.first_seen_host != d.last_seen_host} + for d in disks], + } + +@router.get("/ai/at-risk") +def ai_at_risk(db: Session = Depends(get_db)): + disks = db.query(Disk).filter(Disk.smart_status.in_(["warn", "fail"])).all() + result = [] + for d in disks: + snap = (db.query(Snapshot) + .filter(Snapshot.serial == d.serial) + .order_by(Snapshot.collected_at.desc()) + .first()) + result.append({ + "serial": d.serial, "model": d.model, "host": d.last_seen_host, + "smart_status": d.smart_status, + "smart_label": snap.smart_label if snap else "", + "smart_detail": snap.smart_detail if snap else "", + "last_seen": d.last_seen_at, + }) + return result + +@router.get("/ai/moved-disks") +def ai_moved_disks(db: Session = Depends(get_db)): + disks = db.query(Disk).all() + return [ + {"serial": d.serial, "model": d.model, + "from_host": d.first_seen_host, "from_date": d.first_seen_at, + "to_host": d.last_seen_host, "to_date": d.last_seen_at} + for d in disks if d.first_seen_host != d.last_seen_host + ] + +@router.get("/ai/backup-needed") +def ai_backup_needed(db: Session = Depends(get_db)): + snaps = (db.query(Snapshot) + .order_by(Snapshot.collected_at.desc()) + .all()) + seen = set() + result = [] + for snap in snaps: + if snap.serial in seen: + continue + seen.add(snap.serial) + parts = json.loads(snap.partitions_json) + home_users = [] + for p in parts: + if p.get("home_users"): + home_users.extend(p["home_users"]) + if home_users: + disk = db.get(Disk, snap.serial) + result.append({ + "serial": snap.serial, + "model": disk.model if disk else "", + "host": snap.hostname, + "home_users": sorted(home_users, key=lambda u: u["size_bytes"], reverse=True), + }) + result.sort(key=lambda x: sum(u["size_bytes"] for u in x["home_users"]), reverse=True) + return result + +@router.get("/ai/machines/{hostname}") +def ai_machine_detail(hostname: str, db: Session = Depends(get_db)): + m = db.get(Machine, hostname) + if m is None: + raise HTTPException(status_code=404, detail="Machine introuvable") + disks = db.query(Disk).filter(Disk.last_seen_host == hostname).all() + disks_data = [] + for d in disks: + snap = (db.query(Snapshot) + .filter(Snapshot.serial == d.serial, Snapshot.hostname == hostname) + .order_by(Snapshot.collected_at.desc()) + .first()) + disks_data.append({ + "serial": d.serial, "model": d.model, "type": d.type, + "capacity": d.capacity_human, "smart_status": d.smart_status, + "smart_label": snap.smart_label if snap else "", + "smart_detail": snap.smart_detail if snap else "", + "partitions": json.loads(snap.partitions_json) if snap else [], + }) + return {"hostname": m.hostname, "ip": m.ip, "os": m.os, + "os_version": m.os_version, "last_seen": m.last_seen, + "disks": disks_data} +``` + +- [ ] **Step 3: Mettre à jour `api/main.py`** + +```python +from fastapi import FastAPI +from api.routes.ingest import router as ingest_router +from api.routes.disks import router as disks_router +from api.routes.machines import router as machines_router +from api.routes.ai import router as ai_router + +app = FastAPI(title="Inventaire HDD") +app.include_router(ingest_router, prefix="/api") +app.include_router(disks_router, prefix="/api") +app.include_router(machines_router, prefix="/api") +app.include_router(ai_router, prefix="/api") +``` + +- [ ] **Step 4: Vérifier que tous les tests passent** + +```bash +cd api && python -m pytest ../tests/ -v --ignore=../tests/test_inventaire.py +``` +Attendu : `~17 passed` + +- [ ] **Step 5: Commit** + +```bash +git add api/routes/ai.py api/main.py tests/test_ai_api.py +git commit -m "feat: routes GET /api/ai/* (summary, at-risk, moved, backup)" +``` + +--- + +## Task 8 : FastAPI — démarrage + fichiers statiques + +**Files:** +- Modify: `api/main.py` + +- [ ] **Step 1: Mettre à jour `api/main.py` avec lifespan + static** + +```python +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastapi.staticfiles import StaticFiles +from alembic.config import Config +from alembic import command +import os + +from api.routes.ingest import router as ingest_router +from api.routes.disks import router as disks_router +from api.routes.machines import router as machines_router +from api.routes.ai import router as ai_router + +@asynccontextmanager +async def lifespan(app: FastAPI): + alembic_cfg = Config("/app/alembic.ini") + command.upgrade(alembic_cfg, "head") + yield + +app = FastAPI(title="Inventaire HDD", lifespan=lifespan) +app.include_router(ingest_router, prefix="/api") +app.include_router(disks_router, prefix="/api") +app.include_router(machines_router, prefix="/api") +app.include_router(ai_router, prefix="/api") +``` + +- [ ] **Step 2: Vérifier que tous les tests passent encore** + +```bash +cd api && python -m pytest ../tests/ -v --ignore=../tests/test_inventaire.py +``` +Attendu : `~17 passed` + +- [ ] **Step 3: Commit** + +```bash +git add api/main.py +git commit -m "feat: lifespan FastAPI avec alembic upgrade au démarrage" +``` + +--- + +## Task 9 : Script client `inventaire.py` + +**Files:** +- Create: `inventaire.py` +- Create: `tests/test_inventaire.py` + +- [ ] **Step 1: Écrire `tests/test_inventaire.py`** + +```python +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from unittest.mock import patch, MagicMock +import json +import inventaire + +# ── detect_os ───────────────────────────────────────────────────────────── + +def test_detect_proxmox(tmp_path): + pve = tmp_path / "pve" + pve.mkdir() + with patch("os.path.isdir", side_effect=lambda p: p == str(pve) if "pve" in p else os.path.isdir(p)): + with patch("builtins.open", MagicMock(side_effect=FileNotFoundError)): + pass + # Tester via os-release + os_release = tmp_path / "os-release" + os_release.write_text('ID=debian\nVARIANT_ID=proxmox\nVERSION_ID="8.2"\n') + with patch("builtins.open", open.__class__(os_release)): + pass + # Test simple via parsing direct + content = 'ID=debian\nVARIANT_ID=proxmox\nVERSION_ID="8.2"\n' + with patch("builtins.open", MagicMock(return_value=iter(content.splitlines(keepends=True)))): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "proxmox" + assert version == "8.2" + +def test_detect_ubuntu(): + content = 'ID=ubuntu\nVERSION_ID="22.04"\n' + with patch("builtins.open", MagicMock(return_value=iter(content.splitlines(keepends=True)))): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "ubuntu" + +def test_detect_debian(): + content = 'ID=debian\nVERSION_ID="12"\n' + with patch("builtins.open", MagicMock(return_value=iter(content.splitlines(keepends=True)))): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "debian" + +# ── bytes_human ──────────────────────────────────────────────────────────── + +def test_bytes_human_to(): + assert inventaire.bytes_human(1_099_511_627_776) == "1.0 To" + +def test_bytes_human_go(): + assert inventaire.bytes_human(1_073_741_824) == "1.0 Go" + +def test_bytes_human_none(): + assert inventaire.bytes_human(None) == "?" + +# ── get_smart ──────────────────────────────────────────────────────────── + +SMART_PASSED = """ +SMART overall-health self-assessment test result: PASSED +190 Airflow_Temperature_Cel 0x0022 073 ... 000 Old_age ... 27 +9 Power_On_Hours 0x0032 099 ... 000 Old_age ... 2847 +5 Reallocated_Sector_Ct 0x0033 100 ... 000 Pre-fail ... 0 +197 Current_Pending_Sector 0x0012 100 ... 000 Old_age ... 0 +198 Offline_Uncorrectable 0x0010 100 ... 000 Old_age ... 0 +""" + +SMART_FAILED = "SMART overall-health self-assessment test result: FAILED!" + +SMART_WARN = """ +SMART overall-health self-assessment test result: PASSED +5 Reallocated_Sector_Ct 0x0033 100 ... 000 Pre-fail ... 3 +197 Current_Pending_Sector 0x0012 100 ... 000 Old_age ... 0 +198 Offline_Uncorrectable 0x0010 100 ... 000 Old_age ... 0 +""" + +def test_smart_ok(): + with patch.object(inventaire, "run", return_value=SMART_PASSED): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "ok" + assert r["label"] == "Bon état" + assert r["power_on_hours"] == 2847 + assert r["temperature_c"] == 27 + +def test_smart_fail(): + with patch.object(inventaire, "run", return_value=SMART_FAILED): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "fail" + assert r["label"] == "Défaillance probable" + +def test_smart_warn(): + with patch.object(inventaire, "run", return_value=SMART_WARN): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "warn" + assert "réalloué" in r["detail"] + +def test_smart_unavailable(): + with patch.object(inventaire, "run", return_value=None): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "unavailable" + +# ── get_home_users ──────────────────────────────────────────────────────── + +def test_home_users(): + du_output = "400000\t/home/gilles\n100000\t/home/alice\n" + with patch.object(inventaire, "run", return_value=du_output): + with patch("os.path.isdir", return_value=True): + users = inventaire.get_home_users() + assert users[0]["user"] == "gilles" + assert users[0]["size_bytes"] == 400000 + assert users[1]["user"] == "alice" +``` + +- [ ] **Step 2: Vérifier que les tests échouent** + +```bash +python -m pytest tests/test_inventaire.py -v +``` +Attendu : `ImportError` + +- [ ] **Step 3: Créer `inventaire.py`** + +```python +#!/usr/bin/env python3 +""" +inventaire.py — Inventaire disques HDD/SSD/NVMe +Exécuter en root : sudo python3 inventaire.py +Dépendances : stdlib uniquement (Python 3.9+) +""" +import json, os, re, subprocess, sys, urllib.request, urllib.error +from datetime import datetime, timezone + +API_URL = os.environ.get("MES_HDD_API", "http://10.0.0.50:8088").rstrip("/") + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def run(cmd, default=None): + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=15) + return r.stdout.strip() if r.returncode == 0 else default + except Exception: + return default + +def bytes_human(n): + if n is None: + return "?" + for unit in ("o", "Ko", "Mo", "Go", "To"): + if n < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} Po" + + +# ── Détection OS ───────────────────────────────────────────────────────────── + +def detect_os(): + info = {} + try: + with open("/etc/os-release") as f: + for line in f: + if "=" in line: + k, v = line.strip().split("=", 1) + info[k] = v.strip('"') + except FileNotFoundError: + pass + os_id = info.get("ID", "").lower() + variant = info.get("VARIANT_ID", "").lower() + version = info.get("VERSION_ID", "") + if os.path.isdir("/etc/pve") or variant == "proxmox": + return "proxmox", version + if os_id == "ubuntu": + return "ubuntu", version + return "debian", version + + +# ── Machine ─────────────────────────────────────────────────────────────────── + +def get_hostname(): + return run(["hostname"], default="inconnu") + +def get_ip(): + out = run(["ip", "route", "get", "1.1.1.1"]) + if out: + m = re.search(r'src\s+(\S+)', out) + if m: + return m.group(1) + out = run(["hostname", "-I"]) + if out: + return out.split()[0] + return "inconnu" + + +# ── SMART ────────────────────────────────────────────────────────────────────── + +def _extract_attr(output, name): + m = re.search(rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)", output) + return int(m.group(1)) if m else None + +def _smart_unavailable(reason): + return {"status": "unavailable", "label": "SMART indisponible", "detail": reason, + "temperature_c": None, "power_on_hours": None, "reallocated_sectors": None, + "pending_sectors": None, "uncorrectable_sectors": None} + +def get_smart(dev): + out = run(["smartctl", "-H", "-A", "-i", dev]) + if out is None: + return _smart_unavailable("smartctl absent ou accès refusé") + temp = (_extract_attr(out, "Temperature_Celsius") + or _extract_attr(out, "Airflow_Temperature_Cel")) + poh = _extract_attr(out, "Power_On_Hours") + real = _extract_attr(out, "Reallocated_Sector_Ct") + pend = _extract_attr(out, "Current_Pending_Sector") + uncr = _extract_attr(out, "Offline_Uncorrectable") + base = {"temperature_c": temp, "power_on_hours": poh, + "reallocated_sectors": real, "pending_sectors": pend, + "uncorrectable_sectors": uncr} + if "FAILED!" in out: + return {**base, "status": "fail", "label": "Défaillance probable", + "detail": "Prévoir le remplacement du disque"} + if "PASSED" in out or "Passed" in out: + bad = [v for v in [real, pend, uncr] if v and v > 0] + if bad: + issues = [] + if real and real > 0: issues.append(f"{real} secteur(s) réalloué(s)") + if pend and pend > 0: issues.append(f"{pend} secteur(s) en attente") + if uncr and uncr > 0: issues.append(f"{uncr} secteur(s) non corrigeable(s)") + return {**base, "status": "warn", "label": "Attention", + "detail": ", ".join(issues) + " — disque à surveiller"} + detail_parts = [] + if poh is not None: detail_parts.append(f"{poh:,}h d'utilisation".replace(",", " ")) + if temp is not None: detail_parts.append(f"{temp}°C") + detail_parts.append("aucun secteur défectueux") + return {**base, "status": "ok", "label": "Bon état", + "detail": " · ".join(detail_parts)} + return _smart_unavailable("résultat SMART non interprétable") + + +# ── Métadonnées disque ──────────────────────────────────────────────────────── + +def get_by_id(dev_path): + out = run(["find", "/dev/disk/by-id", "-type", "l"]) + if not out: + return None + for link in out.splitlines(): + if "-part" in link: + continue + target = run(["readlink", "-f", link]) + if target == dev_path: + return os.path.basename(link) + return None + +def get_bus(dev_name): + if dev_name.startswith("nvme"): + return "nvme" + out = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"]) + if out: + m = re.search(r"^ID_BUS=(.+)$", out, re.MULTILINE) + if m: + return m.group(1).lower() + return "inconnu" + +def disk_type(name, rota): + if name.startswith("nvme"): + return "NVMe" + if str(rota) == "1": + return "HDD" + if str(rota) == "0": + return "SSD" + return "inconnu" + + +# ── Espace disque (df) ──────────────────────────────────────────────────────── + +def get_df_map(): + out = run(["df", "--output=target,size,used,avail", "-B1"]) + result = {} + if not out: + return result + for line in out.splitlines()[1:]: + parts = line.split() + if len(parts) < 4: + continue + try: + result[parts[0]] = {"size_bytes": int(parts[1]), + "used_bytes": int(parts[2]), + "free_bytes": int(parts[3])} + except ValueError: + pass + return result + + +# ── LVM ─────────────────────────────────────────────────────────────────────── + +def _lv_size_human(s): + if not s: + return "?" + s = s.strip() + unit_map = {"k": "Ko", "m": "Mo", "g": "Go", "t": "To"} + if s and s[-1].lower() in unit_map: + try: + return f"{float(s[:-1]):.1f} {unit_map[s[-1].lower()]}" + except ValueError: + pass + return s + +def get_lvm_map(): + pvs_out = run(["pvs", "--noheadings", "--reportformat", "json", + "-o", "pv_name,vg_name"]) + if not pvs_out: + return {} + try: + pvs = json.loads(pvs_out)["report"][0]["pv"] + except (json.JSONDecodeError, KeyError, IndexError): + return {} + lvs_by_vg = {} + lvs_out = run(["lvs", "--noheadings", "--reportformat", "json", + "-o", "lv_name,vg_name,lv_size,lv_path"]) + if lvs_out: + try: + for lv in json.loads(lvs_out)["report"][0]["lv"]: + lvs_by_vg.setdefault(lv.get("vg_name", ""), []).append(lv) + except (json.JSONDecodeError, KeyError, IndexError): + pass + result = {} + for pv in pvs: + pv_name = pv.get("pv_name", "") + vg_name = pv.get("vg_name", "") + if pv_name and vg_name: + result[pv_name] = { + "vg_name": vg_name, + "logical_volumes": [ + {"lv_name": lv.get("lv_name", ""), + "size_human": _lv_size_human(lv.get("lv_size", ""))} + for lv in lvs_by_vg.get(vg_name, []) + ], + } + return result + + +# ── /home users ─────────────────────────────────────────────────────────────── + +def get_home_users(): + if not os.path.isdir("/home"): + return [] + out = run(["du", "--max-depth=1", "-b", "/home"]) + if out is None: + return None + entries = [] + for line in out.splitlines(): + parts = line.split("\t", 1) + if len(parts) != 2: + continue + path = parts[1].strip() + if path.rstrip("/") in ("/home",): + continue + try: + size = int(parts[0]) + user = os.path.basename(path.rstrip("/")) + entries.append({"user": user, "size_bytes": size, + "size_human": bytes_human(size)}) + except ValueError: + continue + return sorted(entries, key=lambda x: x["size_bytes"], reverse=True) + + +# ── Proxmox ─────────────────────────────────────────────────────────────────── + +def get_proxmox_role(dev_name): + zpool_out = run(["zpool", "status", "-P"]) + if zpool_out and f"/dev/{dev_name}" in zpool_out: + return "zfs_pool" + ceph_dir = "/var/lib/ceph/osd" + if os.path.isdir(ceph_dir): + try: + for entry in os.listdir(ceph_dir): + link = os.path.join(ceph_dir, entry, "block") + if os.path.islink(link) and dev_name in os.path.realpath(link): + return "ceph_osd" + except OSError: + pass + return None + + +# ── Construction des partitions ─────────────────────────────────────────────── + +def build_partitions(children, df_map, lvm_map, home_done, os_type): + parts = [] + for child in (children or []): + ctype = child.get("type") + if ctype not in ("part", "lvm"): + continue + name = child.get("name", "") + fstype = child.get("fstype") or None + mountpoint = child.get("mountpoint") or None + if fstype == "squashfs": # Ubuntu snap — ignorer + continue + size_b = child.get("size") + part = { + "name": name, "uuid": child.get("uuid") or None, + "fstype": fstype, + "size_bytes": size_b, "size_human": bytes_human(size_b), + "used_bytes": None, "used_human": None, + "free_bytes": None, "free_human": None, + "used_percent": None, "mountpoint": mountpoint, + "home_users": None, "lvm": None, + } + if mountpoint and mountpoint in df_map: + df = df_map[mountpoint] + part["used_bytes"] = df["used_bytes"] + part["used_human"] = bytes_human(df["used_bytes"]) + part["free_bytes"] = df["free_bytes"] + part["free_human"] = bytes_human(df["free_bytes"]) + if df["size_bytes"] > 0: + part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100) + if mountpoint == "/home" and not home_done[0]: + part["home_users"] = get_home_users() + home_done[0] = True + dev_path = f"/dev/{name}" + if fstype == "LVM2_member" and dev_path in lvm_map: + lvm_info = lvm_map[dev_path] + lvs = [] + for lv in lvm_info["logical_volumes"]: + lv_entry = dict(lv) + lv_entry.setdefault("used_human", None) + lv_entry.setdefault("free_human", None) + lv_entry.setdefault("used_percent", None) + lv_entry.setdefault("fstype", None) + lv_entry.setdefault("mountpoint", None) + lvs.append(lv_entry) + part["lvm"] = {"vg_name": lvm_info["vg_name"], "logical_volumes": lvs} + # Traiter récursivement les enfants LVM + if child.get("children"): + for lv_child in child["children"]: + lv_mp = lv_child.get("mountpoint") or None + lv_name = lv_child.get("name", "") + if part["lvm"] and lv_mp and lv_mp in df_map: + df = df_map[lv_mp] + for lv in part["lvm"]["logical_volumes"]: + if lv["lv_name"] in lv_name: + lv["mountpoint"] = lv_mp + lv["fstype"] = lv_child.get("fstype") + lv["used_human"] = bytes_human(df["used_bytes"]) + lv["free_human"] = bytes_human(df["free_bytes"]) + if df["size_bytes"] > 0: + lv["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100) + if lv_mp == "/home" and not home_done[0]: + for lv in (part["lvm"]["logical_volumes"] if part["lvm"] else []): + if lv.get("mountpoint") == "/home": + lv["home_users"] = get_home_users() + home_done[0] = True + parts.append(part) + return parts + + +# ── Collecte principale ──────────────────────────────────────────────────────── + +def collect(): + os_type, os_version = detect_os() + hostname = get_hostname() + ip = get_ip() + df_map = get_df_map() + lvm_map = get_lvm_map() + + output = run(["lsblk", "-J", "-b", + "-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME"]) + if not output: + print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr) + sys.exit(1) + try: + blk = json.loads(output) + except json.JSONDecodeError as e: + print(f"[inventaire] Erreur parsing lsblk: {e}", file=sys.stderr) + sys.exit(1) + + disks = [] + home_done = [False] + + for blkdev in blk.get("blockdevices", []): + if blkdev.get("type") != "disk": + continue + name = blkdev.get("name", "") + dev_path = f"/dev/{name}" + model = (blkdev.get("model") or "inconnu").strip() + serial = (blkdev.get("serial") or "inconnu").strip() + size_b = blkdev.get("size") or 0 + rota = blkdev.get("rota", "") + + partitions = build_partitions( + blkdev.get("children") or [], df_map, lvm_map, home_done, os_type) + + # /home sur partition racine si pas encore trouvé + if not home_done[0]: + for p in partitions: + if p.get("mountpoint") == "/": + p["home_users"] = get_home_users() + home_done[0] = True + break + + disk = { + "device": name, "path": dev_path, + "by_id": get_by_id(dev_path), + "model": model, "serial": serial, + "type": disk_type(name, rota), + "capacity_bytes": size_b, "capacity_human": bytes_human(size_b), + "bus": get_bus(name), + "smart": get_smart(dev_path), + "partitions": partitions, + } + if os_type == "proxmox": + role = get_proxmox_role(name) + if role: + disk["proxmox_role"] = role + + disks.append(disk) + + return { + "hostname": hostname, "ip": ip, + "os": os_type, "os_version": os_version, + "collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(), + "disks": disks, + } + + +# ── HTTP POST ───────────────────────────────────────────────────────────────── + +def post_to_api(payload): + url = f"{API_URL}/api/ingest" + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request(url, data=data, + headers={"Content-Type": "application/json"}, + method="POST") + try: + with urllib.request.urlopen(req, timeout=30) as resp: + body = json.loads(resp.read()) + print(f"[inventaire] OK — {body.get('accepted','?')} disque(s) " + f"enregistré(s) pour {body.get('hostname','?')}") + except urllib.error.HTTPError as e: + print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode()}", file=sys.stderr) + sys.exit(1) + except urllib.error.URLError as e: + print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr) + sys.exit(1) + + +# ── Entrypoint ──────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + if os.geteuid() != 0: + print("[inventaire] Ce script doit être exécuté en root (sudo).", file=sys.stderr) + sys.exit(1) + print(f"[inventaire] Collecte en cours vers {API_URL}...") + payload = collect() + post_to_api(payload) +``` + +- [ ] **Step 4: Vérifier que les tests du script passent** + +```bash +python -m pytest tests/test_inventaire.py -v +``` +Attendu : `11 passed` + +- [ ] **Step 5: Commit** + +```bash +git add inventaire.py tests/test_inventaire.py +git commit -m "feat: script client inventaire.py (stdlib only)" +``` + +--- + +## Task 10 : Test end-to-end Docker + +**Files:** +- Modify: `api/Dockerfile` (ajout PYTHONPATH) + +- [ ] **Step 1: Corriger `api/Dockerfile` pour le PYTHONPATH** + +```dockerfile +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY . . +ENV PYTHONPATH=/app +CMD ["sh", "-c", "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000"] +``` + +- [ ] **Step 2: Build et démarrage** + +```bash +docker compose up --build -d +``` +Attendu : 2 conteneurs `Up` + +- [ ] **Step 3: Vérifier que l'API répond** + +```bash +curl -s http://10.0.0.50:8088/api/machines | python3 -m json.tool +``` +Attendu : `[]` + +- [ ] **Step 4: Envoyer un payload de test** + +```bash +curl -s -X POST http://10.0.0.50:8088/api/ingest \ + -H "Content-Type: application/json" \ + -d '{ + "hostname":"test-host","ip":"10.0.0.99", + "os":"debian","os_version":"12", + "collected_at":"2026-05-28T15:00:00+02:00", + "disks":[{ + "device":"sda","path":"/dev/sda","by_id":null, + "model":"Test Disk","serial":"SN-TEST-001","type":"HDD", + "capacity_bytes":1000000000,"capacity_human":"1.0 Go","bus":"sata", + "smart":{"status":"ok","label":"Bon état","detail":"100h · 30°C", + "temperature_c":30,"power_on_hours":100, + "reallocated_sectors":0,"pending_sectors":0,"uncorrectable_sectors":0}, + "partitions":[] + }] + }' | python3 -m json.tool +``` +Attendu : `{"accepted": 1, "hostname": "test-host"}` + +- [ ] **Step 5: Vérifier la lecture** + +```bash +curl -s http://10.0.0.50:8088/api/disks | python3 -m json.tool +curl -s http://10.0.0.50:8088/api/ai/summary | python3 -m json.tool +``` + +- [ ] **Step 6: Commit final** + +```bash +git add api/Dockerfile +git commit -m "fix: PYTHONPATH dans Dockerfile pour import api.*" +``` + +--- + +## Récapitulatif des commits attendus + +1. `feat: infrastructure Docker + nginx` +2. `feat: modèles SQLAlchemy Machine/Disk/Snapshot` +3. `feat: Alembic migrations initiales (3 tables)` +4. `feat: schémas Pydantic IngestPayload + réponses API` +5. `feat: service ingest + POST /api/ingest` +6. `feat: routes GET /api/disks/* et /api/machines/*` +7. `feat: routes GET /api/ai/* (summary, at-risk, moved, backup)` +8. `feat: lifespan FastAPI avec alembic upgrade au démarrage` +9. `feat: script client inventaire.py (stdlib only)` +10. `fix: PYTHONPATH dans Dockerfile pour import api.*`