Script Python stdlib, FastAPI + SQLite + Alembic, Docker Compose nginx, routes dashboard + agents IA, tests pytest par tâche. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
66 KiB
Inventaire HDD — Plan d'implémentation
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Construire le système complet d'inventaire disques : script client Python stdlib only, backend FastAPI + SQLite avec Alembic, déployé en Docker Compose avec nginx.
Architecture: Script inventaire.py one-shot (root, stdlib only) POSTe un payload JSON vers POST /api/ingest. Backend FastAPI stocke dans SQLite via SQLAlchemy 2. nginx sur port 8088 proxifie /api/* vers FastAPI et sert le frontend statique. Alembic applique les migrations au démarrage.
Tech Stack: Python 3.11, FastAPI 0.111, SQLAlchemy 2.0, Alembic 1.13, Pydantic v2, pytest + httpx, SQLite, Docker + nginx:alpine
Structure des fichiers
mes_hdd/
├── inventaire.py # Script client stdlib only
├── api/
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── main.py # App FastAPI + startup
│ ├── database.py # Engine SQLAlchemy + get_db
│ ├── models.py # ORM : Machine, Disk, Snapshot
│ ├── schemas.py # Pydantic : IngestPayload + réponses
│ ├── routes/
│ │ ├── __init__.py
│ │ ├── ingest.py # POST /api/ingest
│ │ ├── disks.py # GET /api/disks/*
│ │ ├── machines.py # GET /api/machines/*
│ │ └── ai.py # GET /api/ai/*
│ ├── services/
│ │ ├── __init__.py
│ │ └── ingest_service.py # Logique upsert + détection mouvement
│ └── migrations/
│ ├── env.py
│ ├── script.py.mako
│ └── versions/
│ └── 0001_initial.py
├── frontend/
│ └── index.html # Placeholder étape 2
├── tests/
│ ├── conftest.py # Fixtures pytest (DB in-memory, client)
│ ├── test_ingest.py
│ ├── test_disks_api.py
│ ├── test_machines_api.py
│ ├── test_ai_api.py
│ └── test_inventaire.py # Tests script client (mock subprocess)
├── alembic.ini
├── docker-compose.yml
└── nginx.conf
Task 1 : Infrastructure Docker + nginx
Files:
-
Create:
docker-compose.yml -
Create:
nginx.conf -
Create:
api/Dockerfile -
Create:
api/requirements.txt -
Create:
frontend/index.html -
Step 1: Créer
api/requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.29.0
sqlalchemy==2.0.30
alembic==1.13.1
pydantic==2.7.1
httpx==0.27.0
pytest==8.2.0
pytest-asyncio==0.23.6
- Step 2: Créer
api/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["sh", "-c", "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000"]
- Step 3: Créer
nginx.conf
server {
listen 80;
location /api/ {
proxy_pass http://api:8000/api/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location / {
root /usr/share/nginx/html;
try_files $uri $uri/ /index.html;
}
}
- Step 4: Créer
docker-compose.yml
services:
api:
build: ./api
restart: unless-stopped
volumes:
- mes_hdd_db:/data
environment:
DB_PATH: /data/mes_hdd.db
expose:
- "8000"
web:
image: nginx:alpine
restart: unless-stopped
ports:
- "8088:80"
volumes:
- ./frontend:/usr/share/nginx/html:ro
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
depends_on:
- api
volumes:
mes_hdd_db:
- Step 5: Créer
frontend/index.html(placeholder)
<!DOCTYPE html>
<html data-theme="dark">
<head><meta charset="UTF-8"><title>Inventaire HDD</title></head>
<body><p>Frontend — étape 2</p></body>
</html>
- Step 6: Commit
git add docker-compose.yml nginx.conf api/Dockerfile api/requirements.txt frontend/
git commit -m "feat: infrastructure Docker + nginx"
Task 2 : Modèles SQLAlchemy
Files:
-
Create:
api/database.py -
Create:
api/models.py -
Step 1: Écrire le test
# tests/test_models.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from api.models import Base, Machine, Disk, Snapshot
def test_tables_créées():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
db = Session()
db.add(Machine(hostname="pve1", ip="10.0.0.1", os="proxmox",
os_version="8.2", last_seen="2026-05-28T10:00:00"))
db.add(Disk(serial="SN123", model="Test Disk", type="HDD",
capacity_bytes=1000000000, capacity_human="1.0 Go",
first_seen_host="pve1", first_seen_at="2026-05-28T10:00:00",
last_seen_host="pve1", last_seen_at="2026-05-28T10:00:00",
smart_status="ok"))
db.add(Snapshot(serial="SN123", hostname="pve1", device="sda",
smart_status="ok", smart_label="Bon état",
smart_detail="100h · 30°C", smart_raw_json="{}",
partitions_json="[]", collected_at="2026-05-28T10:00:00"))
db.commit()
assert db.query(Machine).count() == 1
assert db.query(Disk).count() == 1
assert db.query(Snapshot).count() == 1
- Step 2: Vérifier que le test échoue
cd api && python -m pytest ../tests/test_models.py -v
Attendu : ImportError (modules non créés)
- Step 3: Créer
api/database.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DB_PATH = os.environ.get("DB_PATH", "/data/mes_hdd.db")
engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
- Step 4: Créer
api/models.py
from sqlalchemy import Column, String, Integer, Text
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class Machine(Base):
__tablename__ = "machines"
hostname = Column(String, primary_key=True)
ip = Column(String)
os = Column(String)
os_version = Column(String)
last_seen = Column(String)
class Disk(Base):
__tablename__ = "disks"
serial = Column(String, primary_key=True)
model = Column(String)
type = Column(String)
capacity_bytes = Column(Integer)
capacity_human = Column(String)
first_seen_host = Column(String)
first_seen_at = Column(String)
last_seen_host = Column(String)
last_seen_at = Column(String)
smart_status = Column(String)
class Snapshot(Base):
__tablename__ = "snapshots"
id = Column(Integer, primary_key=True, autoincrement=True)
serial = Column(String, index=True)
hostname = Column(String)
device = Column(String)
smart_status = Column(String)
smart_label = Column(String)
smart_detail = Column(String)
smart_raw_json = Column(Text)
partitions_json = Column(Text)
collected_at = Column(String)
- Step 5: Vérifier que le test passe
cd api && python -m pytest ../tests/test_models.py -v
Attendu : PASSED
- Step 6: Commit
git add api/database.py api/models.py tests/test_models.py
git commit -m "feat: modèles SQLAlchemy Machine/Disk/Snapshot"
Task 3 : Alembic — migration initiale
Files:
-
Create:
alembic.ini -
Create:
api/migrations/env.py -
Create:
api/migrations/script.py.mako -
Create:
api/migrations/versions/0001_initial.py -
Step 1: Créer
alembic.ini(à la racine deapi/)
[alembic]
script_location = migrations
sqlalchemy.url = sqlite:////data/mes_hdd.db
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
- Step 2: Créer
api/migrations/env.py
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
config = context.config
if config.config_file_name:
fileConfig(config.config_file_name)
# Override URL depuis env si présent
db_path = os.environ.get("DB_PATH", "/data/mes_hdd.db")
config.set_main_option("sqlalchemy.url", f"sqlite:///{db_path}")
from api.models import Base
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url, target_metadata=target_metadata,
literal_binds=True, dialect_opts={"paramstyle": "named"})
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
- Step 3: Créer
api/migrations/script.py.mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}
- Step 4: Créer
api/migrations/versions/0001_initial.py
"""Initial schema
Revision ID: 0001
Revises:
Create Date: 2026-05-28
"""
from alembic import op
import sqlalchemy as sa
revision = "0001"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table("machines",
sa.Column("hostname", sa.String(), primary_key=True),
sa.Column("ip", sa.String()),
sa.Column("os", sa.String()),
sa.Column("os_version", sa.String()),
sa.Column("last_seen", sa.String()),
)
op.create_table("disks",
sa.Column("serial", sa.String(), primary_key=True),
sa.Column("model", sa.String()),
sa.Column("type", sa.String()),
sa.Column("capacity_bytes", sa.Integer()),
sa.Column("capacity_human", sa.String()),
sa.Column("first_seen_host", sa.String()),
sa.Column("first_seen_at", sa.String()),
sa.Column("last_seen_host", sa.String()),
sa.Column("last_seen_at", sa.String()),
sa.Column("smart_status", sa.String()),
)
op.create_table("snapshots",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column("serial", sa.String(), index=True),
sa.Column("hostname", sa.String()),
sa.Column("device", sa.String()),
sa.Column("smart_status", sa.String()),
sa.Column("smart_label", sa.String()),
sa.Column("smart_detail", sa.String()),
sa.Column("smart_raw_json", sa.Text()),
sa.Column("partitions_json", sa.Text()),
sa.Column("collected_at", sa.String()),
)
def downgrade():
op.drop_table("snapshots")
op.drop_table("disks")
op.drop_table("machines")
- Step 5: Vérifier que la migration s'applique
cd api && DB_PATH=/tmp/test_mes_hdd.db alembic upgrade head
Attendu : Running upgrade -> 0001, Initial schema
- Step 6: Nettoyer et committer
rm -f /tmp/test_mes_hdd.db
git add alembic.ini api/migrations/
git commit -m "feat: Alembic migrations initiales (3 tables)"
Task 4 : Schémas Pydantic
Files:
-
Create:
api/schemas.py -
Step 1: Écrire le test de validation
# tests/test_schemas.py
from api.schemas import IngestPayload
def _payload():
return {
"hostname": "pve1", "ip": "10.0.0.1",
"os": "proxmox", "os_version": "8.2",
"collected_at": "2026-05-28T15:00:00+02:00",
"disks": [{
"device": "sda", "path": "/dev/sda",
"by_id": "ata-SN123", "model": "Test Disk",
"serial": "SN123", "type": "HDD",
"capacity_bytes": 1000000000, "capacity_human": "1.0 Go",
"bus": "sata",
"smart": {
"status": "ok", "label": "Bon état",
"detail": "100h · 30°C · aucun secteur défectueux",
"temperature_c": 30, "power_on_hours": 100,
"reallocated_sectors": 0, "pending_sectors": 0,
"uncorrectable_sectors": 0,
},
"partitions": [{
"name": "sda1", "uuid": "abc-123",
"fstype": "ext4", "size_bytes": 500000000,
"size_human": "500 Mo", "used_bytes": 100000000,
"used_human": "100 Mo", "free_bytes": 400000000,
"free_human": "400 Mo", "used_percent": 20,
"mountpoint": "/", "home_users": None, "lvm": None,
}],
}],
}
def test_ingest_payload_valide():
p = IngestPayload.model_validate(_payload())
assert p.hostname == "pve1"
assert len(p.disks) == 1
assert p.disks[0].serial == "SN123"
assert p.disks[0].smart.status == "ok"
assert p.disks[0].partitions[0].uuid == "abc-123"
def test_partition_lvm():
data = _payload()
data["disks"][0]["partitions"][0]["lvm"] = {
"vg_name": "vg_data",
"logical_volumes": [{
"lv_name": "lv_home", "size_human": "300 Go",
"used_human": "50 Go", "free_human": "250 Go",
"used_percent": 17, "fstype": "ext4", "mountpoint": "/home",
}],
}
data["disks"][0]["partitions"][0]["fstype"] = "LVM2_member"
p = IngestPayload.model_validate(data)
lvm = p.disks[0].partitions[0].lvm
assert lvm.vg_name == "vg_data"
assert lvm.logical_volumes[0].lv_name == "lv_home"
- Step 2: Vérifier que le test échoue
cd api && python -m pytest ../tests/test_schemas.py -v
Attendu : ImportError
- Step 3: Créer
api/schemas.py
from __future__ import annotations
from pydantic import BaseModel
from typing import Optional, List
class SmartData(BaseModel):
status: str
label: str
detail: str
temperature_c: Optional[int] = None
power_on_hours: Optional[int] = None
reallocated_sectors: Optional[int] = None
pending_sectors: Optional[int] = None
uncorrectable_sectors: Optional[int] = None
class HomeUser(BaseModel):
user: str
size_bytes: int
size_human: str
class LogicalVolume(BaseModel):
lv_name: str
size_human: str
used_human: Optional[str] = None
free_human: Optional[str] = None
used_percent: Optional[int] = None
fstype: Optional[str] = None
mountpoint: Optional[str] = None
class LvmInfo(BaseModel):
vg_name: str
logical_volumes: List[LogicalVolume]
class Partition(BaseModel):
name: str
uuid: Optional[str] = None
fstype: Optional[str] = None
size_bytes: Optional[int] = None
size_human: Optional[str] = None
used_bytes: Optional[int] = None
used_human: Optional[str] = None
free_bytes: Optional[int] = None
free_human: Optional[str] = None
used_percent: Optional[int] = None
mountpoint: Optional[str] = None
home_users: Optional[List[HomeUser]] = None
lvm: Optional[LvmInfo] = None
class DiskPayload(BaseModel):
device: str
path: str
by_id: Optional[str] = None
model: str
serial: str
type: str
capacity_bytes: int
capacity_human: str
bus: str
smart: SmartData
partitions: List[Partition] = []
proxmox_role: Optional[str] = None
class IngestPayload(BaseModel):
hostname: str
ip: str
os: str
os_version: str
collected_at: str
disks: List[DiskPayload]
class IngestResponse(BaseModel):
accepted: int
hostname: str
class DiskSummary(BaseModel):
serial: str
model: str
type: str
capacity_human: str
last_seen_host: str
last_seen_at: str
smart_status: str
moved: bool
class DiskDetail(BaseModel):
serial: str
model: str
type: str
capacity_human: str
first_seen_host: str
first_seen_at: str
last_seen_host: str
last_seen_at: str
smart_status: str
moved: bool
snapshots: List[dict]
class MachineSummary(BaseModel):
hostname: str
ip: str
os: str
os_version: str
last_seen: str
disk_count: int
class MachineDetail(BaseModel):
hostname: str
ip: str
os: str
os_version: str
last_seen: str
disks: List[DiskSummary]
- Step 4: Vérifier que les tests passent
cd api && python -m pytest ../tests/test_schemas.py -v
Attendu : 2 passed
- Step 5: Commit
git add api/schemas.py tests/test_schemas.py
git commit -m "feat: schémas Pydantic IngestPayload + réponses API"
Task 5 : Service d'ingest + route POST /api/ingest
Files:
-
Create:
api/services/__init__.py -
Create:
api/services/ingest_service.py -
Create:
api/routes/__init__.py -
Create:
api/routes/ingest.py -
Create:
tests/conftest.py -
Create:
tests/test_ingest.py -
Step 1: Créer
tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from api.models import Base
from api.database import get_db
@pytest.fixture(scope="function")
def db_session():
engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(engine)
@pytest.fixture(scope="function")
def client(db_session):
from api.main import app
def override():
yield db_session
app.dependency_overrides[get_db] = override
yield TestClient(app)
app.dependency_overrides.clear()
- Step 2: Écrire
tests/test_ingest.py
def _disk(serial="SN123"):
return {
"device": "sda", "path": "/dev/sda", "by_id": None,
"model": "Test Disk", "serial": serial, "type": "HDD",
"capacity_bytes": 1_000_000_000, "capacity_human": "1.0 Go",
"bus": "sata",
"smart": {"status": "ok", "label": "Bon état",
"detail": "100h · 30°C", "temperature_c": 30,
"power_on_hours": 100, "reallocated_sectors": 0,
"pending_sectors": 0, "uncorrectable_sectors": 0},
"partitions": [],
}
def _payload(**kwargs):
base = {"hostname": "pve1", "ip": "10.0.0.1",
"os": "proxmox", "os_version": "8.2",
"collected_at": "2026-05-28T15:00:00+02:00",
"disks": [_disk()]}
base.update(kwargs)
return base
def test_ingest_crée_disque_et_machine(client):
r = client.post("/api/ingest", json=_payload())
assert r.status_code == 200
assert r.json() == {"accepted": 1, "hostname": "pve1"}
def test_ingest_upsert_machine(client):
client.post("/api/ingest", json=_payload(ip="10.0.0.1"))
client.post("/api/ingest", json=_payload(ip="10.0.0.99"))
r = client.get("/api/machines/pve1")
assert r.json()["ip"] == "10.0.0.99"
def test_ingest_détecte_mouvement_disque(client):
client.post("/api/ingest", json=_payload(hostname="pve1"))
client.post("/api/ingest", json={**_payload(hostname="pve2"),
"disks": [_disk("SN123")]})
r = client.get("/api/disks/SN123")
data = r.json()
assert data["first_seen_host"] == "pve1"
assert data["last_seen_host"] == "pve2"
assert data["moved"] is True
def test_ingest_crée_snapshot_par_envoi(client):
client.post("/api/ingest", json=_payload())
client.post("/api/ingest", json=_payload())
r = client.get("/api/disks/SN123")
assert len(r.json()["snapshots"]) == 2
- Step 3: Vérifier que les tests échouent
cd api && python -m pytest ../tests/test_ingest.py -v
Attendu : ImportError ou 404
- Step 4: Créer
api/services/__init__.py(vide)
touch api/services/__init__.py api/routes/__init__.py
- Step 5: Créer
api/services/ingest_service.py
import json
from sqlalchemy.orm import Session
from api.models import Machine, Disk, Snapshot
from api.schemas import IngestPayload
def process_ingest(payload: IngestPayload, db: Session) -> int:
ts = payload.collected_at
machine = db.get(Machine, payload.hostname)
if machine is None:
machine = Machine(hostname=payload.hostname)
machine.ip = payload.ip
machine.os = payload.os
machine.os_version = payload.os_version
machine.last_seen = ts
db.merge(machine)
count = 0
for d in payload.disks:
disk = db.get(Disk, d.serial)
if disk is None:
disk = Disk(serial=d.serial,
first_seen_host=payload.hostname,
first_seen_at=ts)
disk.model = d.model
disk.type = d.type
disk.capacity_bytes = d.capacity_bytes
disk.capacity_human = d.capacity_human
disk.last_seen_host = payload.hostname
disk.last_seen_at = ts
disk.smart_status = d.smart.status
db.merge(disk)
snap = Snapshot(
serial=d.serial,
hostname=payload.hostname,
device=d.device,
smart_status=d.smart.status,
smart_label=d.smart.label,
smart_detail=d.smart.detail,
smart_raw_json=json.dumps({
"temperature_c": d.smart.temperature_c,
"power_on_hours": d.smart.power_on_hours,
"reallocated_sectors": d.smart.reallocated_sectors,
"pending_sectors": d.smart.pending_sectors,
"uncorrectable_sectors": d.smart.uncorrectable_sectors,
}),
partitions_json=json.dumps([p.model_dump() for p in d.partitions]),
collected_at=ts,
)
db.add(snap)
count += 1
db.commit()
return count
- Step 6: Créer
api/routes/ingest.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from api.database import get_db
from api.schemas import IngestPayload, IngestResponse
from api.services.ingest_service import process_ingest
router = APIRouter()
@router.post("/ingest", response_model=IngestResponse)
def ingest(payload: IngestPayload, db: Session = Depends(get_db)):
count = process_ingest(payload, db)
return IngestResponse(accepted=count, hostname=payload.hostname)
- Step 7: Créer
api/main.pyminimal pour que les tests tournent
from fastapi import FastAPI
from api.routes.ingest import router as ingest_router
app = FastAPI(title="Inventaire HDD")
app.include_router(ingest_router, prefix="/api")
- Step 8: Vérifier que les tests passent
cd api && python -m pytest ../tests/test_ingest.py -v
Attendu : 4 passed
- Step 9: Commit
git add api/services/ api/routes/ingest.py api/routes/__init__.py api/main.py tests/conftest.py tests/test_ingest.py
git commit -m "feat: service ingest + POST /api/ingest"
Task 6 : Routes lecture disques + machines
Files:
-
Create:
api/routes/disks.py -
Create:
api/routes/machines.py -
Create:
tests/test_disks_api.py -
Create:
tests/test_machines_api.py -
Step 1: Écrire
tests/test_disks_api.py
def _ingest(client, hostname="pve1", serial="SN123", smart_status="ok"):
client.post("/api/ingest", json={
"hostname": hostname, "ip": "10.0.0.1",
"os": "proxmox", "os_version": "8.2",
"collected_at": "2026-05-28T15:00:00+02:00",
"disks": [{"device": "sda", "path": "/dev/sda", "by_id": None,
"model": "Test", "serial": serial, "type": "HDD",
"capacity_bytes": 1000000, "capacity_human": "1 Mo",
"bus": "sata",
"smart": {"status": smart_status, "label": "Bon état",
"detail": "", "temperature_c": None,
"power_on_hours": None, "reallocated_sectors": None,
"pending_sectors": None, "uncorrectable_sectors": None},
"partitions": []}],
})
def test_liste_disques(client):
_ingest(client)
r = client.get("/api/disks")
assert r.status_code == 200
assert len(r.json()) == 1
assert r.json()[0]["serial"] == "SN123"
def test_détail_disque(client):
_ingest(client, hostname="pve1")
r = client.get("/api/disks/SN123")
assert r.status_code == 200
data = r.json()
assert data["serial"] == "SN123"
assert data["moved"] is False
assert len(data["snapshots"]) == 1
def test_disque_inexistant(client):
r = client.get("/api/disks/INCONNU")
assert r.status_code == 404
- Step 2: Écrire
tests/test_machines_api.py
def _ingest(client, hostname="pve1"):
client.post("/api/ingest", json={
"hostname": hostname, "ip": "10.0.0.5",
"os": "debian", "os_version": "12",
"collected_at": "2026-05-28T15:00:00+02:00",
"disks": [{"device": "sda", "path": "/dev/sda", "by_id": None,
"model": "M", "serial": f"SN-{hostname}", "type": "SSD",
"capacity_bytes": 500000, "capacity_human": "500 Ko",
"bus": "sata",
"smart": {"status": "ok", "label": "Bon état", "detail": "",
"temperature_c": None, "power_on_hours": None,
"reallocated_sectors": None, "pending_sectors": None,
"uncorrectable_sectors": None},
"partitions": []}],
})
def test_liste_machines(client):
_ingest(client, "pve1")
_ingest(client, "pve2")
r = client.get("/api/machines")
assert r.status_code == 200
assert len(r.json()) == 2
def test_détail_machine(client):
_ingest(client, "pve1")
r = client.get("/api/machines/pve1")
assert r.status_code == 200
data = r.json()
assert data["hostname"] == "pve1"
assert data["disk_count"] == 1
def test_disques_machine(client):
_ingest(client, "pve1")
r = client.get("/api/machines/pve1/disks")
assert r.status_code == 200
assert len(r.json()) == 1
def test_machine_inexistante(client):
r = client.get("/api/machines/INCONNU")
assert r.status_code == 404
- Step 3: Créer
api/routes/disks.py
import json
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from api.database import get_db
from api.models import Disk, Snapshot
from api.schemas import DiskSummary, DiskDetail
router = APIRouter()
def _to_summary(disk: Disk) -> DiskSummary:
return DiskSummary(
serial=disk.serial, model=disk.model, type=disk.type,
capacity_human=disk.capacity_human,
last_seen_host=disk.last_seen_host, last_seen_at=disk.last_seen_at,
smart_status=disk.smart_status,
moved=disk.first_seen_host != disk.last_seen_host,
)
@router.get("/disks", response_model=list[DiskSummary])
def list_disks(db: Session = Depends(get_db)):
return [_to_summary(d) for d in db.query(Disk).all()]
@router.get("/disks/{serial}", response_model=DiskDetail)
def get_disk(serial: str, db: Session = Depends(get_db)):
disk = db.get(Disk, serial)
if disk is None:
raise HTTPException(status_code=404, detail="Disque introuvable")
snaps = (db.query(Snapshot)
.filter(Snapshot.serial == serial)
.order_by(Snapshot.collected_at.desc())
.all())
return DiskDetail(
serial=disk.serial, model=disk.model, type=disk.type,
capacity_human=disk.capacity_human,
first_seen_host=disk.first_seen_host, first_seen_at=disk.first_seen_at,
last_seen_host=disk.last_seen_host, last_seen_at=disk.last_seen_at,
smart_status=disk.smart_status,
moved=disk.first_seen_host != disk.last_seen_host,
snapshots=[{
"hostname": s.hostname, "device": s.device,
"smart_status": s.smart_status, "smart_label": s.smart_label,
"smart_detail": s.smart_detail,
"smart_raw": json.loads(s.smart_raw_json),
"partitions": json.loads(s.partitions_json),
"collected_at": s.collected_at,
} for s in snaps],
)
- Step 4: Créer
api/routes/machines.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from api.database import get_db
from api.models import Machine, Disk
from api.schemas import MachineSummary, MachineDetail, DiskSummary
router = APIRouter()
def _disk_summary(disk: Disk) -> DiskSummary:
return DiskSummary(
serial=disk.serial, model=disk.model, type=disk.type,
capacity_human=disk.capacity_human,
last_seen_host=disk.last_seen_host, last_seen_at=disk.last_seen_at,
smart_status=disk.smart_status,
moved=disk.first_seen_host != disk.last_seen_host,
)
@router.get("/machines", response_model=list[MachineSummary])
def list_machines(db: Session = Depends(get_db)):
result = []
for m in db.query(Machine).all():
count = db.query(Disk).filter(Disk.last_seen_host == m.hostname).count()
result.append(MachineSummary(
hostname=m.hostname, ip=m.ip, os=m.os, os_version=m.os_version,
last_seen=m.last_seen, disk_count=count,
))
return result
@router.get("/machines/{hostname}", response_model=MachineDetail)
def get_machine(hostname: str, db: Session = Depends(get_db)):
m = db.get(Machine, hostname)
if m is None:
raise HTTPException(status_code=404, detail="Machine introuvable")
disks = db.query(Disk).filter(Disk.last_seen_host == hostname).all()
return MachineDetail(
hostname=m.hostname, ip=m.ip, os=m.os, os_version=m.os_version,
last_seen=m.last_seen, disk_count=len(disks),
disks=[_disk_summary(d) for d in disks],
)
@router.get("/machines/{hostname}/disks", response_model=list[DiskSummary])
def get_machine_disks(hostname: str, db: Session = Depends(get_db)):
m = db.get(Machine, hostname)
if m is None:
raise HTTPException(status_code=404, detail="Machine introuvable")
disks = db.query(Disk).filter(Disk.last_seen_host == hostname).all()
return [_disk_summary(d) for d in disks]
- Step 5: Mettre à jour
api/main.pypour inclure les nouvelles routes
from fastapi import FastAPI
from api.routes.ingest import router as ingest_router
from api.routes.disks import router as disks_router
from api.routes.machines import router as machines_router
app = FastAPI(title="Inventaire HDD")
app.include_router(ingest_router, prefix="/api")
app.include_router(disks_router, prefix="/api")
app.include_router(machines_router, prefix="/api")
- Step 6: Vérifier que tous les tests passent
cd api && python -m pytest ../tests/test_disks_api.py ../tests/test_machines_api.py ../tests/test_ingest.py -v
Attendu : 11 passed
- Step 7: Commit
git add api/routes/disks.py api/routes/machines.py api/main.py tests/test_disks_api.py tests/test_machines_api.py
git commit -m "feat: routes GET /api/disks/* et /api/machines/*"
Task 7 : Routes agents IA
Files:
-
Create:
api/routes/ai.py -
Create:
tests/test_ai_api.py -
Step 1: Écrire
tests/test_ai_api.py
def _ingest(client, hostname, serial, smart_status="ok", host2=None):
payload = {
"hostname": hostname, "ip": "10.0.0.1",
"os": "debian", "os_version": "12",
"collected_at": "2026-05-28T15:00:00+02:00",
"disks": [{"device": "sda", "path": "/dev/sda", "by_id": None,
"model": "TestDisk", "serial": serial, "type": "HDD",
"capacity_bytes": 1000000, "capacity_human": "1 Mo",
"bus": "sata",
"smart": {"status": smart_status, "label": "Bon état",
"detail": "", "temperature_c": None,
"power_on_hours": None, "reallocated_sectors": None,
"pending_sectors": None, "uncorrectable_sectors": None},
"partitions": [{
"name": "sda1", "uuid": None, "fstype": "ext4",
"size_bytes": 1000000, "size_human": "1 Mo",
"used_bytes": 500000, "used_human": "500 Ko",
"free_bytes": 500000, "free_human": "500 Ko",
"used_percent": 50, "mountpoint": "/home",
"home_users": [{"user": "gilles", "size_bytes": 400000,
"size_human": "400 Ko"}],
"lvm": None,
}]}],
}
client.post("/api/ingest", json=payload)
def test_ai_summary(client):
_ingest(client, "pve1", "SN1")
r = client.get("/api/ai/summary")
assert r.status_code == 200
data = r.json()
assert data["total_machines"] == 1
assert data["total_disks"] == 1
assert "disks_by_status" in data
def test_ai_at_risk_vide(client):
_ingest(client, "pve1", "SN1", smart_status="ok")
r = client.get("/api/ai/at-risk")
assert r.status_code == 200
assert r.json() == []
def test_ai_at_risk_avec_warn(client):
_ingest(client, "pve1", "SN1", smart_status="warn")
r = client.get("/api/ai/at-risk")
assert len(r.json()) == 1
assert r.json()[0]["serial"] == "SN1"
def test_ai_moved_disks(client):
_ingest(client, "pve1", "SN1")
_ingest(client, "pve2", "SN1")
r = client.get("/api/ai/moved-disks")
assert len(r.json()) == 1
assert r.json()[0]["serial"] == "SN1"
assert r.json()[0]["from_host"] == "pve1"
assert r.json()[0]["to_host"] == "pve2"
def test_ai_backup_needed(client):
_ingest(client, "pve1", "SN1")
r = client.get("/api/ai/backup-needed")
assert r.status_code == 200
result = r.json()
assert len(result) == 1
assert result[0]["serial"] == "SN1"
assert result[0]["home_users"][0]["user"] == "gilles"
def test_ai_machine_detail(client):
_ingest(client, "pve1", "SN1")
r = client.get("/api/ai/machines/pve1")
assert r.status_code == 200
assert r.json()["hostname"] == "pve1"
assert len(r.json()["disks"]) == 1
- Step 2: Créer
api/routes/ai.py
import json
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from api.database import get_db
from api.models import Disk, Machine, Snapshot
router = APIRouter()
@router.get("/ai/summary")
def ai_summary(db: Session = Depends(get_db)):
disks = db.query(Disk).all()
machines = db.query(Machine).all()
by_status: dict[str, int] = {}
for d in disks:
by_status[d.smart_status] = by_status.get(d.smart_status, 0) + 1
return {
"total_machines": len(machines),
"total_disks": len(disks),
"disks_by_status": by_status,
"machines": [{"hostname": m.hostname, "os": m.os, "last_seen": m.last_seen}
for m in machines],
"disks": [{"serial": d.serial, "model": d.model, "type": d.type,
"host": d.last_seen_host, "smart": d.smart_status,
"capacity": d.capacity_human, "moved": d.first_seen_host != d.last_seen_host}
for d in disks],
}
@router.get("/ai/at-risk")
def ai_at_risk(db: Session = Depends(get_db)):
disks = db.query(Disk).filter(Disk.smart_status.in_(["warn", "fail"])).all()
result = []
for d in disks:
snap = (db.query(Snapshot)
.filter(Snapshot.serial == d.serial)
.order_by(Snapshot.collected_at.desc())
.first())
result.append({
"serial": d.serial, "model": d.model, "host": d.last_seen_host,
"smart_status": d.smart_status,
"smart_label": snap.smart_label if snap else "",
"smart_detail": snap.smart_detail if snap else "",
"last_seen": d.last_seen_at,
})
return result
@router.get("/ai/moved-disks")
def ai_moved_disks(db: Session = Depends(get_db)):
disks = db.query(Disk).all()
return [
{"serial": d.serial, "model": d.model,
"from_host": d.first_seen_host, "from_date": d.first_seen_at,
"to_host": d.last_seen_host, "to_date": d.last_seen_at}
for d in disks if d.first_seen_host != d.last_seen_host
]
@router.get("/ai/backup-needed")
def ai_backup_needed(db: Session = Depends(get_db)):
snaps = (db.query(Snapshot)
.order_by(Snapshot.collected_at.desc())
.all())
seen = set()
result = []
for snap in snaps:
if snap.serial in seen:
continue
seen.add(snap.serial)
parts = json.loads(snap.partitions_json)
home_users = []
for p in parts:
if p.get("home_users"):
home_users.extend(p["home_users"])
if home_users:
disk = db.get(Disk, snap.serial)
result.append({
"serial": snap.serial,
"model": disk.model if disk else "",
"host": snap.hostname,
"home_users": sorted(home_users, key=lambda u: u["size_bytes"], reverse=True),
})
result.sort(key=lambda x: sum(u["size_bytes"] for u in x["home_users"]), reverse=True)
return result
@router.get("/ai/machines/{hostname}")
def ai_machine_detail(hostname: str, db: Session = Depends(get_db)):
m = db.get(Machine, hostname)
if m is None:
raise HTTPException(status_code=404, detail="Machine introuvable")
disks = db.query(Disk).filter(Disk.last_seen_host == hostname).all()
disks_data = []
for d in disks:
snap = (db.query(Snapshot)
.filter(Snapshot.serial == d.serial, Snapshot.hostname == hostname)
.order_by(Snapshot.collected_at.desc())
.first())
disks_data.append({
"serial": d.serial, "model": d.model, "type": d.type,
"capacity": d.capacity_human, "smart_status": d.smart_status,
"smart_label": snap.smart_label if snap else "",
"smart_detail": snap.smart_detail if snap else "",
"partitions": json.loads(snap.partitions_json) if snap else [],
})
return {"hostname": m.hostname, "ip": m.ip, "os": m.os,
"os_version": m.os_version, "last_seen": m.last_seen,
"disks": disks_data}
- Step 3: Mettre à jour
api/main.py
from fastapi import FastAPI
from api.routes.ingest import router as ingest_router
from api.routes.disks import router as disks_router
from api.routes.machines import router as machines_router
from api.routes.ai import router as ai_router
app = FastAPI(title="Inventaire HDD")
app.include_router(ingest_router, prefix="/api")
app.include_router(disks_router, prefix="/api")
app.include_router(machines_router, prefix="/api")
app.include_router(ai_router, prefix="/api")
- Step 4: Vérifier que tous les tests passent
cd api && python -m pytest ../tests/ -v --ignore=../tests/test_inventaire.py
Attendu : ~17 passed
- Step 5: Commit
git add api/routes/ai.py api/main.py tests/test_ai_api.py
git commit -m "feat: routes GET /api/ai/* (summary, at-risk, moved, backup)"
Task 8 : FastAPI — démarrage + fichiers statiques
Files:
-
Modify:
api/main.py -
Step 1: Mettre à jour
api/main.pyavec lifespan + static
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from alembic.config import Config
from alembic import command
import os
from api.routes.ingest import router as ingest_router
from api.routes.disks import router as disks_router
from api.routes.machines import router as machines_router
from api.routes.ai import router as ai_router
@asynccontextmanager
async def lifespan(app: FastAPI):
alembic_cfg = Config("/app/alembic.ini")
command.upgrade(alembic_cfg, "head")
yield
app = FastAPI(title="Inventaire HDD", lifespan=lifespan)
app.include_router(ingest_router, prefix="/api")
app.include_router(disks_router, prefix="/api")
app.include_router(machines_router, prefix="/api")
app.include_router(ai_router, prefix="/api")
- Step 2: Vérifier que tous les tests passent encore
cd api && python -m pytest ../tests/ -v --ignore=../tests/test_inventaire.py
Attendu : ~17 passed
- Step 3: Commit
git add api/main.py
git commit -m "feat: lifespan FastAPI avec alembic upgrade au démarrage"
Task 9 : Script client inventaire.py
Files:
-
Create:
inventaire.py -
Create:
tests/test_inventaire.py -
Step 1: Écrire
tests/test_inventaire.py
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from unittest.mock import patch, MagicMock
import json
import inventaire
# ── detect_os ─────────────────────────────────────────────────────────────
def test_detect_proxmox(tmp_path):
pve = tmp_path / "pve"
pve.mkdir()
with patch("os.path.isdir", side_effect=lambda p: p == str(pve) if "pve" in p else os.path.isdir(p)):
with patch("builtins.open", MagicMock(side_effect=FileNotFoundError)):
pass
# Tester via os-release
os_release = tmp_path / "os-release"
os_release.write_text('ID=debian\nVARIANT_ID=proxmox\nVERSION_ID="8.2"\n')
with patch("builtins.open", open.__class__(os_release)):
pass
# Test simple via parsing direct
content = 'ID=debian\nVARIANT_ID=proxmox\nVERSION_ID="8.2"\n'
with patch("builtins.open", MagicMock(return_value=iter(content.splitlines(keepends=True)))):
with patch("os.path.isdir", return_value=False):
os_type, version = inventaire.detect_os()
assert os_type == "proxmox"
assert version == "8.2"
def test_detect_ubuntu():
content = 'ID=ubuntu\nVERSION_ID="22.04"\n'
with patch("builtins.open", MagicMock(return_value=iter(content.splitlines(keepends=True)))):
with patch("os.path.isdir", return_value=False):
os_type, version = inventaire.detect_os()
assert os_type == "ubuntu"
def test_detect_debian():
content = 'ID=debian\nVERSION_ID="12"\n'
with patch("builtins.open", MagicMock(return_value=iter(content.splitlines(keepends=True)))):
with patch("os.path.isdir", return_value=False):
os_type, version = inventaire.detect_os()
assert os_type == "debian"
# ── bytes_human ────────────────────────────────────────────────────────────
def test_bytes_human_to():
assert inventaire.bytes_human(1_099_511_627_776) == "1.0 To"
def test_bytes_human_go():
assert inventaire.bytes_human(1_073_741_824) == "1.0 Go"
def test_bytes_human_none():
assert inventaire.bytes_human(None) == "?"
# ── get_smart ────────────────────────────────────────────────────────────
SMART_PASSED = """
SMART overall-health self-assessment test result: PASSED
190 Airflow_Temperature_Cel 0x0022 073 ... 000 Old_age ... 27
9 Power_On_Hours 0x0032 099 ... 000 Old_age ... 2847
5 Reallocated_Sector_Ct 0x0033 100 ... 000 Pre-fail ... 0
197 Current_Pending_Sector 0x0012 100 ... 000 Old_age ... 0
198 Offline_Uncorrectable 0x0010 100 ... 000 Old_age ... 0
"""
SMART_FAILED = "SMART overall-health self-assessment test result: FAILED!"
SMART_WARN = """
SMART overall-health self-assessment test result: PASSED
5 Reallocated_Sector_Ct 0x0033 100 ... 000 Pre-fail ... 3
197 Current_Pending_Sector 0x0012 100 ... 000 Old_age ... 0
198 Offline_Uncorrectable 0x0010 100 ... 000 Old_age ... 0
"""
def test_smart_ok():
with patch.object(inventaire, "run", return_value=SMART_PASSED):
r = inventaire.get_smart("/dev/sda")
assert r["status"] == "ok"
assert r["label"] == "Bon état"
assert r["power_on_hours"] == 2847
assert r["temperature_c"] == 27
def test_smart_fail():
with patch.object(inventaire, "run", return_value=SMART_FAILED):
r = inventaire.get_smart("/dev/sda")
assert r["status"] == "fail"
assert r["label"] == "Défaillance probable"
def test_smart_warn():
with patch.object(inventaire, "run", return_value=SMART_WARN):
r = inventaire.get_smart("/dev/sda")
assert r["status"] == "warn"
assert "réalloué" in r["detail"]
def test_smart_unavailable():
with patch.object(inventaire, "run", return_value=None):
r = inventaire.get_smart("/dev/sda")
assert r["status"] == "unavailable"
# ── get_home_users ────────────────────────────────────────────────────────
def test_home_users():
du_output = "400000\t/home/gilles\n100000\t/home/alice\n"
with patch.object(inventaire, "run", return_value=du_output):
with patch("os.path.isdir", return_value=True):
users = inventaire.get_home_users()
assert users[0]["user"] == "gilles"
assert users[0]["size_bytes"] == 400000
assert users[1]["user"] == "alice"
- Step 2: Vérifier que les tests échouent
python -m pytest tests/test_inventaire.py -v
Attendu : ImportError
- Step 3: Créer
inventaire.py
#!/usr/bin/env python3
"""
inventaire.py — Inventaire disques HDD/SSD/NVMe
Exécuter en root : sudo python3 inventaire.py
Dépendances : stdlib uniquement (Python 3.9+)
"""
import json, os, re, subprocess, sys, urllib.request, urllib.error
from datetime import datetime, timezone
API_URL = os.environ.get("MES_HDD_API", "http://10.0.0.50:8088").rstrip("/")
# ── Helpers ──────────────────────────────────────────────────────────────────
def run(cmd, default=None):
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
return r.stdout.strip() if r.returncode == 0 else default
except Exception:
return default
def bytes_human(n):
if n is None:
return "?"
for unit in ("o", "Ko", "Mo", "Go", "To"):
if n < 1024:
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} Po"
# ── Détection OS ─────────────────────────────────────────────────────────────
def detect_os():
info = {}
try:
with open("/etc/os-release") as f:
for line in f:
if "=" in line:
k, v = line.strip().split("=", 1)
info[k] = v.strip('"')
except FileNotFoundError:
pass
os_id = info.get("ID", "").lower()
variant = info.get("VARIANT_ID", "").lower()
version = info.get("VERSION_ID", "")
if os.path.isdir("/etc/pve") or variant == "proxmox":
return "proxmox", version
if os_id == "ubuntu":
return "ubuntu", version
return "debian", version
# ── Machine ───────────────────────────────────────────────────────────────────
def get_hostname():
return run(["hostname"], default="inconnu")
def get_ip():
out = run(["ip", "route", "get", "1.1.1.1"])
if out:
m = re.search(r'src\s+(\S+)', out)
if m:
return m.group(1)
out = run(["hostname", "-I"])
if out:
return out.split()[0]
return "inconnu"
# ── SMART ──────────────────────────────────────────────────────────────────────
def _extract_attr(output, name):
m = re.search(rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)", output)
return int(m.group(1)) if m else None
def _smart_unavailable(reason):
return {"status": "unavailable", "label": "SMART indisponible", "detail": reason,
"temperature_c": None, "power_on_hours": None, "reallocated_sectors": None,
"pending_sectors": None, "uncorrectable_sectors": None}
def get_smart(dev):
out = run(["smartctl", "-H", "-A", "-i", dev])
if out is None:
return _smart_unavailable("smartctl absent ou accès refusé")
temp = (_extract_attr(out, "Temperature_Celsius")
or _extract_attr(out, "Airflow_Temperature_Cel"))
poh = _extract_attr(out, "Power_On_Hours")
real = _extract_attr(out, "Reallocated_Sector_Ct")
pend = _extract_attr(out, "Current_Pending_Sector")
uncr = _extract_attr(out, "Offline_Uncorrectable")
base = {"temperature_c": temp, "power_on_hours": poh,
"reallocated_sectors": real, "pending_sectors": pend,
"uncorrectable_sectors": uncr}
if "FAILED!" in out:
return {**base, "status": "fail", "label": "Défaillance probable",
"detail": "Prévoir le remplacement du disque"}
if "PASSED" in out or "Passed" in out:
bad = [v for v in [real, pend, uncr] if v and v > 0]
if bad:
issues = []
if real and real > 0: issues.append(f"{real} secteur(s) réalloué(s)")
if pend and pend > 0: issues.append(f"{pend} secteur(s) en attente")
if uncr and uncr > 0: issues.append(f"{uncr} secteur(s) non corrigeable(s)")
return {**base, "status": "warn", "label": "Attention",
"detail": ", ".join(issues) + " — disque à surveiller"}
detail_parts = []
if poh is not None: detail_parts.append(f"{poh:,}h d'utilisation".replace(",", " "))
if temp is not None: detail_parts.append(f"{temp}°C")
detail_parts.append("aucun secteur défectueux")
return {**base, "status": "ok", "label": "Bon état",
"detail": " · ".join(detail_parts)}
return _smart_unavailable("résultat SMART non interprétable")
# ── Métadonnées disque ────────────────────────────────────────────────────────
def get_by_id(dev_path):
out = run(["find", "/dev/disk/by-id", "-type", "l"])
if not out:
return None
for link in out.splitlines():
if "-part" in link:
continue
target = run(["readlink", "-f", link])
if target == dev_path:
return os.path.basename(link)
return None
def get_bus(dev_name):
if dev_name.startswith("nvme"):
return "nvme"
out = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"])
if out:
m = re.search(r"^ID_BUS=(.+)$", out, re.MULTILINE)
if m:
return m.group(1).lower()
return "inconnu"
def disk_type(name, rota):
if name.startswith("nvme"):
return "NVMe"
if str(rota) == "1":
return "HDD"
if str(rota) == "0":
return "SSD"
return "inconnu"
# ── Espace disque (df) ────────────────────────────────────────────────────────
def get_df_map():
out = run(["df", "--output=target,size,used,avail", "-B1"])
result = {}
if not out:
return result
for line in out.splitlines()[1:]:
parts = line.split()
if len(parts) < 4:
continue
try:
result[parts[0]] = {"size_bytes": int(parts[1]),
"used_bytes": int(parts[2]),
"free_bytes": int(parts[3])}
except ValueError:
pass
return result
# ── LVM ───────────────────────────────────────────────────────────────────────
def _lv_size_human(s):
if not s:
return "?"
s = s.strip()
unit_map = {"k": "Ko", "m": "Mo", "g": "Go", "t": "To"}
if s and s[-1].lower() in unit_map:
try:
return f"{float(s[:-1]):.1f} {unit_map[s[-1].lower()]}"
except ValueError:
pass
return s
def get_lvm_map():
pvs_out = run(["pvs", "--noheadings", "--reportformat", "json",
"-o", "pv_name,vg_name"])
if not pvs_out:
return {}
try:
pvs = json.loads(pvs_out)["report"][0]["pv"]
except (json.JSONDecodeError, KeyError, IndexError):
return {}
lvs_by_vg = {}
lvs_out = run(["lvs", "--noheadings", "--reportformat", "json",
"-o", "lv_name,vg_name,lv_size,lv_path"])
if lvs_out:
try:
for lv in json.loads(lvs_out)["report"][0]["lv"]:
lvs_by_vg.setdefault(lv.get("vg_name", ""), []).append(lv)
except (json.JSONDecodeError, KeyError, IndexError):
pass
result = {}
for pv in pvs:
pv_name = pv.get("pv_name", "")
vg_name = pv.get("vg_name", "")
if pv_name and vg_name:
result[pv_name] = {
"vg_name": vg_name,
"logical_volumes": [
{"lv_name": lv.get("lv_name", ""),
"size_human": _lv_size_human(lv.get("lv_size", ""))}
for lv in lvs_by_vg.get(vg_name, [])
],
}
return result
# ── /home users ───────────────────────────────────────────────────────────────
def get_home_users():
if not os.path.isdir("/home"):
return []
out = run(["du", "--max-depth=1", "-b", "/home"])
if out is None:
return None
entries = []
for line in out.splitlines():
parts = line.split("\t", 1)
if len(parts) != 2:
continue
path = parts[1].strip()
if path.rstrip("/") in ("/home",):
continue
try:
size = int(parts[0])
user = os.path.basename(path.rstrip("/"))
entries.append({"user": user, "size_bytes": size,
"size_human": bytes_human(size)})
except ValueError:
continue
return sorted(entries, key=lambda x: x["size_bytes"], reverse=True)
# ── Proxmox ───────────────────────────────────────────────────────────────────
def get_proxmox_role(dev_name):
zpool_out = run(["zpool", "status", "-P"])
if zpool_out and f"/dev/{dev_name}" in zpool_out:
return "zfs_pool"
ceph_dir = "/var/lib/ceph/osd"
if os.path.isdir(ceph_dir):
try:
for entry in os.listdir(ceph_dir):
link = os.path.join(ceph_dir, entry, "block")
if os.path.islink(link) and dev_name in os.path.realpath(link):
return "ceph_osd"
except OSError:
pass
return None
# ── Construction des partitions ───────────────────────────────────────────────
def build_partitions(children, df_map, lvm_map, home_done, os_type):
parts = []
for child in (children or []):
ctype = child.get("type")
if ctype not in ("part", "lvm"):
continue
name = child.get("name", "")
fstype = child.get("fstype") or None
mountpoint = child.get("mountpoint") or None
if fstype == "squashfs": # Ubuntu snap — ignorer
continue
size_b = child.get("size")
part = {
"name": name, "uuid": child.get("uuid") or None,
"fstype": fstype,
"size_bytes": size_b, "size_human": bytes_human(size_b),
"used_bytes": None, "used_human": None,
"free_bytes": None, "free_human": None,
"used_percent": None, "mountpoint": mountpoint,
"home_users": None, "lvm": None,
}
if mountpoint and mountpoint in df_map:
df = df_map[mountpoint]
part["used_bytes"] = df["used_bytes"]
part["used_human"] = bytes_human(df["used_bytes"])
part["free_bytes"] = df["free_bytes"]
part["free_human"] = bytes_human(df["free_bytes"])
if df["size_bytes"] > 0:
part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100)
if mountpoint == "/home" and not home_done[0]:
part["home_users"] = get_home_users()
home_done[0] = True
dev_path = f"/dev/{name}"
if fstype == "LVM2_member" and dev_path in lvm_map:
lvm_info = lvm_map[dev_path]
lvs = []
for lv in lvm_info["logical_volumes"]:
lv_entry = dict(lv)
lv_entry.setdefault("used_human", None)
lv_entry.setdefault("free_human", None)
lv_entry.setdefault("used_percent", None)
lv_entry.setdefault("fstype", None)
lv_entry.setdefault("mountpoint", None)
lvs.append(lv_entry)
part["lvm"] = {"vg_name": lvm_info["vg_name"], "logical_volumes": lvs}
# Traiter récursivement les enfants LVM
if child.get("children"):
for lv_child in child["children"]:
lv_mp = lv_child.get("mountpoint") or None
lv_name = lv_child.get("name", "")
if part["lvm"] and lv_mp and lv_mp in df_map:
df = df_map[lv_mp]
for lv in part["lvm"]["logical_volumes"]:
if lv["lv_name"] in lv_name:
lv["mountpoint"] = lv_mp
lv["fstype"] = lv_child.get("fstype")
lv["used_human"] = bytes_human(df["used_bytes"])
lv["free_human"] = bytes_human(df["free_bytes"])
if df["size_bytes"] > 0:
lv["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100)
if lv_mp == "/home" and not home_done[0]:
for lv in (part["lvm"]["logical_volumes"] if part["lvm"] else []):
if lv.get("mountpoint") == "/home":
lv["home_users"] = get_home_users()
home_done[0] = True
parts.append(part)
return parts
# ── Collecte principale ────────────────────────────────────────────────────────
def collect():
os_type, os_version = detect_os()
hostname = get_hostname()
ip = get_ip()
df_map = get_df_map()
lvm_map = get_lvm_map()
output = run(["lsblk", "-J", "-b",
"-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME"])
if not output:
print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr)
sys.exit(1)
try:
blk = json.loads(output)
except json.JSONDecodeError as e:
print(f"[inventaire] Erreur parsing lsblk: {e}", file=sys.stderr)
sys.exit(1)
disks = []
home_done = [False]
for blkdev in blk.get("blockdevices", []):
if blkdev.get("type") != "disk":
continue
name = blkdev.get("name", "")
dev_path = f"/dev/{name}"
model = (blkdev.get("model") or "inconnu").strip()
serial = (blkdev.get("serial") or "inconnu").strip()
size_b = blkdev.get("size") or 0
rota = blkdev.get("rota", "")
partitions = build_partitions(
blkdev.get("children") or [], df_map, lvm_map, home_done, os_type)
# /home sur partition racine si pas encore trouvé
if not home_done[0]:
for p in partitions:
if p.get("mountpoint") == "/":
p["home_users"] = get_home_users()
home_done[0] = True
break
disk = {
"device": name, "path": dev_path,
"by_id": get_by_id(dev_path),
"model": model, "serial": serial,
"type": disk_type(name, rota),
"capacity_bytes": size_b, "capacity_human": bytes_human(size_b),
"bus": get_bus(name),
"smart": get_smart(dev_path),
"partitions": partitions,
}
if os_type == "proxmox":
role = get_proxmox_role(name)
if role:
disk["proxmox_role"] = role
disks.append(disk)
return {
"hostname": hostname, "ip": ip,
"os": os_type, "os_version": os_version,
"collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(),
"disks": disks,
}
# ── HTTP POST ─────────────────────────────────────────────────────────────────
def post_to_api(payload):
url = f"{API_URL}/api/ingest"
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data,
headers={"Content-Type": "application/json"},
method="POST")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
body = json.loads(resp.read())
print(f"[inventaire] OK — {body.get('accepted','?')} disque(s) "
f"enregistré(s) pour {body.get('hostname','?')}")
except urllib.error.HTTPError as e:
print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode()}", file=sys.stderr)
sys.exit(1)
except urllib.error.URLError as e:
print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr)
sys.exit(1)
# ── Entrypoint ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
if os.geteuid() != 0:
print("[inventaire] Ce script doit être exécuté en root (sudo).", file=sys.stderr)
sys.exit(1)
print(f"[inventaire] Collecte en cours vers {API_URL}...")
payload = collect()
post_to_api(payload)
- Step 4: Vérifier que les tests du script passent
python -m pytest tests/test_inventaire.py -v
Attendu : 11 passed
- Step 5: Commit
git add inventaire.py tests/test_inventaire.py
git commit -m "feat: script client inventaire.py (stdlib only)"
Task 10 : Test end-to-end Docker
Files:
-
Modify:
api/Dockerfile(ajout PYTHONPATH) -
Step 1: Corriger
api/Dockerfilepour le PYTHONPATH
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PYTHONPATH=/app
CMD ["sh", "-c", "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000"]
- Step 2: Build et démarrage
docker compose up --build -d
Attendu : 2 conteneurs Up
- Step 3: Vérifier que l'API répond
curl -s http://10.0.0.50:8088/api/machines | python3 -m json.tool
Attendu : []
- Step 4: Envoyer un payload de test
curl -s -X POST http://10.0.0.50:8088/api/ingest \
-H "Content-Type: application/json" \
-d '{
"hostname":"test-host","ip":"10.0.0.99",
"os":"debian","os_version":"12",
"collected_at":"2026-05-28T15:00:00+02:00",
"disks":[{
"device":"sda","path":"/dev/sda","by_id":null,
"model":"Test Disk","serial":"SN-TEST-001","type":"HDD",
"capacity_bytes":1000000000,"capacity_human":"1.0 Go","bus":"sata",
"smart":{"status":"ok","label":"Bon état","detail":"100h · 30°C",
"temperature_c":30,"power_on_hours":100,
"reallocated_sectors":0,"pending_sectors":0,"uncorrectable_sectors":0},
"partitions":[]
}]
}' | python3 -m json.tool
Attendu : {"accepted": 1, "hostname": "test-host"}
- Step 5: Vérifier la lecture
curl -s http://10.0.0.50:8088/api/disks | python3 -m json.tool
curl -s http://10.0.0.50:8088/api/ai/summary | python3 -m json.tool
- Step 6: Commit final
git add api/Dockerfile
git commit -m "fix: PYTHONPATH dans Dockerfile pour import api.*"
Récapitulatif des commits attendus
feat: infrastructure Docker + nginxfeat: modèles SQLAlchemy Machine/Disk/Snapshotfeat: Alembic migrations initiales (3 tables)feat: schémas Pydantic IngestPayload + réponses APIfeat: service ingest + POST /api/ingestfeat: routes GET /api/disks/* et /api/machines/*feat: routes GET /api/ai/* (summary, at-risk, moved, backup)feat: lifespan FastAPI avec alembic upgrade au démarragefeat: script client inventaire.py (stdlib only)fix: PYTHONPATH dans Dockerfile pour import api.*