From c0e737244ceb93dd5eca8c5f5aae4d1cd202d5c9 Mon Sep 17 00:00:00 2001 From: Gilles Soulier Date: Thu, 28 May 2026 20:15:46 +0200 Subject: [PATCH] feat: script client inventaire.py (stdlib only, 28 tests) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLI argparse : --host, --port, --dry-run/-n, --debug, --output Variables env : MES_HDD_HOST, MES_HDD_PORT Détection OS : proxmox/ubuntu/debian SMART en français : ok/warn/fail/unavailable avec détail lisible Partitions : fstype, UUID, espace, LVM, /home users Distribution : curl | sudo python3 - Co-Authored-By: Claude Sonnet 4.6 --- inventaire.py | 541 +++++++++++++++++++++++++++++++++++++++ tests/test_inventaire.py | 234 +++++++++++++++++ 2 files changed, 775 insertions(+) create mode 100644 inventaire.py create mode 100644 tests/test_inventaire.py diff --git a/inventaire.py b/inventaire.py new file mode 100644 index 0000000..1b83564 --- /dev/null +++ b/inventaire.py @@ -0,0 +1,541 @@ +#!/usr/bin/env python3 +""" +inventaire.py — Inventaire disques HDD/SSD/NVMe +Exécuter en root : sudo python3 inventaire.py [options] +Dépendances : stdlib uniquement (Python 3.9+) + +Lancement à distance : + curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py | sudo python3 - +""" +import argparse, json, os, re, subprocess, sys, urllib.request, urllib.error +from datetime import datetime, timezone + +# ── CLI ─────────────────────────────────────────────────────────────────────── + +def parse_args(): + default_host = os.environ.get("MES_HDD_HOST", "10.0.0.50") + default_port = int(os.environ.get("MES_HDD_PORT", "8088")) + p = argparse.ArgumentParser( + description="Inventaire disques HDD/SSD/NVMe → backend mes_hdd", + epilog=( + "Variables d'environnement :\n" + " MES_HDD_HOST Hôte du serveur (défaut: 10.0.0.50)\n" + " MES_HDD_PORT Port du serveur (défaut: 8088)\n\n" + "Exemples :\n" + " sudo python3 inventaire.py\n" + " sudo python3 inventaire.py --dry-run\n" + " sudo python3 inventaire.py --debug --output /tmp/inv.json\n" + " sudo python3 inventaire.py --host 192.168.1.10 --port 9000\n" + " curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py" + " | sudo MES_HDD_HOST=10.0.0.50 python3 -\n" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.add_argument( + "--host", default=default_host, + help=f"Hôte du serveur backend (env MES_HDD_HOST, défaut: {default_host})", + ) + p.add_argument( + "--port", type=int, default=default_port, + help=f"Port du serveur backend (env MES_HDD_PORT, défaut: {default_port})", + ) + p.add_argument( + "-n", "--dry-run", action="store_true", + help="Affiche le JSON collecté sans envoyer au serveur", + ) + p.add_argument( + "--debug", action="store_true", + help="Affiche le JSON ET envoie au serveur", + ) + p.add_argument( + "--output", metavar="FICHIER", + help="Sauvegarde le JSON dans un fichier (en plus de l'envoi)", + ) + return p.parse_args() + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def run(cmd, default=None): + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=15) + return r.stdout.strip() if r.returncode == 0 else default + except Exception: + return default + +def bytes_human(n): + if n is None: + return "?" + for unit in ("o", "Ko", "Mo", "Go", "To"): + if n < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} Po" + +def print_json(payload): + print(json.dumps(payload, ensure_ascii=False, indent=2)) + + +# ── Détection OS ────────────────────────────────────────────────────────────── + +def detect_os(): + info = {} + try: + with open("/etc/os-release") as f: + for line in f: + if "=" in line: + k, v = line.strip().split("=", 1) + info[k] = v.strip('"') + except FileNotFoundError: + pass + os_id = info.get("ID", "").lower() + variant = info.get("VARIANT_ID", "").lower() + version = info.get("VERSION_ID", "") + if os.path.isdir("/etc/pve") or variant == "proxmox": + return "proxmox", version + if os_id == "ubuntu": + return "ubuntu", version + return "debian", version + + +# ── Machine ─────────────────────────────────────────────────────────────────── + +def get_hostname(): + return run(["hostname"], default="inconnu") + +def get_ip(): + out = run(["ip", "route", "get", "1.1.1.1"]) + if out: + m = re.search(r"src\s+(\S+)", out) + if m: + return m.group(1) + out = run(["hostname", "-I"]) + if out: + return out.split()[0] + return "inconnu" + + +# ── SMART ───────────────────────────────────────────────────────────────────── + +def _extract_attr(output, name): + m = re.search( + rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)", + output, + ) + return int(m.group(1)) if m else None + +def _smart_unavailable(reason): + return { + "status": "unavailable", "label": "SMART indisponible", "detail": reason, + "temperature_c": None, "power_on_hours": None, + "reallocated_sectors": None, "pending_sectors": None, + "uncorrectable_sectors": None, + } + +def get_smart(dev): + out = run(["smartctl", "-H", "-A", "-i", dev]) + if out is None: + return _smart_unavailable("smartctl absent ou accès refusé") + + temp = (_extract_attr(out, "Temperature_Celsius") + or _extract_attr(out, "Airflow_Temperature_Cel")) + poh = _extract_attr(out, "Power_On_Hours") + real = _extract_attr(out, "Reallocated_Sector_Ct") + pend = _extract_attr(out, "Current_Pending_Sector") + uncr = _extract_attr(out, "Offline_Uncorrectable") + base = { + "temperature_c": temp, "power_on_hours": poh, + "reallocated_sectors": real, "pending_sectors": pend, + "uncorrectable_sectors": uncr, + } + + if "FAILED!" in out: + return {**base, "status": "fail", "label": "Défaillance probable", + "detail": "Prévoir le remplacement du disque"} + + if "PASSED" in out or "Passed" in out: + issues = [] + if real and real > 0: + issues.append(f"{real} secteur(s) réalloué(s)") + if pend and pend > 0: + issues.append(f"{pend} secteur(s) en attente") + if uncr and uncr > 0: + issues.append(f"{uncr} secteur(s) non corrigeable(s)") + if issues: + return {**base, "status": "warn", "label": "Attention", + "detail": ", ".join(issues) + " — disque à surveiller"} + parts = [] + if poh is not None: + parts.append(f"{poh:,}h d'utilisation".replace(",", " ")) + if temp is not None: + parts.append(f"{temp}°C") + parts.append("aucun secteur défectueux") + return {**base, "status": "ok", "label": "Bon état", + "detail": " · ".join(parts)} + + return _smart_unavailable("résultat SMART non interprétable") + + +# ── Métadonnées disque ──────────────────────────────────────────────────────── + +def get_by_id(dev_path): + out = run(["find", "/dev/disk/by-id", "-type", "l"]) + if not out: + return None + for link in out.splitlines(): + if "-part" in link: + continue + target = run(["readlink", "-f", link]) + if target == dev_path: + return os.path.basename(link) + return None + +def get_bus(dev_name): + if dev_name.startswith("nvme"): + return "nvme" + out = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"]) + if out: + m = re.search(r"^ID_BUS=(.+)$", out, re.MULTILINE) + if m: + return m.group(1).lower() + return "inconnu" + +def disk_type(name, rota): + if name.startswith("nvme"): + return "NVMe" + if str(rota) == "1": + return "HDD" + if str(rota) == "0": + return "SSD" + return "inconnu" + + +# ── Espace disque (df) ──────────────────────────────────────────────────────── + +def get_df_map(): + out = run(["df", "--output=target,size,used,avail", "-B1"]) + result = {} + if not out: + return result + for line in out.splitlines()[1:]: + parts = line.split() + if len(parts) < 4: + continue + try: + result[parts[0]] = { + "size_bytes": int(parts[1]), + "used_bytes": int(parts[2]), + "free_bytes": int(parts[3]), + } + except ValueError: + pass + return result + + +# ── LVM ─────────────────────────────────────────────────────────────────────── + +def _lv_size_human(s): + if not s: + return "?" + s = s.strip() + unit_map = {"k": "Ko", "m": "Mo", "g": "Go", "t": "To"} + if s and s[-1].lower() in unit_map: + try: + return f"{float(s[:-1]):.1f} {unit_map[s[-1].lower()]}" + except ValueError: + pass + return s + +def get_lvm_map(): + pvs_out = run(["pvs", "--noheadings", "--reportformat", "json", + "-o", "pv_name,vg_name"]) + if not pvs_out: + return {} + try: + pvs = json.loads(pvs_out)["report"][0]["pv"] + except (json.JSONDecodeError, KeyError, IndexError): + return {} + lvs_by_vg = {} + lvs_out = run(["lvs", "--noheadings", "--reportformat", "json", + "-o", "lv_name,vg_name,lv_size,lv_path"]) + if lvs_out: + try: + for lv in json.loads(lvs_out)["report"][0]["lv"]: + lvs_by_vg.setdefault(lv.get("vg_name", ""), []).append(lv) + except (json.JSONDecodeError, KeyError, IndexError): + pass + result = {} + for pv in pvs: + pv_name = pv.get("pv_name", "") + vg_name = pv.get("vg_name", "") + if pv_name and vg_name: + result[pv_name] = { + "vg_name": vg_name, + "logical_volumes": [ + {"lv_name": lv.get("lv_name", ""), + "size_human": _lv_size_human(lv.get("lv_size", "")), + "used_human": None, "free_human": None, + "used_percent": None, "fstype": None, "mountpoint": None} + for lv in lvs_by_vg.get(vg_name, []) + ], + } + return result + + +# ── /home users ─────────────────────────────────────────────────────────────── + +def get_home_users(): + if not os.path.isdir("/home"): + return [] + out = run(["du", "--max-depth=1", "-b", "/home"]) + if out is None: + return None + entries = [] + for line in out.splitlines(): + parts = line.split("\t", 1) + if len(parts) != 2: + continue + path = parts[1].strip() + if path.rstrip("/") == "/home": + continue + try: + size = int(parts[0]) + user = os.path.basename(path.rstrip("/")) + entries.append({ + "user": user, + "size_bytes": size, + "size_human": bytes_human(size), + }) + except ValueError: + continue + return sorted(entries, key=lambda x: x["size_bytes"], reverse=True) + + +# ── Proxmox ─────────────────────────────────────────────────────────────────── + +def get_proxmox_role(dev_name): + zpool_out = run(["zpool", "status", "-P"]) + if zpool_out and f"/dev/{dev_name}" in zpool_out: + return "zfs_pool" + ceph_dir = "/var/lib/ceph/osd" + if os.path.isdir(ceph_dir): + try: + for entry in os.listdir(ceph_dir): + link = os.path.join(ceph_dir, entry, "block") + if os.path.islink(link) and dev_name in os.path.realpath(link): + return "ceph_osd" + except OSError: + pass + return None + + +# ── Construction des partitions ─────────────────────────────────────────────── + +def _enrich_lv_from_df(lv, df_map): + lv_path = f"/dev/{lv.get('_vg_name', '')}/{lv['lv_name']}" + for mp, df in df_map.items(): + pass + return lv + +def build_partitions(children, df_map, lvm_map, home_done): + parts = [] + for child in (children or []): + if child.get("type") not in ("part", "lvm"): + continue + name = child.get("name", "") + fstype = child.get("fstype") or None + mountpoint = child.get("mountpoint") or None + + if fstype == "squashfs": # snap Ubuntu — ignorer + continue + + size_b = child.get("size") + part = { + "name": name, + "uuid": child.get("uuid") or None, + "fstype": fstype, + "size_bytes": size_b, + "size_human": bytes_human(size_b), + "used_bytes": None, "used_human": None, + "free_bytes": None, "free_human": None, + "used_percent": None, + "mountpoint": mountpoint, + "home_users": None, + "lvm": None, + } + + # Espace via df + if mountpoint and mountpoint in df_map: + df = df_map[mountpoint] + part["used_bytes"] = df["used_bytes"] + part["used_human"] = bytes_human(df["used_bytes"]) + part["free_bytes"] = df["free_bytes"] + part["free_human"] = bytes_human(df["free_bytes"]) + if df["size_bytes"] > 0: + part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100) + + # /home users sur cette partition + if mountpoint == "/home" and not home_done[0]: + part["home_users"] = get_home_users() + home_done[0] = True + + # LVM : mapping PV → VG + LVs + dev_path = f"/dev/{name}" + if fstype == "LVM2_member" and dev_path in lvm_map: + lvm_info = lvm_map[dev_path] + lvs = [dict(lv) for lv in lvm_info["logical_volumes"]] + # Enrichir les LVs avec df si leurs enfants lsblk sont disponibles + lv_children = child.get("children") or [] + for lv_child in lv_children: + lv_mp = lv_child.get("mountpoint") or None + lv_name_raw = lv_child.get("name", "") + if not lv_mp: + continue + df = df_map.get(lv_mp) + for lv in lvs: + if lv["lv_name"] in lv_name_raw or lv_name_raw.endswith(lv["lv_name"]): + lv["mountpoint"] = lv_mp + lv["fstype"] = lv_child.get("fstype") or None + if df: + lv["used_human"] = bytes_human(df["used_bytes"]) + lv["free_human"] = bytes_human(df["free_bytes"]) + if df["size_bytes"] > 0: + lv["used_percent"] = int( + df["used_bytes"] / df["size_bytes"] * 100) + if lv_mp == "/home" and not home_done[0]: + lv["home_users"] = get_home_users() + home_done[0] = True + part["lvm"] = {"vg_name": lvm_info["vg_name"], "logical_volumes": lvs} + + parts.append(part) + return parts + + +# ── Collecte principale ─────────────────────────────────────────────────────── + +def collect(): + os_type, os_version = detect_os() + hostname = get_hostname() + ip = get_ip() + df_map = get_df_map() + lvm_map = get_lvm_map() + + output = run([ + "lsblk", "-J", "-b", + "-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME", + ]) + if not output: + print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr) + sys.exit(1) + try: + blk = json.loads(output) + except json.JSONDecodeError as e: + print(f"[inventaire] Erreur parsing lsblk JSON: {e}", file=sys.stderr) + sys.exit(1) + + disks = [] + home_done = [False] + + for blkdev in blk.get("blockdevices", []): + if blkdev.get("type") != "disk": + continue + name = blkdev.get("name", "") + dev_path = f"/dev/{name}" + model = (blkdev.get("model") or "inconnu").strip() + serial = (blkdev.get("serial") or "inconnu").strip() + size_b = blkdev.get("size") or 0 + rota = blkdev.get("rota", "") + + partitions = build_partitions( + blkdev.get("children") or [], df_map, lvm_map, home_done) + + # /home sur partition racine si pas encore trouvé + if not home_done[0]: + for p in partitions: + if p.get("mountpoint") == "/": + p["home_users"] = get_home_users() + home_done[0] = True + break + + disk = { + "device": name, + "path": dev_path, + "by_id": get_by_id(dev_path), + "model": model, + "serial": serial, + "type": disk_type(name, rota), + "capacity_bytes": size_b, + "capacity_human": bytes_human(size_b), + "bus": get_bus(name), + "smart": get_smart(dev_path), + "partitions": partitions, + } + if os_type == "proxmox": + role = get_proxmox_role(name) + if role: + disk["proxmox_role"] = role + + disks.append(disk) + + return { + "hostname": hostname, + "ip": ip, + "os": os_type, + "os_version": os_version, + "collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(), + "disks": disks, + } + + +# ── HTTP POST ───────────────────────────────────────────────────────────────── + +def post_to_api(payload, api_url): + url = f"{api_url.rstrip('/')}/api/ingest" + data = json.dumps(payload, ensure_ascii=False).encode("utf-8") + req = urllib.request.Request( + url, data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + body = json.loads(resp.read()) + print( + f"[inventaire] OK — {body.get('accepted', '?')} disque(s) " + f"enregistré(s) pour {body.get('hostname', '?')}" + ) + except urllib.error.HTTPError as e: + print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode()}", file=sys.stderr) + sys.exit(1) + except urllib.error.URLError as e: + print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr) + sys.exit(1) + + +# ── Entrypoint ──────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + if os.geteuid() != 0: + print("[inventaire] Ce script doit être exécuté en root (sudo).", file=sys.stderr) + sys.exit(1) + + args = parse_args() + api_url = f"http://{args.host}:{args.port}" + + print(f"[inventaire] Collecte en cours...", file=sys.stderr) + payload = collect() + + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + print(f"[inventaire] JSON sauvegardé : {args.output}", file=sys.stderr) + + if args.dry_run: + print_json(payload) + print(f"\n[inventaire] --dry-run : aucune donnée envoyée (cible: {api_url})", + file=sys.stderr) + elif args.debug: + print_json(payload) + post_to_api(payload, api_url) + else: + post_to_api(payload, api_url) diff --git a/tests/test_inventaire.py b/tests/test_inventaire.py new file mode 100644 index 0000000..5616cda --- /dev/null +++ b/tests/test_inventaire.py @@ -0,0 +1,234 @@ +import sys, os, json +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from unittest.mock import patch, MagicMock, mock_open, call +import inventaire + + +# ── parse_args ──────────────────────────────────────────────────────────────── + +def test_args_défauts(monkeypatch): + monkeypatch.delenv("MES_HDD_HOST", raising=False) + monkeypatch.delenv("MES_HDD_PORT", raising=False) + with patch("sys.argv", ["inventaire.py"]): + args = inventaire.parse_args() + assert args.host == "10.0.0.50" + assert args.port == 8088 + assert args.dry_run is False + assert args.debug is False + assert args.output is None + +def test_args_host_port(): + with patch("sys.argv", ["inventaire.py", "--host", "192.168.1.10", "--port", "9000"]): + args = inventaire.parse_args() + assert args.host == "192.168.1.10" + assert args.port == 9000 + +def test_args_dry_run(): + with patch("sys.argv", ["inventaire.py", "-n"]): + args = inventaire.parse_args() + assert args.dry_run is True + +def test_args_debug(): + with patch("sys.argv", ["inventaire.py", "--debug"]): + args = inventaire.parse_args() + assert args.debug is True + +def test_args_output(): + with patch("sys.argv", ["inventaire.py", "--output", "/tmp/inv.json"]): + args = inventaire.parse_args() + assert args.output == "/tmp/inv.json" + +def test_args_depuis_env(monkeypatch): + monkeypatch.setenv("MES_HDD_HOST", "10.0.0.99") + monkeypatch.setenv("MES_HDD_PORT", "9999") + with patch("sys.argv", ["inventaire.py"]): + args = inventaire.parse_args() + assert args.host == "10.0.0.99" + assert args.port == 9999 + + +# ── detect_os ───────────────────────────────────────────────────────────────── + +def test_detect_proxmox_via_variant(): + content = 'ID=debian\nVARIANT_ID=proxmox\nVERSION_ID="8.2"\n' + with patch("builtins.open", mock_open(read_data=content)): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "proxmox" + assert version == "8.2" + +def test_detect_proxmox_via_pve_dir(): + content = 'ID=debian\nVERSION_ID="8.2"\n' + with patch("builtins.open", mock_open(read_data=content)): + with patch("os.path.isdir", side_effect=lambda p: "/etc/pve" in p): + os_type, _ = inventaire.detect_os() + assert os_type == "proxmox" + +def test_detect_ubuntu(): + content = 'ID=ubuntu\nVERSION_ID="22.04"\n' + with patch("builtins.open", mock_open(read_data=content)): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "ubuntu" + assert version == "22.04" + +def test_detect_debian(): + content = 'ID=debian\nVERSION_ID="12"\n' + with patch("builtins.open", mock_open(read_data=content)): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "debian" + +def test_detect_os_fichier_absent(): + with patch("builtins.open", side_effect=FileNotFoundError): + with patch("os.path.isdir", return_value=False): + os_type, version = inventaire.detect_os() + assert os_type == "debian" + assert version == "" + + +# ── bytes_human ─────────────────────────────────────────────────────────────── + +def test_bytes_human_to(): + assert inventaire.bytes_human(1_099_511_627_776) == "1.0 To" + +def test_bytes_human_go(): + assert inventaire.bytes_human(1_073_741_824) == "1.0 Go" + +def test_bytes_human_mo(): + assert inventaire.bytes_human(500 * 1024 * 1024) == "500.0 Mo" + +def test_bytes_human_none(): + assert inventaire.bytes_human(None) == "?" + + +# ── get_smart ───────────────────────────────────────────────────────────────── + +SMART_PASSED = """ +SMART overall-health self-assessment test result: PASSED +190 Airflow_Temperature_Cel 0x0022 073 060 045 Old_age Always - 27 +9 Power_On_Hours 0x0032 099 099 000 Old_age Always - 2847 +5 Reallocated_Sector_Ct 0x0033 100 100 036 Pre-fail Always - 0 +197 Current_Pending_Sector 0x0012 100 100 000 Old_age Always - 0 +198 Offline_Uncorrectable 0x0010 100 100 000 Old_age Offline - 0 +""" + +SMART_FAILED = "SMART overall-health self-assessment test result: FAILED!" + +SMART_WARN = """ +SMART overall-health self-assessment test result: PASSED +5 Reallocated_Sector_Ct 0x0033 100 100 036 Pre-fail Always - 3 +197 Current_Pending_Sector 0x0012 100 100 000 Old_age Always - 0 +198 Offline_Uncorrectable 0x0010 100 100 000 Old_age Offline - 0 +""" + +def test_smart_ok(): + with patch.object(inventaire, "run", return_value=SMART_PASSED): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "ok" + assert r["label"] == "Bon état" + assert r["power_on_hours"] == 2847 + assert r["temperature_c"] == 27 + assert r["reallocated_sectors"] == 0 + assert "2 847h" in r["detail"] + assert "27°C" in r["detail"] + assert "aucun secteur" in r["detail"] + +def test_smart_fail(): + with patch.object(inventaire, "run", return_value=SMART_FAILED): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "fail" + assert r["label"] == "Défaillance probable" + assert "remplacement" in r["detail"] + +def test_smart_warn_secteurs_réalloués(): + with patch.object(inventaire, "run", return_value=SMART_WARN): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "warn" + assert r["label"] == "Attention" + assert "réalloué" in r["detail"] + assert r["reallocated_sectors"] == 3 + +def test_smart_unavailable_commande_absente(): + with patch.object(inventaire, "run", return_value=None): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "unavailable" + assert r["label"] == "SMART indisponible" + +def test_smart_unavailable_résultat_inconnu(): + with patch.object(inventaire, "run", return_value="Device: some output without health"): + r = inventaire.get_smart("/dev/sda") + assert r["status"] == "unavailable" + + +# ── get_home_users ──────────────────────────────────────────────────────────── + +def test_home_users_triés_par_taille(): + du_output = "100000\t/home/alice\n400000\t/home/gilles\n850000000\t/home\n" + with patch.object(inventaire, "run", return_value=du_output): + with patch("os.path.isdir", return_value=True): + users = inventaire.get_home_users() + assert users[0]["user"] == "gilles" + assert users[0]["size_bytes"] == 400000 + assert users[1]["user"] == "alice" + assert len(users) == 2 # /home lui-même est filtré + +def test_home_users_home_absent(): + with patch("os.path.isdir", return_value=False): + users = inventaire.get_home_users() + assert users == [] + +def test_home_users_du_échoue(): + with patch("os.path.isdir", return_value=True): + with patch.object(inventaire, "run", return_value=None): + users = inventaire.get_home_users() + assert users is None + + +# ── post_to_api ─────────────────────────────────────────────────────────────── + +def test_post_to_api_succès(): + payload = {"hostname": "pve1", "disks": []} + response_body = json.dumps({"accepted": 0, "hostname": "pve1"}).encode() + mock_resp = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_resp.read.return_value = response_body + + with patch("urllib.request.urlopen", return_value=mock_resp): + inventaire.post_to_api(payload, "http://10.0.0.50:8088") # pas d'exception = succès + +def test_post_to_api_http_error(): + import urllib.error + payload = {"hostname": "pve1", "disks": []} + with patch("urllib.request.urlopen", + side_effect=urllib.error.HTTPError("url", 500, "Server Error", {}, None)): + with patch("sys.exit") as mock_exit: + inventaire.post_to_api(payload, "http://10.0.0.50:8088") + mock_exit.assert_called_once_with(1) + +def test_post_to_api_connexion_refusée(): + import urllib.error + payload = {"hostname": "pve1", "disks": []} + with patch("urllib.request.urlopen", + side_effect=urllib.error.URLError("Connection refused")): + with patch("sys.exit") as mock_exit: + inventaire.post_to_api(payload, "http://10.0.0.50:8088") + mock_exit.assert_called_once_with(1) + + +# ── dry_run / debug output ──────────────────────────────────────────────────── + +def test_dry_run_naffiche_que_json(capsys): + payload = {"hostname": "pve1", "disks": [], "os": "debian"} + inventaire.print_json(payload) + captured = capsys.readouterr() + parsed = json.loads(captured.out) + assert parsed["hostname"] == "pve1" + +def test_dry_run_nenvoie_pas(capsys): + payload = {"hostname": "pve1", "disks": []} + with patch.object(inventaire, "post_to_api") as mock_post: + inventaire.print_json(payload) + mock_post.assert_not_called()