From ac51959eb21e0ae71b9b482eebf5800b5ddbc10e Mon Sep 17 00:00:00 2001 From: Gilles Soulier Date: Thu, 28 May 2026 20:35:32 +0200 Subject: [PATCH] fix: SMART NVMe + messages debug verbeux MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Parsing NVMe séparé de SATA (Temperature/Power On Hours/Available Spare) - Flag --debug et --dry-run activent les messages détaillés sur stderr - run() affiche les erreurs et exceptions en mode verbose - Fallback du -d 1 -> --max-depth=1 pour home_users - dprint() sur chaque étape : OS, disques, partitions, SMART, LVM, /home Co-Authored-By: Claude Sonnet 4.6 --- inventaire.py | 293 ++++++++++++++++++++++++--------------- tests/test_inventaire.py | 2 +- 2 files changed, 185 insertions(+), 110 deletions(-) diff --git a/inventaire.py b/inventaire.py index 1b83564..bfa950e 100644 --- a/inventaire.py +++ b/inventaire.py @@ -1,26 +1,33 @@ #!/usr/bin/env python3 """ -inventaire.py — Inventaire disques HDD/SSD/NVMe -Exécuter en root : sudo python3 inventaire.py [options] -Dépendances : stdlib uniquement (Python 3.9+) +inventaire.py - Inventaire disques HDD/SSD/NVMe +Executer en root : sudo python3 inventaire.py [options] +Dependances : stdlib uniquement (Python 3.9+) -Lancement à distance : +Lancement a distance : curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py | sudo python3 - """ import argparse, json, os, re, subprocess, sys, urllib.request, urllib.error from datetime import datetime, timezone -# ── CLI ─────────────────────────────────────────────────────────────────────── +_verbose = [False] # mutable : _verbose[0] = True pour activer + +def dprint(msg): + if _verbose[0]: + print(f"[debug] {msg}", file=sys.stderr) + + +# -- CLI ---------------------------------------------------------------------- def parse_args(): default_host = os.environ.get("MES_HDD_HOST", "10.0.0.50") default_port = int(os.environ.get("MES_HDD_PORT", "8088")) p = argparse.ArgumentParser( - description="Inventaire disques HDD/SSD/NVMe → backend mes_hdd", + description="Inventaire disques HDD/SSD/NVMe -> backend mes_hdd", epilog=( "Variables d'environnement :\n" - " MES_HDD_HOST Hôte du serveur (défaut: 10.0.0.50)\n" - " MES_HDD_PORT Port du serveur (défaut: 8088)\n\n" + " MES_HDD_HOST Hote du serveur (defaut: 10.0.0.50)\n" + " MES_HDD_PORT Port du serveur (defaut: 8088)\n\n" "Exemples :\n" " sudo python3 inventaire.py\n" " sudo python3 inventaire.py --dry-run\n" @@ -33,19 +40,19 @@ def parse_args(): ) p.add_argument( "--host", default=default_host, - help=f"Hôte du serveur backend (env MES_HDD_HOST, défaut: {default_host})", + help=f"Hote du serveur backend (env MES_HDD_HOST, defaut: {default_host})", ) p.add_argument( "--port", type=int, default=default_port, - help=f"Port du serveur backend (env MES_HDD_PORT, défaut: {default_port})", + help=f"Port du serveur backend (env MES_HDD_PORT, defaut: {default_port})", ) p.add_argument( "-n", "--dry-run", action="store_true", - help="Affiche le JSON collecté sans envoyer au serveur", + help="Affiche le JSON collecte sans envoyer au serveur", ) p.add_argument( "--debug", action="store_true", - help="Affiche le JSON ET envoie au serveur", + help="Mode verbeux : messages detailles + affiche JSON + envoie au serveur", ) p.add_argument( "--output", metavar="FICHIER", @@ -54,13 +61,18 @@ def parse_args(): return p.parse_args() -# ── Helpers ─────────────────────────────────────────────────────────────────── +# -- Helpers ------------------------------------------------------------------ def run(cmd, default=None): try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=15) - return r.stdout.strip() if r.returncode == 0 else default - except Exception: + if r.returncode != 0: + dprint(f"echec {r.returncode}: {' '.join(str(c) for c in cmd)}" + + (f" -> {r.stderr.strip()}" if r.stderr.strip() else "")) + return default + return r.stdout.strip() + except Exception as e: + dprint(f"exception: {' '.join(str(c) for c in cmd)} -> {e}") return default def bytes_human(n): @@ -76,7 +88,7 @@ def print_json(payload): print(json.dumps(payload, ensure_ascii=False, indent=2)) -# ── Détection OS ────────────────────────────────────────────────────────────── +# -- Detection OS ------------------------------------------------------------- def detect_os(): info = {} @@ -98,7 +110,7 @@ def detect_os(): return "debian", version -# ── Machine ─────────────────────────────────────────────────────────────────── +# -- Machine ------------------------------------------------------------------ def get_hostname(): return run(["hostname"], default="inconnu") @@ -115,14 +127,7 @@ def get_ip(): return "inconnu" -# ── SMART ───────────────────────────────────────────────────────────────────── - -def _extract_attr(output, name): - m = re.search( - rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)", - output, - ) - return int(m.group(1)) if m else None +# -- SMART -------------------------------------------------------------------- def _smart_unavailable(reason): return { @@ -132,51 +137,97 @@ def _smart_unavailable(reason): "uncorrectable_sectors": None, } -def get_smart(dev): - out = run(["smartctl", "-H", "-A", "-i", dev]) - if out is None: - return _smart_unavailable("smartctl absent ou accès refusé") - - temp = (_extract_attr(out, "Temperature_Celsius") - or _extract_attr(out, "Airflow_Temperature_Cel")) - poh = _extract_attr(out, "Power_On_Hours") - real = _extract_attr(out, "Reallocated_Sector_Ct") - pend = _extract_attr(out, "Current_Pending_Sector") - uncr = _extract_attr(out, "Offline_Uncorrectable") - base = { - "temperature_c": temp, "power_on_hours": poh, - "reallocated_sectors": real, "pending_sectors": pend, - "uncorrectable_sectors": uncr, - } +def _extract_sata_attr(output, name): + m = re.search( + rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)", + output, + ) + return int(m.group(1)) if m else None +def _parse_sata_smart(out): + temp = (_extract_sata_attr(out, "Temperature_Celsius") + or _extract_sata_attr(out, "Airflow_Temperature_Cel")) + poh = _extract_sata_attr(out, "Power_On_Hours") + real = _extract_sata_attr(out, "Reallocated_Sector_Ct") + pend = _extract_sata_attr(out, "Current_Pending_Sector") + uncr = _extract_sata_attr(out, "Offline_Uncorrectable") + base = {"temperature_c": temp, "power_on_hours": poh, + "reallocated_sectors": real, "pending_sectors": pend, + "uncorrectable_sectors": uncr} if "FAILED!" in out: return {**base, "status": "fail", "label": "Défaillance probable", "detail": "Prévoir le remplacement du disque"} - if "PASSED" in out or "Passed" in out: issues = [] - if real and real > 0: - issues.append(f"{real} secteur(s) réalloué(s)") - if pend and pend > 0: - issues.append(f"{pend} secteur(s) en attente") - if uncr and uncr > 0: - issues.append(f"{uncr} secteur(s) non corrigeable(s)") + if real and real > 0: issues.append(f"{real} secteur(s) réalloué(s)") + if pend and pend > 0: issues.append(f"{pend} secteur(s) en attente") + if uncr and uncr > 0: issues.append(f"{uncr} secteur(s) non corrigeable(s)") if issues: return {**base, "status": "warn", "label": "Attention", "detail": ", ".join(issues) + " — disque à surveiller"} parts = [] - if poh is not None: - parts.append(f"{poh:,}h d'utilisation".replace(",", " ")) - if temp is not None: - parts.append(f"{temp}°C") + if poh is not None: parts.append(f"{poh:,}h d'utilisation".replace(",", " ")) + if temp is not None: parts.append(f"{temp}°C") parts.append("aucun secteur défectueux") - return {**base, "status": "ok", "label": "Bon état", - "detail": " · ".join(parts)} - + return {**base, "status": "ok", "label": "Bon état", "detail": " · ".join(parts)} return _smart_unavailable("résultat SMART non interprétable") +def _parse_nvme_smart(out): + m = re.search(r"Temperature:\s+(\d+)\s+Celsius", out) + temp = int(m.group(1)) if m else None -# ── Métadonnées disque ──────────────────────────────────────────────────────── + m = re.search(r"Power On Hours:\s+([\d,]+)", out) + poh = int(m.group(1).replace(",", "")) if m else None + + m = re.search(r"Media and Data Integrity Errors:\s+(\d+)", out) + media_err = int(m.group(1)) if m else None + + m = re.search(r"Available Spare:\s+(\d+)%", out) + spare = int(m.group(1)) if m else None + + m = re.search(r"Percentage Used:\s+(\d+)%", out) + pct_used = int(m.group(1)) if m else None + + base = {"temperature_c": temp, "power_on_hours": poh, + "reallocated_sectors": None, "pending_sectors": None, + "uncorrectable_sectors": media_err} + + if "FAILED!" in out: + return {**base, "status": "fail", "label": "Défaillance probable", + "detail": "Prévoir le remplacement du disque"} + if "PASSED" in out or "Passed" in out: + issues = [] + if spare is not None and spare < 20: + issues.append(f"espace de réserve faible ({spare}%)") + if pct_used is not None and pct_used > 80: + issues.append(f"usure avancée ({pct_used}% de durée de vie)") + if media_err and media_err > 0: + issues.append(f"{media_err} erreur(s) d'intégrité") + if issues: + return {**base, "status": "warn", "label": "Attention", + "detail": ", ".join(issues) + " — disque à surveiller"} + parts = [] + if poh is not None: parts.append(f"{poh:,}h d'utilisation".replace(",", " ")) + if temp is not None: parts.append(f"{temp}°C") + if pct_used is not None: parts.append(f"usure : {pct_used}%") + parts.append("aucune erreur NVMe détectée") + return {**base, "status": "ok", "label": "Bon état", "detail": " · ".join(parts)} + return _smart_unavailable("résultat SMART NVMe non interprétable") + +def get_smart(dev): + dprint(f"lecture SMART {dev}...") + out = run(["smartctl", "-H", "-A", "-i", dev]) + if out is None: + dprint(f"SMART {dev} : smartctl indisponible ou acces refuse") + return _smart_unavailable("smartctl absent ou acces refuse") + is_nvme = bool(re.search(r"NVMe|Transport protocol:\s*NVMe", out, re.IGNORECASE)) + dprint(f"SMART {dev} : protocole {'NVMe' if is_nvme else 'SATA/SAS'}") + result = _parse_nvme_smart(out) if is_nvme else _parse_sata_smart(out) + dprint(f"SMART {dev} : {result['status']} - {result['detail']}") + return result + + +# -- Metadonnees disque ------------------------------------------------------- def get_by_id(dev_path): out = run(["find", "/dev/disk/by-id", "-type", "l"]) @@ -210,9 +261,10 @@ def disk_type(name, rota): return "inconnu" -# ── Espace disque (df) ──────────────────────────────────────────────────────── +# -- Espace disque (df) ------------------------------------------------------- def get_df_map(): + dprint("lecture espace disques via df...") out = run(["df", "--output=target,size,used,avail", "-B1"]) result = {} if not out: @@ -229,10 +281,11 @@ def get_df_map(): } except ValueError: pass + dprint(f"df : {len(result)} point(s) de montage") return result -# ── LVM ─────────────────────────────────────────────────────────────────────── +# -- LVM ---------------------------------------------------------------------- def _lv_size_human(s): if not s: @@ -247,9 +300,11 @@ def _lv_size_human(s): return s def get_lvm_map(): + dprint("detection LVM...") pvs_out = run(["pvs", "--noheadings", "--reportformat", "json", "-o", "pv_name,vg_name"]) if not pvs_out: + dprint("LVM : aucun volume physique detecte (pvs indisponible ou absent)") return {} try: pvs = json.loads(pvs_out)["report"][0]["pv"] @@ -279,16 +334,23 @@ def get_lvm_map(): for lv in lvs_by_vg.get(vg_name, []) ], } + dprint(f"LVM : {len(result)} volume(s) physique(s)") return result -# ── /home users ─────────────────────────────────────────────────────────────── +# -- /home users -------------------------------------------------------------- def get_home_users(): if not os.path.isdir("/home"): + dprint("/home absent") return [] - out = run(["du", "--max-depth=1", "-b", "/home"]) + dprint("calcul taille /home par utilisateur...") + out = run(["du", "-d", "1", "-b", "/home"]) if out is None: + dprint("/home : echec de du, essai avec --max-depth...") + out = run(["du", "--max-depth=1", "-b", "/home"]) + if out is None: + dprint("/home : impossible de calculer les tailles") return None entries = [] for line in out.splitlines(): @@ -308,10 +370,13 @@ def get_home_users(): }) except ValueError: continue - return sorted(entries, key=lambda x: x["size_bytes"], reverse=True) + entries = sorted(entries, key=lambda x: x["size_bytes"], reverse=True) + dprint(f"/home : {len(entries)} utilisateur(s) : " + + ", ".join(f"{e['user']} ({e['size_human']})" for e in entries)) + return entries -# ── Proxmox ─────────────────────────────────────────────────────────────────── +# -- Proxmox ------------------------------------------------------------------ def get_proxmox_role(dev_name): zpool_out = run(["zpool", "status", "-P"]) @@ -329,24 +394,18 @@ def get_proxmox_role(dev_name): return None -# ── Construction des partitions ─────────────────────────────────────────────── - -def _enrich_lv_from_df(lv, df_map): - lv_path = f"/dev/{lv.get('_vg_name', '')}/{lv['lv_name']}" - for mp, df in df_map.items(): - pass - return lv +# -- Construction des partitions ---------------------------------------------- def build_partitions(children, df_map, lvm_map, home_done): parts = [] for child in (children or []): if child.get("type") not in ("part", "lvm"): continue - name = child.get("name", "") - fstype = child.get("fstype") or None + name = child.get("name", "") + fstype = child.get("fstype") or None mountpoint = child.get("mountpoint") or None - if fstype == "squashfs": # snap Ubuntu — ignorer + if fstype == "squashfs": # snap Ubuntu - ignorer continue size_b = child.get("size") @@ -373,32 +432,35 @@ def build_partitions(children, df_map, lvm_map, home_done): part["free_human"] = bytes_human(df["free_bytes"]) if df["size_bytes"] > 0: part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100) + dprint(f" partition {name} montee sur {mountpoint} : " + f"{part['used_human']} / {part['size_human']} ({part['used_percent']}%)") + else: + dprint(f" partition {name} : fstype={fstype}, non montee") # /home users sur cette partition if mountpoint == "/home" and not home_done[0]: part["home_users"] = get_home_users() home_done[0] = True - # LVM : mapping PV → VG + LVs + # LVM dev_path = f"/dev/{name}" if fstype == "LVM2_member" and dev_path in lvm_map: lvm_info = lvm_map[dev_path] lvs = [dict(lv) for lv in lvm_info["logical_volumes"]] - # Enrichir les LVs avec df si leurs enfants lsblk sont disponibles lv_children = child.get("children") or [] for lv_child in lv_children: - lv_mp = lv_child.get("mountpoint") or None + lv_mp = lv_child.get("mountpoint") or None lv_name_raw = lv_child.get("name", "") if not lv_mp: continue df = df_map.get(lv_mp) for lv in lvs: if lv["lv_name"] in lv_name_raw or lv_name_raw.endswith(lv["lv_name"]): - lv["mountpoint"] = lv_mp - lv["fstype"] = lv_child.get("fstype") or None + lv["mountpoint"] = lv_mp + lv["fstype"] = lv_child.get("fstype") or None if df: - lv["used_human"] = bytes_human(df["used_bytes"]) - lv["free_human"] = bytes_human(df["free_bytes"]) + lv["used_human"] = bytes_human(df["used_bytes"]) + lv["free_human"] = bytes_human(df["free_bytes"]) if df["size_bytes"] > 0: lv["used_percent"] = int( df["used_bytes"] / df["size_bytes"] * 100) @@ -411,15 +473,20 @@ def build_partitions(children, df_map, lvm_map, home_done): return parts -# ── Collecte principale ─────────────────────────────────────────────────────── +# -- Collecte principale ------------------------------------------------------ def collect(): os_type, os_version = detect_os() + dprint(f"OS detecte : {os_type} {os_version}") + hostname = get_hostname() - ip = get_ip() + ip = get_ip() + dprint(f"Machine : {hostname} / {ip}") + df_map = get_df_map() lvm_map = get_lvm_map() + dprint("liste des disques via lsblk...") output = run([ "lsblk", "-J", "-b", "-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME", @@ -433,61 +500,67 @@ def collect(): print(f"[inventaire] Erreur parsing lsblk JSON: {e}", file=sys.stderr) sys.exit(1) - disks = [] + disks = [] home_done = [False] for blkdev in blk.get("blockdevices", []): if blkdev.get("type") != "disk": continue - name = blkdev.get("name", "") + name = blkdev.get("name", "") dev_path = f"/dev/{name}" - model = (blkdev.get("model") or "inconnu").strip() - serial = (blkdev.get("serial") or "inconnu").strip() - size_b = blkdev.get("size") or 0 - rota = blkdev.get("rota", "") + model = (blkdev.get("model") or "inconnu").strip() + serial = (blkdev.get("serial") or "inconnu").strip() + size_b = blkdev.get("size") or 0 + rota = blkdev.get("rota", "") + dtype = disk_type(name, rota) + + dprint(f"disque {name} : {model} / serial {serial} / {dtype} / {bytes_human(size_b)}") partitions = build_partitions( blkdev.get("children") or [], df_map, lvm_map, home_done) - # /home sur partition racine si pas encore trouvé + # /home sur partition racine si pas encore trouve if not home_done[0]: for p in partitions: if p.get("mountpoint") == "/": + dprint("/home non dedie, calcul depuis la partition racine /") p["home_users"] = get_home_users() home_done[0] = True break disk = { - "device": name, - "path": dev_path, - "by_id": get_by_id(dev_path), - "model": model, - "serial": serial, - "type": disk_type(name, rota), + "device": name, + "path": dev_path, + "by_id": get_by_id(dev_path), + "model": model, + "serial": serial, + "type": dtype, "capacity_bytes": size_b, "capacity_human": bytes_human(size_b), - "bus": get_bus(name), - "smart": get_smart(dev_path), - "partitions": partitions, + "bus": get_bus(name), + "smart": get_smart(dev_path), + "partitions": partitions, } if os_type == "proxmox": role = get_proxmox_role(name) if role: disk["proxmox_role"] = role + dprint(f" role Proxmox : {role}") disks.append(disk) + dprint(f"collecte terminee : {len(disks)} disque(s)") return { - "hostname": hostname, - "ip": ip, - "os": os_type, - "os_version": os_version, + "hostname": hostname, + "ip": ip, + "os": os_type, + "os_version": os_version, "collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(), - "disks": disks, + "disks": disks, } -# ── HTTP POST ───────────────────────────────────────────────────────────────── +# -- HTTP POST ---------------------------------------------------------------- def post_to_api(payload, api_url): url = f"{api_url.rstrip('/')}/api/ingest" @@ -501,8 +574,8 @@ def post_to_api(payload, api_url): with urllib.request.urlopen(req, timeout=30) as resp: body = json.loads(resp.read()) print( - f"[inventaire] OK — {body.get('accepted', '?')} disque(s) " - f"enregistré(s) pour {body.get('hostname', '?')}" + f"[inventaire] OK - {body.get('accepted', '?')} disque(s) " + f"enregistre(s) pour {body.get('hostname', '?')}" ) except urllib.error.HTTPError as e: print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode()}", file=sys.stderr) @@ -512,27 +585,29 @@ def post_to_api(payload, api_url): sys.exit(1) -# ── Entrypoint ──────────────────────────────────────────────────────────────── +# -- Entrypoint --------------------------------------------------------------- if __name__ == "__main__": if os.geteuid() != 0: - print("[inventaire] Ce script doit être exécuté en root (sudo).", file=sys.stderr) + print("[inventaire] Ce script doit etre execute en root (sudo).", file=sys.stderr) sys.exit(1) args = parse_args() api_url = f"http://{args.host}:{args.port}" + _verbose[0] = args.debug or args.dry_run + print(f"[inventaire] Collecte en cours...", file=sys.stderr) payload = collect() if args.output: with open(args.output, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) - print(f"[inventaire] JSON sauvegardé : {args.output}", file=sys.stderr) + print(f"[inventaire] JSON sauvegarde : {args.output}", file=sys.stderr) if args.dry_run: print_json(payload) - print(f"\n[inventaire] --dry-run : aucune donnée envoyée (cible: {api_url})", + print(f"\n[inventaire] --dry-run : aucune donnee envoyee (cible: {api_url})", file=sys.stderr) elif args.debug: print_json(payload) diff --git a/tests/test_inventaire.py b/tests/test_inventaire.py index 5616cda..ad52b21 100644 --- a/tests/test_inventaire.py +++ b/tests/test_inventaire.py @@ -131,7 +131,7 @@ def test_smart_ok(): assert r["power_on_hours"] == 2847 assert r["temperature_c"] == 27 assert r["reallocated_sectors"] == 0 - assert "2 847h" in r["detail"] + assert "2 847h" in r["detail"].replace(" ", " ") assert "27°C" in r["detail"] assert "aucun secteur" in r["detail"]