Files
mes_hdd/inventaire.py
T
Gilles Soulier c0e737244c feat: script client inventaire.py (stdlib only, 28 tests)
CLI argparse : --host, --port, --dry-run/-n, --debug, --output
Variables env : MES_HDD_HOST, MES_HDD_PORT
Détection OS : proxmox/ubuntu/debian
SMART en français : ok/warn/fail/unavailable avec détail lisible
Partitions : fstype, UUID, espace, LVM, /home users
Distribution : curl | sudo python3 -

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 20:15:46 +02:00

542 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
inventaire.py — Inventaire disques HDD/SSD/NVMe
Exécuter en root : sudo python3 inventaire.py [options]
Dépendances : stdlib uniquement (Python 3.9+)
Lancement à distance :
curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py | sudo python3 -
"""
import argparse, json, os, re, subprocess, sys, urllib.request, urllib.error
from datetime import datetime, timezone
# ── CLI ───────────────────────────────────────────────────────────────────────
def parse_args(argv=None):
    """Parse the command-line options of the inventory client.

    The defaults of ``--host`` and ``--port`` are taken from the
    ``MES_HDD_HOST`` and ``MES_HDD_PORT`` environment variables, falling back
    to ``10.0.0.50`` and ``8088``.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` carrying ``host``, ``port``, ``dry_run``,
        ``debug`` and ``output``.
    """
    default_host = os.environ.get("MES_HDD_HOST", "10.0.0.50")
    raw_port = os.environ.get("MES_HDD_PORT", "8088")
    try:
        default_port = int(raw_port)
    except ValueError:
        # A malformed environment variable must neither end in a traceback
        # nor block an explicit --port: warn and keep the built-in default.
        print(
            f"[inventaire] MES_HDD_PORT invalide ({raw_port!r}), "
            "port 8088 utilisé par défaut",
            file=sys.stderr,
        )
        default_port = 8088

    p = argparse.ArgumentParser(
        description="Inventaire disques HDD/SSD/NVMe → backend mes_hdd",
        epilog=(
            "Variables d'environnement :\n"
            " MES_HDD_HOST Hôte du serveur (défaut: 10.0.0.50)\n"
            " MES_HDD_PORT Port du serveur (défaut: 8088)\n\n"
            "Exemples :\n"
            " sudo python3 inventaire.py\n"
            " sudo python3 inventaire.py --dry-run\n"
            " sudo python3 inventaire.py --debug --output /tmp/inv.json\n"
            " sudo python3 inventaire.py --host 192.168.1.10 --port 9000\n"
            " curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py"
            " | sudo MES_HDD_HOST=10.0.0.50 python3 -\n"
        ),
        # Keep the hand-formatted epilog (line breaks, examples) as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument(
        "--host", default=default_host,
        help=f"Hôte du serveur backend (env MES_HDD_HOST, défaut: {default_host})",
    )
    p.add_argument(
        "--port", type=int, default=default_port,
        help=f"Port du serveur backend (env MES_HDD_PORT, défaut: {default_port})",
    )
    p.add_argument(
        "-n", "--dry-run", action="store_true",
        help="Affiche le JSON collecté sans envoyer au serveur",
    )
    p.add_argument(
        "--debug", action="store_true",
        help="Affiche le JSON ET envoie au serveur",
    )
    p.add_argument(
        "--output", metavar="FICHIER",
        help="Sauvegarde le JSON dans un fichier (en plus de l'envoi)",
    )
    return p.parse_args(argv)
# ── Helpers ───────────────────────────────────────────────────────────────────
def run(cmd, default=None):
    """Run *cmd* and return its stripped stdout, or *default* on failure.

    Failure means either that the process could not be run to completion
    (any exception, including the 15-second timeout) or that it exited with
    a non-zero status.

    Args:
        cmd: Argument list handed to ``subprocess.run``.
        default: Value returned when the command fails.
    """
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except Exception:
        # Deliberately best-effort: a missing tool must not stop the inventory.
        return default
    if proc.returncode != 0:
        return default
    return proc.stdout.strip()
def bytes_human(n):
    """Format a byte count with French binary units (o, Ko, Mo, Go, To, Po).

    Args:
        n: Number of bytes, or ``None`` when the size is unknown.

    Returns:
        ``"?"`` for ``None``, otherwise the value with one decimal and its
        unit; anything beyond the To range is expressed in Po.
    """
    if n is None:
        return "?"
    units = ("o", "Ko", "Mo", "Go", "To", "Po")
    last = len(units) - 1
    value = n
    idx = 0
    # Step up one unit at a time until the value fits or the units run out.
    while idx < last and not value < 1024:
        value /= 1024
        idx += 1
    return f"{value:.1f} {units[idx]}"
def print_json(payload):
    """Write *payload* to stdout as 2-space indented JSON, non-ASCII kept as is."""
    text = json.dumps(payload, indent=2, ensure_ascii=False)
    print(text)
# ── Détection OS ──────────────────────────────────────────────────────────────
def detect_os():
    """Identify the host OS family from ``/etc/os-release`` and ``/etc/pve``.

    Returns:
        A ``(family, version)`` tuple. ``family`` is ``"proxmox"`` when
        ``/etc/pve`` is a directory or ``VARIANT_ID`` is ``proxmox``,
        ``"ubuntu"`` when ``ID`` is ``ubuntu``, and ``"debian"`` in every
        other case. ``version`` is the raw ``VERSION_ID`` (``""`` if absent).
    """
    fields = {}
    try:
        with open("/etc/os-release") as handle:
            for raw in handle:
                if "=" not in raw:
                    continue
                key, _, value = raw.strip().partition("=")
                fields[key] = value.strip('"')
    except FileNotFoundError:
        # No os-release file: fall through with empty fields.
        pass

    version = fields.get("VERSION_ID", "")
    variant = fields.get("VARIANT_ID", "").lower()
    if os.path.isdir("/etc/pve") or variant == "proxmox":
        return "proxmox", version
    family = "ubuntu" if fields.get("ID", "").lower() == "ubuntu" else "debian"
    return family, version
# ── Machine ───────────────────────────────────────────────────────────────────
def get_hostname():
    """Return the output of the ``hostname`` command, or ``"inconnu"`` if it fails."""
    name = run(["hostname"], default="inconnu")
    return name
def get_ip():
    """Return the host's primary IP address, or ``"inconnu"``.

    The source address of the route towards 1.1.1.1 is preferred; the first
    address printed by ``hostname -I`` is the fallback.
    """
    route = run(["ip", "route", "get", "1.1.1.1"])
    src = re.search(r"src\s+(\S+)", route) if route else None
    if src is not None:
        return src.group(1)

    addresses = run(["hostname", "-I"])
    return addresses.split()[0] if addresses else "inconnu"
# ── SMART ─────────────────────────────────────────────────────────────────────
def _extract_attr(output, name):
    """Return the leading integer of attribute *name*'s last column, or ``None``.

    The pattern expects *name* followed by eight whitespace-separated
    columns (the 2nd to 4th numeric) and captures the digits that start the
    ninth. *name* is inserted into the regex unescaped.
    """
    pattern = rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)"
    found = re.search(pattern, output)
    if found is None:
        return None
    return int(found.group(1))
def _smart_unavailable(reason):
    """Build the SMART record used when no health data could be obtained.

    Args:
        reason: Text stored in the ``detail`` field.

    Returns:
        A dict with ``status`` set to ``"unavailable"`` and every counter
        set to ``None``.
    """
    report = {
        "status": "unavailable",
        "label": "SMART indisponible",
        "detail": reason,
    }
    # Counters follow the textual fields so the key order stays unchanged.
    for counter in (
        "temperature_c",
        "power_on_hours",
        "reallocated_sectors",
        "pending_sectors",
        "uncorrectable_sectors",
    ):
        report[counter] = None
    return report
def get_smart(dev):
    """Summarise the SMART health of block device *dev*.

    Runs ``smartctl -H -A -i`` and extracts the temperature, power-on hours
    and the reallocated / pending / uncorrectable sector counters.

    Args:
        dev: Device path handed to smartctl (e.g. ``/dev/sda``).

    Returns:
        A dict holding the five counters plus ``status`` (``"ok"``,
        ``"warn"``, ``"fail"`` or ``"unavailable"``), ``label`` and
        ``detail``.
    """
    try:
        proc = subprocess.run(
            ["smartctl", "-H", "-A", "-i", dev],
            capture_output=True, text=True, timeout=15,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return _smart_unavailable("smartctl absent ou accès refusé")

    # smartctl's exit status is a bit mask. Only bits 0 and 1 (bad command
    # line, device could not be opened) mean there is no usable output; the
    # higher bits report health findings, so a failing disk exits non-zero
    # while still printing the very report we need to read.
    if proc.returncode < 0 or proc.returncode & 0b11:
        return _smart_unavailable("smartctl absent ou accès refusé")
    out = proc.stdout.strip()

    temp = (_extract_attr(out, "Temperature_Celsius")
            or _extract_attr(out, "Airflow_Temperature_Cel"))
    poh = _extract_attr(out, "Power_On_Hours")
    real = _extract_attr(out, "Reallocated_Sector_Ct")
    pend = _extract_attr(out, "Current_Pending_Sector")
    uncr = _extract_attr(out, "Offline_Uncorrectable")
    base = {
        "temperature_c": temp, "power_on_hours": poh,
        "reallocated_sectors": real, "pending_sectors": pend,
        "uncorrectable_sectors": uncr,
    }

    if "FAILED!" in out:
        return {**base, "status": "fail", "label": "Défaillance probable",
                "detail": "Prévoir le remplacement du disque"}

    if "PASSED" in out or "Passed" in out:
        # The overall verdict is fine, but non-zero sector counters still
        # downgrade the disk to "warn".
        issues = []
        if real and real > 0:
            issues.append(f"{real} secteur(s) réalloué(s)")
        if pend and pend > 0:
            issues.append(f"{pend} secteur(s) en attente")
        if uncr and uncr > 0:
            issues.append(f"{uncr} secteur(s) non corrigeable(s)")
        if issues:
            return {**base, "status": "warn", "label": "Attention",
                    "detail": ", ".join(issues) + " — disque à surveiller"}

        parts = []
        if poh is not None:
            # NOTE(review): the thousands separator is replaced by what reads
            # as an empty string here; confirm no thin space was intended.
            parts.append(f"{poh:,}h d'utilisation".replace(",", ""))
        if temp is not None:
            parts.append(f"{temp}°C")
        parts.append("aucun secteur défectueux")
        return {**base, "status": "ok", "label": "Bon état",
                "detail": " · ".join(parts)}

    return _smart_unavailable("résultat SMART non interprétable")
# ── Métadonnées disque ────────────────────────────────────────────────────────
def get_by_id(dev_path, by_id_dir="/dev/disk/by-id"):
    """Return the persistent ``by-id`` name that points at *dev_path*.

    Symlinks are inspected in sorted order so the result is deterministic
    when several names reference the same disk. Names containing ``-part``
    (partition links) are skipped.

    Args:
        dev_path: Fully resolved device path, e.g. ``/dev/sda``.
        by_id_dir: Directory holding the symlinks to inspect.

    Returns:
        The matching symlink's base name, or ``None`` when the directory is
        missing or unreadable, or when no link resolves to *dev_path*.
    """
    try:
        names = sorted(os.listdir(by_id_dir))
    except OSError:
        return None
    for name in names:
        if "-part" in name:
            continue
        link = os.path.join(by_id_dir, name)
        # Resolve in-process rather than spawning one `readlink` per link.
        if os.path.islink(link) and os.path.realpath(link) == dev_path:
            return name
    return None
def get_bus(dev_name):
    """Return the bus of device *dev_name*, lower-cased, or ``"inconnu"``.

    Names starting with ``nvme`` are answered directly; for anything else
    the ``ID_BUS`` property reported by ``udevadm info`` is used.
    """
    if dev_name.startswith("nvme"):
        return "nvme"
    props = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"])
    bus = re.search(r"^ID_BUS=(.+)$", props, re.MULTILINE) if props else None
    return bus.group(1).lower() if bus else "inconnu"
def disk_type(name, rota):
    """Classify a disk as ``"NVMe"``, ``"HDD"``, ``"SSD"`` or ``"inconnu"``.

    Args:
        name: Kernel device name (``sda``, ``nvme0n1``, ...).
        rota: Rotational flag as reported by lsblk. It may arrive as a JSON
            boolean, an integer, or a string such as ``"1"`` / ``"0"``
            depending on the lsblk version.

    Returns:
        ``"NVMe"`` for names starting with ``nvme``; otherwise ``"HDD"`` for
        a rotational disk, ``"SSD"`` for a non-rotational one and
        ``"inconnu"`` when the flag is missing or unrecognised.
    """
    if name.startswith("nvme"):
        return "NVMe"
    # Normalise text so "1", " 1 " and "true" are treated alike. Booleans
    # and integers compare equal to True/False directly (1 == True).
    flag = rota.strip().lower() if isinstance(rota, str) else rota
    if flag in (True, "1", "true"):
        return "HDD"
    if flag in (False, "0", "false"):
        return "SSD"
    return "inconnu"
# ── Espace disque (df) ────────────────────────────────────────────────────────
def get_df_map():
    """Map each mount point to its size/used/free byte counts from ``df``.

    Returns:
        A dict ``{mountpoint: {"size_bytes", "used_bytes", "free_bytes"}}``.
        It is empty when ``df`` fails; rows whose counters are not integers
        are skipped.
    """
    out = run(["df", "--output=target,size,used,avail", "-B1"])
    result = {}
    if not out:
        return result
    for line in out.splitlines()[1:]:  # first line is the column header
        # Split from the right: the three counters never contain blanks,
        # whereas the mount point (first column) may.
        fields = line.rsplit(None, 3)
        if len(fields) != 4:
            continue
        target, size, used, avail = fields
        try:
            result[target.strip()] = {
                "size_bytes": int(size),
                "used_bytes": int(used),
                "free_bytes": int(avail),
            }
        except ValueError:
            continue
    return result
# ── LVM ───────────────────────────────────────────────────────────────────────
def _lv_size_human(s):
    """Convert an LVM size string such as ``"20.00g"`` to French units.

    Handles an optional leading ``<`` (kept in the result), a decimal comma,
    and the unit suffixes b, k, m, g, t, p and e in either case.

    Args:
        s: Size text as reported by ``lvs``, or a falsy value.

    Returns:
        ``"?"`` for empty input, the reformatted size (one decimal) when the
        text can be parsed, and the stripped input unchanged otherwise.
    """
    if not s:
        return "?"
    s = s.strip()
    if not s:
        return "?"
    unit_map = {
        "b": "o", "k": "Ko", "m": "Mo", "g": "Go",
        "t": "To", "p": "Po", "e": "Eo",
    }
    # A "<" prefix marks a rounded value; keep the marker, parse the rest.
    prefix = "<" if s.startswith("<") else ""
    body = s[len(prefix):]
    unit = unit_map.get(body[-1:].lower())
    if unit is None:
        return s
    try:
        value = float(body[:-1].replace(",", "."))
    except ValueError:
        return s
    return f"{prefix}{value:.1f} {unit}"
def get_lvm_map():
    """Describe LVM physical volumes and the logical volumes of their group.

    Returns:
        A dict keyed by physical-volume name. Each value holds ``vg_name``
        and ``logical_volumes``, a list with one entry per logical volume of
        that volume group; only the name and size are filled in, the usage
        fields start as ``None``. The dict is empty when ``pvs`` fails or
        its JSON cannot be read.
    """
    report_opts = ["--noheadings", "--reportformat", "json", "-o"]

    pv_json = run(["pvs", *report_opts, "pv_name,vg_name"])
    if not pv_json:
        return {}
    try:
        pv_rows = json.loads(pv_json)["report"][0]["pv"]
    except (json.JSONDecodeError, KeyError, IndexError):
        return {}

    # Group logical volumes by volume group; failure here is not fatal.
    volumes_by_group = {}
    lv_json = run(["lvs", *report_opts, "lv_name,vg_name,lv_size,lv_path"])
    if lv_json:
        try:
            for row in json.loads(lv_json)["report"][0]["lv"]:
                volumes_by_group.setdefault(row.get("vg_name", ""), []).append(row)
        except (json.JSONDecodeError, KeyError, IndexError):
            pass

    def describe(row):
        return {
            "lv_name": row.get("lv_name", ""),
            "size_human": _lv_size_human(row.get("lv_size", "")),
            "used_human": None, "free_human": None,
            "used_percent": None, "fstype": None, "mountpoint": None,
        }

    mapping = {}
    for pv_row in pv_rows:
        device = pv_row.get("pv_name", "")
        group = pv_row.get("vg_name", "")
        if not (device and group):
            continue
        mapping[device] = {
            "vg_name": group,
            "logical_volumes": [
                describe(row) for row in volumes_by_group.get(group, [])
            ],
        }
    return mapping
# ── /home users ───────────────────────────────────────────────────────────────
def get_home_users():
    """List the first-level directories of ``/home`` with their disk usage.

    Returns:
        ``[]`` when ``/home`` is not a directory, ``None`` when ``du``
        fails, otherwise a list of ``{"user", "size_bytes", "size_human"}``
        dicts sorted by decreasing size.
    """
    if not os.path.isdir("/home"):
        return []
    listing = run(["du", "--max-depth=1", "-b", "/home"])
    if listing is None:
        return None

    users = []
    for row in listing.splitlines():
        size_text, tab, raw_path = row.partition("\t")
        if not tab:
            continue
        directory = raw_path.strip().rstrip("/")
        if directory == "/home":  # the total line for /home itself
            continue
        try:
            size = int(size_text)
        except ValueError:
            continue
        users.append({
            "user": os.path.basename(directory),
            "size_bytes": size,
            "size_human": bytes_human(size),
        })
    users.sort(key=lambda entry: entry["size_bytes"], reverse=True)
    return users
# ── Proxmox ───────────────────────────────────────────────────────────────────
def get_proxmox_role(dev_name):
    """Return the storage role of disk *dev_name*, if one is detected.

    Args:
        dev_name: Kernel device name without ``/dev/`` (e.g. ``sda``).

    Returns:
        ``"zfs_pool"`` when a device path listed by ``zpool status -P``
        resolves to the disk or one of its partitions, ``"ceph_osd"`` when
        a ``block`` symlink under ``/var/lib/ceph/osd`` does, else ``None``.
    """
    # Partition nodes append digits ("sda1"), with a "p" in between when the
    # disk name itself ends in a digit ("nvme0n1p1").
    suffix = r"p\d+" if dev_name[-1:].isdigit() else r"\d+"
    own_node = re.compile(rf"/dev/{re.escape(dev_name)}(?:{suffix})?")

    def belongs_to_disk(path):
        # Resolve symlinks (e.g. /dev/disk/by-id/...) and require an exact
        # node match, so "sda" does not match "sdaa".
        return own_node.fullmatch(os.path.realpath(path)) is not None

    zpool_out = run(["zpool", "status", "-P"])
    if zpool_out:
        for token in zpool_out.split():
            if token.startswith("/dev/") and belongs_to_disk(token):
                return "zfs_pool"

    ceph_dir = "/var/lib/ceph/osd"
    if os.path.isdir(ceph_dir):
        try:
            for entry in os.listdir(ceph_dir):
                link = os.path.join(ceph_dir, entry, "block")
                if os.path.islink(link) and belongs_to_disk(link):
                    return "ceph_osd"
        except OSError:
            # Unreadable OSD directory: treat as "no role detected".
            pass
    return None
# ── Construction des partitions ───────────────────────────────────────────────
def _enrich_lv_from_df(lv, df_map):
    """Return *lv* unchanged; placeholder for df-based enrichment.

    The previous body built an unused device path and looped over *df_map*
    without doing anything, so this function has never modified *lv*.

    Args:
        lv: Logical-volume dict.
        df_map: Mount-point usage map (currently unused).

    Returns:
        The same *lv* object.
    """
    return lv
def _lvm_dm_name(vg_name, lv_name):
    """Return the device-mapper name of a logical volume (hyphens doubled)."""
    return f"{vg_name.replace('-', '--')}-{lv_name.replace('-', '--')}"


def build_partitions(children, df_map, lvm_map, home_done):
    """Build the partition records of one disk from its lsblk children.

    Args:
        children: lsblk child entries of the disk (may be ``None``).
        df_map: Mount-point usage map as returned by ``get_df_map``.
        lvm_map: Physical-volume map as returned by ``get_lvm_map``.
        home_done: One-element list used as a shared flag; ``home_done[0]``
            is set to ``True`` once the ``/home`` listing has been attached.

    Returns:
        A list of partition dicts. Children whose type is neither ``part``
        nor ``lvm`` and squashfs file systems are left out.
    """
    parts = []
    for child in (children or []):
        if child.get("type") not in ("part", "lvm"):
            continue
        name = child.get("name", "")
        fstype = child.get("fstype") or None
        mountpoint = child.get("mountpoint") or None
        if fstype == "squashfs":  # Ubuntu snap images: ignore
            continue
        size_b = child.get("size")
        part = {
            "name": name,
            "uuid": child.get("uuid") or None,
            "fstype": fstype,
            "size_bytes": size_b,
            "size_human": bytes_human(size_b),
            "used_bytes": None, "used_human": None,
            "free_bytes": None, "free_human": None,
            "used_percent": None,
            "mountpoint": mountpoint,
            "home_users": None,
            "lvm": None,
        }

        # Usage figures from df, available only for mounted file systems.
        if mountpoint and mountpoint in df_map:
            df = df_map[mountpoint]
            part["used_bytes"] = df["used_bytes"]
            part["used_human"] = bytes_human(df["used_bytes"])
            part["free_bytes"] = df["free_bytes"]
            part["free_human"] = bytes_human(df["free_bytes"])
            if df["size_bytes"] > 0:
                part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100)

        # Attach the /home listing to the partition mounted there (once).
        if mountpoint == "/home" and not home_done[0]:
            part["home_users"] = get_home_users()
            home_done[0] = True

        # LVM: map the physical volume to its volume group and volumes.
        dev_path = f"/dev/{name}"
        if fstype == "LVM2_member" and dev_path in lvm_map:
            lvm_info = lvm_map[dev_path]
            vg_name = lvm_info["vg_name"]
            # Copy so the shared lvm_map entries are not mutated.
            lvs = [dict(lv) for lv in lvm_info["logical_volumes"]]
            # Index by exact device-mapper name: a substring test would let
            # "data" match the lsblk child "vg-data2".
            lvs_by_dm_name = {
                _lvm_dm_name(vg_name, lv.get("lv_name", "")): lv for lv in lvs
            }
            for lv_child in child.get("children") or []:
                lv_mp = lv_child.get("mountpoint") or None
                if not lv_mp:
                    continue
                lv = lvs_by_dm_name.get(lv_child.get("name", ""))
                if lv is None:
                    continue
                lv["mountpoint"] = lv_mp
                lv["fstype"] = lv_child.get("fstype") or None
                df = df_map.get(lv_mp)
                if df:
                    lv["used_human"] = bytes_human(df["used_bytes"])
                    lv["free_human"] = bytes_human(df["free_bytes"])
                    if df["size_bytes"] > 0:
                        lv["used_percent"] = int(
                            df["used_bytes"] / df["size_bytes"] * 100)
                if lv_mp == "/home" and not home_done[0]:
                    lv["home_users"] = get_home_users()
                    home_done[0] = True
            part["lvm"] = {"vg_name": vg_name, "logical_volumes": lvs}

        parts.append(part)
    return parts
# ── Collecte principale ───────────────────────────────────────────────────────
def collect():
    """Collect the full disk inventory of the local machine.

    Gathers host identity, then one record per lsblk device of type
    ``disk`` with its metadata, SMART summary and partitions.

    Returns:
        The payload dict (``hostname``, ``ip``, ``os``, ``os_version``,
        ``collected_at``, ``disks``).

    Raises:
        SystemExit: With status 1 when lsblk fails or prints invalid JSON.
    """
    os_type, os_version = detect_os()
    hostname = get_hostname()
    ip = get_ip()
    df_map = get_df_map()
    lvm_map = get_lvm_map()

    output = run([
        "lsblk", "-J", "-b",
        "-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME",
    ])
    if not output:
        print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr)
        sys.exit(1)
    try:
        blk = json.loads(output)
    except json.JSONDecodeError as e:
        print(f"[inventaire] Erreur parsing lsblk JSON: {e}", file=sys.stderr)
        sys.exit(1)

    disks = []
    home_done = [False]  # shared flag: the /home listing is attached once
    for blkdev in blk.get("blockdevices", []):
        if blkdev.get("type") != "disk":
            continue
        name = blkdev.get("name", "")
        dev_path = f"/dev/{name}"
        model = (blkdev.get("model") or "inconnu").strip()
        serial = (blkdev.get("serial") or "inconnu").strip()
        size_b = blkdev.get("size") or 0
        rota = blkdev.get("rota", "")
        partitions = build_partitions(
            blkdev.get("children") or [], df_map, lvm_map, home_done)
        disk = {
            "device": name,
            "path": dev_path,
            "by_id": get_by_id(dev_path),
            "model": model,
            "serial": serial,
            "type": disk_type(name, rota),
            "capacity_bytes": size_b,
            "capacity_human": bytes_human(size_b),
            "bus": get_bus(name),
            "smart": get_smart(dev_path),
            "partitions": partitions,
        }
        if os_type == "proxmox":
            role = get_proxmox_role(name)
            if role:
                disk["proxmox_role"] = role
        disks.append(disk)

    # Fallback: with no dedicated /home mount, attach the listing to the
    # root partition. This runs only after every disk has been scanned, so a
    # /home partition on a later disk is not pre-empted by an earlier root.
    if not home_done[0]:
        root_part = next(
            (p for d in disks for p in d["partitions"]
             if p.get("mountpoint") == "/"),
            None,
        )
        if root_part is not None:
            root_part["home_users"] = get_home_users()
            home_done[0] = True

    return {
        "hostname": hostname,
        "ip": ip,
        "os": os_type,
        "os_version": os_version,
        "collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(),
        "disks": disks,
    }
# ── HTTP POST ─────────────────────────────────────────────────────────────────
def post_to_api(payload, api_url):
    """POST *payload* as JSON to ``<api_url>/api/ingest`` and report the result.

    Args:
        payload: JSON-serialisable inventory.
        api_url: Base URL of the backend (a trailing slash is tolerated).

    Raises:
        SystemExit: With status 1 on an HTTP error status or when the server
            cannot be reached (including timeouts).
    """
    url = f"{api_url.rstrip('/')}/api/ingest"
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url, data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # The error body may not be valid UTF-8; never fail while reporting.
        detail = e.read().decode("utf-8", errors="replace")
        print(f"[inventaire] Erreur HTTP {e.code}: {detail}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Timeouts or resets while reading the response are not wrapped in
        # URLError.
        print(f"[inventaire] Impossible de joindre {url}: {e}", file=sys.stderr)
        sys.exit(1)

    # The request succeeded; a body that is not a JSON object only degrades
    # the confirmation message to "?" placeholders.
    try:
        body = json.loads(raw)
    except ValueError:
        body = None
    if not isinstance(body, dict):
        body = {}
    print(
        f"[inventaire] OK — {body.get('accepted', '?')} disque(s) "
        f"enregistré(s) pour {body.get('hostname', '?')}"
    )
# ── Entrypoint ────────────────────────────────────────────────────────────────
def main():
    """Script entry point: parse options, collect, then save/print/send."""
    # Parse first so that --help and usage errors work without root.
    args = parse_args()
    if os.geteuid() != 0:
        print("[inventaire] Ce script doit être exécuté en root (sudo).", file=sys.stderr)
        sys.exit(1)

    api_url = f"http://{args.host}:{args.port}"
    print("[inventaire] Collecte en cours...", file=sys.stderr)
    payload = collect()

    if args.output:
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
        except OSError as e:
            print(f"[inventaire] Impossible d'écrire {args.output}: {e}", file=sys.stderr)
            sys.exit(1)
        print(f"[inventaire] JSON sauvegardé : {args.output}", file=sys.stderr)

    # Both --dry-run and --debug print the JSON; only --dry-run skips the send.
    if args.dry_run or args.debug:
        print_json(payload)
    if args.dry_run:
        print(f"\n[inventaire] --dry-run : aucune donnée envoyée (cible: {api_url})",
              file=sys.stderr)
        return
    post_to_api(payload, api_url)


if __name__ == "__main__":
    main()