feat: script client inventaire.py (stdlib only, 28 tests)
CLI argparse : --host, --port, --dry-run/-n, --debug, --output Variables env : MES_HDD_HOST, MES_HDD_PORT Détection OS : proxmox/ubuntu/debian SMART en français : ok/warn/fail/unavailable avec détail lisible Partitions : fstype, UUID, espace, LVM, /home users Distribution : curl | sudo python3 - Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+541
@@ -0,0 +1,541 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
inventaire.py — Inventaire disques HDD/SSD/NVMe
|
||||
Exécuter en root : sudo python3 inventaire.py [options]
|
||||
Dépendances : stdlib uniquement (Python 3.9+)
|
||||
|
||||
Lancement à distance :
|
||||
curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py | sudo python3 -
|
||||
"""
|
||||
import argparse, json, os, re, subprocess, sys, urllib.request, urllib.error
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# ── CLI ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def parse_args():
|
||||
default_host = os.environ.get("MES_HDD_HOST", "10.0.0.50")
|
||||
default_port = int(os.environ.get("MES_HDD_PORT", "8088"))
|
||||
p = argparse.ArgumentParser(
|
||||
description="Inventaire disques HDD/SSD/NVMe → backend mes_hdd",
|
||||
epilog=(
|
||||
"Variables d'environnement :\n"
|
||||
" MES_HDD_HOST Hôte du serveur (défaut: 10.0.0.50)\n"
|
||||
" MES_HDD_PORT Port du serveur (défaut: 8088)\n\n"
|
||||
"Exemples :\n"
|
||||
" sudo python3 inventaire.py\n"
|
||||
" sudo python3 inventaire.py --dry-run\n"
|
||||
" sudo python3 inventaire.py --debug --output /tmp/inv.json\n"
|
||||
" sudo python3 inventaire.py --host 192.168.1.10 --port 9000\n"
|
||||
" curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py"
|
||||
" | sudo MES_HDD_HOST=10.0.0.50 python3 -\n"
|
||||
),
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
p.add_argument(
|
||||
"--host", default=default_host,
|
||||
help=f"Hôte du serveur backend (env MES_HDD_HOST, défaut: {default_host})",
|
||||
)
|
||||
p.add_argument(
|
||||
"--port", type=int, default=default_port,
|
||||
help=f"Port du serveur backend (env MES_HDD_PORT, défaut: {default_port})",
|
||||
)
|
||||
p.add_argument(
|
||||
"-n", "--dry-run", action="store_true",
|
||||
help="Affiche le JSON collecté sans envoyer au serveur",
|
||||
)
|
||||
p.add_argument(
|
||||
"--debug", action="store_true",
|
||||
help="Affiche le JSON ET envoie au serveur",
|
||||
)
|
||||
p.add_argument(
|
||||
"--output", metavar="FICHIER",
|
||||
help="Sauvegarde le JSON dans un fichier (en plus de l'envoi)",
|
||||
)
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def run(cmd, default=None):
|
||||
try:
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
return r.stdout.strip() if r.returncode == 0 else default
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
def bytes_human(n):
|
||||
if n is None:
|
||||
return "?"
|
||||
for unit in ("o", "Ko", "Mo", "Go", "To"):
|
||||
if n < 1024:
|
||||
return f"{n:.1f} {unit}"
|
||||
n /= 1024
|
||||
return f"{n:.1f} Po"
|
||||
|
||||
def print_json(payload):
|
||||
print(json.dumps(payload, ensure_ascii=False, indent=2))
|
||||
|
||||
|
||||
# ── Détection OS ──────────────────────────────────────────────────────────────
|
||||
|
||||
def detect_os():
|
||||
info = {}
|
||||
try:
|
||||
with open("/etc/os-release") as f:
|
||||
for line in f:
|
||||
if "=" in line:
|
||||
k, v = line.strip().split("=", 1)
|
||||
info[k] = v.strip('"')
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
os_id = info.get("ID", "").lower()
|
||||
variant = info.get("VARIANT_ID", "").lower()
|
||||
version = info.get("VERSION_ID", "")
|
||||
if os.path.isdir("/etc/pve") or variant == "proxmox":
|
||||
return "proxmox", version
|
||||
if os_id == "ubuntu":
|
||||
return "ubuntu", version
|
||||
return "debian", version
|
||||
|
||||
|
||||
# ── Machine ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_hostname():
|
||||
return run(["hostname"], default="inconnu")
|
||||
|
||||
def get_ip():
|
||||
out = run(["ip", "route", "get", "1.1.1.1"])
|
||||
if out:
|
||||
m = re.search(r"src\s+(\S+)", out)
|
||||
if m:
|
||||
return m.group(1)
|
||||
out = run(["hostname", "-I"])
|
||||
if out:
|
||||
return out.split()[0]
|
||||
return "inconnu"
|
||||
|
||||
|
||||
# ── SMART ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _extract_attr(output, name):
|
||||
m = re.search(
|
||||
rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)",
|
||||
output,
|
||||
)
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
def _smart_unavailable(reason):
|
||||
return {
|
||||
"status": "unavailable", "label": "SMART indisponible", "detail": reason,
|
||||
"temperature_c": None, "power_on_hours": None,
|
||||
"reallocated_sectors": None, "pending_sectors": None,
|
||||
"uncorrectable_sectors": None,
|
||||
}
|
||||
|
||||
def get_smart(dev):
|
||||
out = run(["smartctl", "-H", "-A", "-i", dev])
|
||||
if out is None:
|
||||
return _smart_unavailable("smartctl absent ou accès refusé")
|
||||
|
||||
temp = (_extract_attr(out, "Temperature_Celsius")
|
||||
or _extract_attr(out, "Airflow_Temperature_Cel"))
|
||||
poh = _extract_attr(out, "Power_On_Hours")
|
||||
real = _extract_attr(out, "Reallocated_Sector_Ct")
|
||||
pend = _extract_attr(out, "Current_Pending_Sector")
|
||||
uncr = _extract_attr(out, "Offline_Uncorrectable")
|
||||
base = {
|
||||
"temperature_c": temp, "power_on_hours": poh,
|
||||
"reallocated_sectors": real, "pending_sectors": pend,
|
||||
"uncorrectable_sectors": uncr,
|
||||
}
|
||||
|
||||
if "FAILED!" in out:
|
||||
return {**base, "status": "fail", "label": "Défaillance probable",
|
||||
"detail": "Prévoir le remplacement du disque"}
|
||||
|
||||
if "PASSED" in out or "Passed" in out:
|
||||
issues = []
|
||||
if real and real > 0:
|
||||
issues.append(f"{real} secteur(s) réalloué(s)")
|
||||
if pend and pend > 0:
|
||||
issues.append(f"{pend} secteur(s) en attente")
|
||||
if uncr and uncr > 0:
|
||||
issues.append(f"{uncr} secteur(s) non corrigeable(s)")
|
||||
if issues:
|
||||
return {**base, "status": "warn", "label": "Attention",
|
||||
"detail": ", ".join(issues) + " — disque à surveiller"}
|
||||
parts = []
|
||||
if poh is not None:
|
||||
parts.append(f"{poh:,}h d'utilisation".replace(",", " "))
|
||||
if temp is not None:
|
||||
parts.append(f"{temp}°C")
|
||||
parts.append("aucun secteur défectueux")
|
||||
return {**base, "status": "ok", "label": "Bon état",
|
||||
"detail": " · ".join(parts)}
|
||||
|
||||
return _smart_unavailable("résultat SMART non interprétable")
|
||||
|
||||
|
||||
# ── Métadonnées disque ────────────────────────────────────────────────────────
|
||||
|
||||
def get_by_id(dev_path):
|
||||
out = run(["find", "/dev/disk/by-id", "-type", "l"])
|
||||
if not out:
|
||||
return None
|
||||
for link in out.splitlines():
|
||||
if "-part" in link:
|
||||
continue
|
||||
target = run(["readlink", "-f", link])
|
||||
if target == dev_path:
|
||||
return os.path.basename(link)
|
||||
return None
|
||||
|
||||
def get_bus(dev_name):
|
||||
if dev_name.startswith("nvme"):
|
||||
return "nvme"
|
||||
out = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"])
|
||||
if out:
|
||||
m = re.search(r"^ID_BUS=(.+)$", out, re.MULTILINE)
|
||||
if m:
|
||||
return m.group(1).lower()
|
||||
return "inconnu"
|
||||
|
||||
def disk_type(name, rota):
|
||||
if name.startswith("nvme"):
|
||||
return "NVMe"
|
||||
if str(rota) == "1":
|
||||
return "HDD"
|
||||
if str(rota) == "0":
|
||||
return "SSD"
|
||||
return "inconnu"
|
||||
|
||||
|
||||
# ── Espace disque (df) ────────────────────────────────────────────────────────
|
||||
|
||||
def get_df_map():
|
||||
out = run(["df", "--output=target,size,used,avail", "-B1"])
|
||||
result = {}
|
||||
if not out:
|
||||
return result
|
||||
for line in out.splitlines()[1:]:
|
||||
parts = line.split()
|
||||
if len(parts) < 4:
|
||||
continue
|
||||
try:
|
||||
result[parts[0]] = {
|
||||
"size_bytes": int(parts[1]),
|
||||
"used_bytes": int(parts[2]),
|
||||
"free_bytes": int(parts[3]),
|
||||
}
|
||||
except ValueError:
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
# ── LVM ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _lv_size_human(s):
|
||||
if not s:
|
||||
return "?"
|
||||
s = s.strip()
|
||||
unit_map = {"k": "Ko", "m": "Mo", "g": "Go", "t": "To"}
|
||||
if s and s[-1].lower() in unit_map:
|
||||
try:
|
||||
return f"{float(s[:-1]):.1f} {unit_map[s[-1].lower()]}"
|
||||
except ValueError:
|
||||
pass
|
||||
return s
|
||||
|
||||
def get_lvm_map():
|
||||
pvs_out = run(["pvs", "--noheadings", "--reportformat", "json",
|
||||
"-o", "pv_name,vg_name"])
|
||||
if not pvs_out:
|
||||
return {}
|
||||
try:
|
||||
pvs = json.loads(pvs_out)["report"][0]["pv"]
|
||||
except (json.JSONDecodeError, KeyError, IndexError):
|
||||
return {}
|
||||
lvs_by_vg = {}
|
||||
lvs_out = run(["lvs", "--noheadings", "--reportformat", "json",
|
||||
"-o", "lv_name,vg_name,lv_size,lv_path"])
|
||||
if lvs_out:
|
||||
try:
|
||||
for lv in json.loads(lvs_out)["report"][0]["lv"]:
|
||||
lvs_by_vg.setdefault(lv.get("vg_name", ""), []).append(lv)
|
||||
except (json.JSONDecodeError, KeyError, IndexError):
|
||||
pass
|
||||
result = {}
|
||||
for pv in pvs:
|
||||
pv_name = pv.get("pv_name", "")
|
||||
vg_name = pv.get("vg_name", "")
|
||||
if pv_name and vg_name:
|
||||
result[pv_name] = {
|
||||
"vg_name": vg_name,
|
||||
"logical_volumes": [
|
||||
{"lv_name": lv.get("lv_name", ""),
|
||||
"size_human": _lv_size_human(lv.get("lv_size", "")),
|
||||
"used_human": None, "free_human": None,
|
||||
"used_percent": None, "fstype": None, "mountpoint": None}
|
||||
for lv in lvs_by_vg.get(vg_name, [])
|
||||
],
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
# ── /home users ───────────────────────────────────────────────────────────────
|
||||
|
||||
def get_home_users():
|
||||
if not os.path.isdir("/home"):
|
||||
return []
|
||||
out = run(["du", "--max-depth=1", "-b", "/home"])
|
||||
if out is None:
|
||||
return None
|
||||
entries = []
|
||||
for line in out.splitlines():
|
||||
parts = line.split("\t", 1)
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
path = parts[1].strip()
|
||||
if path.rstrip("/") == "/home":
|
||||
continue
|
||||
try:
|
||||
size = int(parts[0])
|
||||
user = os.path.basename(path.rstrip("/"))
|
||||
entries.append({
|
||||
"user": user,
|
||||
"size_bytes": size,
|
||||
"size_human": bytes_human(size),
|
||||
})
|
||||
except ValueError:
|
||||
continue
|
||||
return sorted(entries, key=lambda x: x["size_bytes"], reverse=True)
|
||||
|
||||
|
||||
# ── Proxmox ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_proxmox_role(dev_name):
|
||||
zpool_out = run(["zpool", "status", "-P"])
|
||||
if zpool_out and f"/dev/{dev_name}" in zpool_out:
|
||||
return "zfs_pool"
|
||||
ceph_dir = "/var/lib/ceph/osd"
|
||||
if os.path.isdir(ceph_dir):
|
||||
try:
|
||||
for entry in os.listdir(ceph_dir):
|
||||
link = os.path.join(ceph_dir, entry, "block")
|
||||
if os.path.islink(link) and dev_name in os.path.realpath(link):
|
||||
return "ceph_osd"
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
# ── Construction des partitions ───────────────────────────────────────────────
|
||||
|
||||
def _enrich_lv_from_df(lv, df_map):
|
||||
lv_path = f"/dev/{lv.get('_vg_name', '')}/{lv['lv_name']}"
|
||||
for mp, df in df_map.items():
|
||||
pass
|
||||
return lv
|
||||
|
||||
def build_partitions(children, df_map, lvm_map, home_done):
|
||||
parts = []
|
||||
for child in (children or []):
|
||||
if child.get("type") not in ("part", "lvm"):
|
||||
continue
|
||||
name = child.get("name", "")
|
||||
fstype = child.get("fstype") or None
|
||||
mountpoint = child.get("mountpoint") or None
|
||||
|
||||
if fstype == "squashfs": # snap Ubuntu — ignorer
|
||||
continue
|
||||
|
||||
size_b = child.get("size")
|
||||
part = {
|
||||
"name": name,
|
||||
"uuid": child.get("uuid") or None,
|
||||
"fstype": fstype,
|
||||
"size_bytes": size_b,
|
||||
"size_human": bytes_human(size_b),
|
||||
"used_bytes": None, "used_human": None,
|
||||
"free_bytes": None, "free_human": None,
|
||||
"used_percent": None,
|
||||
"mountpoint": mountpoint,
|
||||
"home_users": None,
|
||||
"lvm": None,
|
||||
}
|
||||
|
||||
# Espace via df
|
||||
if mountpoint and mountpoint in df_map:
|
||||
df = df_map[mountpoint]
|
||||
part["used_bytes"] = df["used_bytes"]
|
||||
part["used_human"] = bytes_human(df["used_bytes"])
|
||||
part["free_bytes"] = df["free_bytes"]
|
||||
part["free_human"] = bytes_human(df["free_bytes"])
|
||||
if df["size_bytes"] > 0:
|
||||
part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100)
|
||||
|
||||
# /home users sur cette partition
|
||||
if mountpoint == "/home" and not home_done[0]:
|
||||
part["home_users"] = get_home_users()
|
||||
home_done[0] = True
|
||||
|
||||
# LVM : mapping PV → VG + LVs
|
||||
dev_path = f"/dev/{name}"
|
||||
if fstype == "LVM2_member" and dev_path in lvm_map:
|
||||
lvm_info = lvm_map[dev_path]
|
||||
lvs = [dict(lv) for lv in lvm_info["logical_volumes"]]
|
||||
# Enrichir les LVs avec df si leurs enfants lsblk sont disponibles
|
||||
lv_children = child.get("children") or []
|
||||
for lv_child in lv_children:
|
||||
lv_mp = lv_child.get("mountpoint") or None
|
||||
lv_name_raw = lv_child.get("name", "")
|
||||
if not lv_mp:
|
||||
continue
|
||||
df = df_map.get(lv_mp)
|
||||
for lv in lvs:
|
||||
if lv["lv_name"] in lv_name_raw or lv_name_raw.endswith(lv["lv_name"]):
|
||||
lv["mountpoint"] = lv_mp
|
||||
lv["fstype"] = lv_child.get("fstype") or None
|
||||
if df:
|
||||
lv["used_human"] = bytes_human(df["used_bytes"])
|
||||
lv["free_human"] = bytes_human(df["free_bytes"])
|
||||
if df["size_bytes"] > 0:
|
||||
lv["used_percent"] = int(
|
||||
df["used_bytes"] / df["size_bytes"] * 100)
|
||||
if lv_mp == "/home" and not home_done[0]:
|
||||
lv["home_users"] = get_home_users()
|
||||
home_done[0] = True
|
||||
part["lvm"] = {"vg_name": lvm_info["vg_name"], "logical_volumes": lvs}
|
||||
|
||||
parts.append(part)
|
||||
return parts
|
||||
|
||||
|
||||
# ── Collecte principale ───────────────────────────────────────────────────────
|
||||
|
||||
def collect():
|
||||
os_type, os_version = detect_os()
|
||||
hostname = get_hostname()
|
||||
ip = get_ip()
|
||||
df_map = get_df_map()
|
||||
lvm_map = get_lvm_map()
|
||||
|
||||
output = run([
|
||||
"lsblk", "-J", "-b",
|
||||
"-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME",
|
||||
])
|
||||
if not output:
|
||||
print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
try:
|
||||
blk = json.loads(output)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"[inventaire] Erreur parsing lsblk JSON: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
disks = []
|
||||
home_done = [False]
|
||||
|
||||
for blkdev in blk.get("blockdevices", []):
|
||||
if blkdev.get("type") != "disk":
|
||||
continue
|
||||
name = blkdev.get("name", "")
|
||||
dev_path = f"/dev/{name}"
|
||||
model = (blkdev.get("model") or "inconnu").strip()
|
||||
serial = (blkdev.get("serial") or "inconnu").strip()
|
||||
size_b = blkdev.get("size") or 0
|
||||
rota = blkdev.get("rota", "")
|
||||
|
||||
partitions = build_partitions(
|
||||
blkdev.get("children") or [], df_map, lvm_map, home_done)
|
||||
|
||||
# /home sur partition racine si pas encore trouvé
|
||||
if not home_done[0]:
|
||||
for p in partitions:
|
||||
if p.get("mountpoint") == "/":
|
||||
p["home_users"] = get_home_users()
|
||||
home_done[0] = True
|
||||
break
|
||||
|
||||
disk = {
|
||||
"device": name,
|
||||
"path": dev_path,
|
||||
"by_id": get_by_id(dev_path),
|
||||
"model": model,
|
||||
"serial": serial,
|
||||
"type": disk_type(name, rota),
|
||||
"capacity_bytes": size_b,
|
||||
"capacity_human": bytes_human(size_b),
|
||||
"bus": get_bus(name),
|
||||
"smart": get_smart(dev_path),
|
||||
"partitions": partitions,
|
||||
}
|
||||
if os_type == "proxmox":
|
||||
role = get_proxmox_role(name)
|
||||
if role:
|
||||
disk["proxmox_role"] = role
|
||||
|
||||
disks.append(disk)
|
||||
|
||||
return {
|
||||
"hostname": hostname,
|
||||
"ip": ip,
|
||||
"os": os_type,
|
||||
"os_version": os_version,
|
||||
"collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(),
|
||||
"disks": disks,
|
||||
}
|
||||
|
||||
|
||||
# ── HTTP POST ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def post_to_api(payload, api_url):
|
||||
url = f"{api_url.rstrip('/')}/api/ingest"
|
||||
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
url, data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
body = json.loads(resp.read())
|
||||
print(
|
||||
f"[inventaire] OK — {body.get('accepted', '?')} disque(s) "
|
||||
f"enregistré(s) pour {body.get('hostname', '?')}"
|
||||
)
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode()}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except urllib.error.URLError as e:
|
||||
print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ── Entrypoint ────────────────────────────────────────────────────────────────
|
||||
|
||||
if __name__ == "__main__":
|
||||
if os.geteuid() != 0:
|
||||
print("[inventaire] Ce script doit être exécuté en root (sudo).", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
args = parse_args()
|
||||
api_url = f"http://{args.host}:{args.port}"
|
||||
|
||||
print(f"[inventaire] Collecte en cours...", file=sys.stderr)
|
||||
payload = collect()
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w", encoding="utf-8") as f:
|
||||
json.dump(payload, f, ensure_ascii=False, indent=2)
|
||||
print(f"[inventaire] JSON sauvegardé : {args.output}", file=sys.stderr)
|
||||
|
||||
if args.dry_run:
|
||||
print_json(payload)
|
||||
print(f"\n[inventaire] --dry-run : aucune donnée envoyée (cible: {api_url})",
|
||||
file=sys.stderr)
|
||||
elif args.debug:
|
||||
print_json(payload)
|
||||
post_to_api(payload, api_url)
|
||||
else:
|
||||
post_to_api(payload, api_url)
|
||||
@@ -0,0 +1,234 @@
|
||||
import sys, os, json
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
from unittest.mock import patch, MagicMock, mock_open, call
|
||||
import inventaire
|
||||
|
||||
|
||||
# ── parse_args ────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_args_défauts(monkeypatch):
|
||||
monkeypatch.delenv("MES_HDD_HOST", raising=False)
|
||||
monkeypatch.delenv("MES_HDD_PORT", raising=False)
|
||||
with patch("sys.argv", ["inventaire.py"]):
|
||||
args = inventaire.parse_args()
|
||||
assert args.host == "10.0.0.50"
|
||||
assert args.port == 8088
|
||||
assert args.dry_run is False
|
||||
assert args.debug is False
|
||||
assert args.output is None
|
||||
|
||||
def test_args_host_port():
|
||||
with patch("sys.argv", ["inventaire.py", "--host", "192.168.1.10", "--port", "9000"]):
|
||||
args = inventaire.parse_args()
|
||||
assert args.host == "192.168.1.10"
|
||||
assert args.port == 9000
|
||||
|
||||
def test_args_dry_run():
|
||||
with patch("sys.argv", ["inventaire.py", "-n"]):
|
||||
args = inventaire.parse_args()
|
||||
assert args.dry_run is True
|
||||
|
||||
def test_args_debug():
|
||||
with patch("sys.argv", ["inventaire.py", "--debug"]):
|
||||
args = inventaire.parse_args()
|
||||
assert args.debug is True
|
||||
|
||||
def test_args_output():
|
||||
with patch("sys.argv", ["inventaire.py", "--output", "/tmp/inv.json"]):
|
||||
args = inventaire.parse_args()
|
||||
assert args.output == "/tmp/inv.json"
|
||||
|
||||
def test_args_depuis_env(monkeypatch):
|
||||
monkeypatch.setenv("MES_HDD_HOST", "10.0.0.99")
|
||||
monkeypatch.setenv("MES_HDD_PORT", "9999")
|
||||
with patch("sys.argv", ["inventaire.py"]):
|
||||
args = inventaire.parse_args()
|
||||
assert args.host == "10.0.0.99"
|
||||
assert args.port == 9999
|
||||
|
||||
|
||||
# ── detect_os ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_detect_proxmox_via_variant():
|
||||
content = 'ID=debian\nVARIANT_ID=proxmox\nVERSION_ID="8.2"\n'
|
||||
with patch("builtins.open", mock_open(read_data=content)):
|
||||
with patch("os.path.isdir", return_value=False):
|
||||
os_type, version = inventaire.detect_os()
|
||||
assert os_type == "proxmox"
|
||||
assert version == "8.2"
|
||||
|
||||
def test_detect_proxmox_via_pve_dir():
|
||||
content = 'ID=debian\nVERSION_ID="8.2"\n'
|
||||
with patch("builtins.open", mock_open(read_data=content)):
|
||||
with patch("os.path.isdir", side_effect=lambda p: "/etc/pve" in p):
|
||||
os_type, _ = inventaire.detect_os()
|
||||
assert os_type == "proxmox"
|
||||
|
||||
def test_detect_ubuntu():
|
||||
content = 'ID=ubuntu\nVERSION_ID="22.04"\n'
|
||||
with patch("builtins.open", mock_open(read_data=content)):
|
||||
with patch("os.path.isdir", return_value=False):
|
||||
os_type, version = inventaire.detect_os()
|
||||
assert os_type == "ubuntu"
|
||||
assert version == "22.04"
|
||||
|
||||
def test_detect_debian():
|
||||
content = 'ID=debian\nVERSION_ID="12"\n'
|
||||
with patch("builtins.open", mock_open(read_data=content)):
|
||||
with patch("os.path.isdir", return_value=False):
|
||||
os_type, version = inventaire.detect_os()
|
||||
assert os_type == "debian"
|
||||
|
||||
def test_detect_os_fichier_absent():
|
||||
with patch("builtins.open", side_effect=FileNotFoundError):
|
||||
with patch("os.path.isdir", return_value=False):
|
||||
os_type, version = inventaire.detect_os()
|
||||
assert os_type == "debian"
|
||||
assert version == ""
|
||||
|
||||
|
||||
# ── bytes_human ───────────────────────────────────────────────────────────────
|
||||
|
||||
def test_bytes_human_to():
|
||||
assert inventaire.bytes_human(1_099_511_627_776) == "1.0 To"
|
||||
|
||||
def test_bytes_human_go():
|
||||
assert inventaire.bytes_human(1_073_741_824) == "1.0 Go"
|
||||
|
||||
def test_bytes_human_mo():
|
||||
assert inventaire.bytes_human(500 * 1024 * 1024) == "500.0 Mo"
|
||||
|
||||
def test_bytes_human_none():
|
||||
assert inventaire.bytes_human(None) == "?"
|
||||
|
||||
|
||||
# ── get_smart ─────────────────────────────────────────────────────────────────
|
||||
|
||||
SMART_PASSED = """
|
||||
SMART overall-health self-assessment test result: PASSED
|
||||
190 Airflow_Temperature_Cel 0x0022 073 060 045 Old_age Always - 27
|
||||
9 Power_On_Hours 0x0032 099 099 000 Old_age Always - 2847
|
||||
5 Reallocated_Sector_Ct 0x0033 100 100 036 Pre-fail Always - 0
|
||||
197 Current_Pending_Sector 0x0012 100 100 000 Old_age Always - 0
|
||||
198 Offline_Uncorrectable 0x0010 100 100 000 Old_age Offline - 0
|
||||
"""
|
||||
|
||||
SMART_FAILED = "SMART overall-health self-assessment test result: FAILED!"
|
||||
|
||||
SMART_WARN = """
|
||||
SMART overall-health self-assessment test result: PASSED
|
||||
5 Reallocated_Sector_Ct 0x0033 100 100 036 Pre-fail Always - 3
|
||||
197 Current_Pending_Sector 0x0012 100 100 000 Old_age Always - 0
|
||||
198 Offline_Uncorrectable 0x0010 100 100 000 Old_age Offline - 0
|
||||
"""
|
||||
|
||||
def test_smart_ok():
|
||||
with patch.object(inventaire, "run", return_value=SMART_PASSED):
|
||||
r = inventaire.get_smart("/dev/sda")
|
||||
assert r["status"] == "ok"
|
||||
assert r["label"] == "Bon état"
|
||||
assert r["power_on_hours"] == 2847
|
||||
assert r["temperature_c"] == 27
|
||||
assert r["reallocated_sectors"] == 0
|
||||
assert "2 847h" in r["detail"]
|
||||
assert "27°C" in r["detail"]
|
||||
assert "aucun secteur" in r["detail"]
|
||||
|
||||
def test_smart_fail():
|
||||
with patch.object(inventaire, "run", return_value=SMART_FAILED):
|
||||
r = inventaire.get_smart("/dev/sda")
|
||||
assert r["status"] == "fail"
|
||||
assert r["label"] == "Défaillance probable"
|
||||
assert "remplacement" in r["detail"]
|
||||
|
||||
def test_smart_warn_secteurs_réalloués():
|
||||
with patch.object(inventaire, "run", return_value=SMART_WARN):
|
||||
r = inventaire.get_smart("/dev/sda")
|
||||
assert r["status"] == "warn"
|
||||
assert r["label"] == "Attention"
|
||||
assert "réalloué" in r["detail"]
|
||||
assert r["reallocated_sectors"] == 3
|
||||
|
||||
def test_smart_unavailable_commande_absente():
|
||||
with patch.object(inventaire, "run", return_value=None):
|
||||
r = inventaire.get_smart("/dev/sda")
|
||||
assert r["status"] == "unavailable"
|
||||
assert r["label"] == "SMART indisponible"
|
||||
|
||||
def test_smart_unavailable_résultat_inconnu():
|
||||
with patch.object(inventaire, "run", return_value="Device: some output without health"):
|
||||
r = inventaire.get_smart("/dev/sda")
|
||||
assert r["status"] == "unavailable"
|
||||
|
||||
|
||||
# ── get_home_users ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_home_users_triés_par_taille():
|
||||
du_output = "100000\t/home/alice\n400000\t/home/gilles\n850000000\t/home\n"
|
||||
with patch.object(inventaire, "run", return_value=du_output):
|
||||
with patch("os.path.isdir", return_value=True):
|
||||
users = inventaire.get_home_users()
|
||||
assert users[0]["user"] == "gilles"
|
||||
assert users[0]["size_bytes"] == 400000
|
||||
assert users[1]["user"] == "alice"
|
||||
assert len(users) == 2 # /home lui-même est filtré
|
||||
|
||||
def test_home_users_home_absent():
|
||||
with patch("os.path.isdir", return_value=False):
|
||||
users = inventaire.get_home_users()
|
||||
assert users == []
|
||||
|
||||
def test_home_users_du_échoue():
|
||||
with patch("os.path.isdir", return_value=True):
|
||||
with patch.object(inventaire, "run", return_value=None):
|
||||
users = inventaire.get_home_users()
|
||||
assert users is None
|
||||
|
||||
|
||||
# ── post_to_api ───────────────────────────────────────────────────────────────
|
||||
|
||||
def test_post_to_api_succès():
|
||||
payload = {"hostname": "pve1", "disks": []}
|
||||
response_body = json.dumps({"accepted": 0, "hostname": "pve1"}).encode()
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.__enter__ = MagicMock(return_value=mock_resp)
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
mock_resp.read.return_value = response_body
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=mock_resp):
|
||||
inventaire.post_to_api(payload, "http://10.0.0.50:8088") # pas d'exception = succès
|
||||
|
||||
def test_post_to_api_http_error():
|
||||
import urllib.error
|
||||
payload = {"hostname": "pve1", "disks": []}
|
||||
with patch("urllib.request.urlopen",
|
||||
side_effect=urllib.error.HTTPError("url", 500, "Server Error", {}, None)):
|
||||
with patch("sys.exit") as mock_exit:
|
||||
inventaire.post_to_api(payload, "http://10.0.0.50:8088")
|
||||
mock_exit.assert_called_once_with(1)
|
||||
|
||||
def test_post_to_api_connexion_refusée():
|
||||
import urllib.error
|
||||
payload = {"hostname": "pve1", "disks": []}
|
||||
with patch("urllib.request.urlopen",
|
||||
side_effect=urllib.error.URLError("Connection refused")):
|
||||
with patch("sys.exit") as mock_exit:
|
||||
inventaire.post_to_api(payload, "http://10.0.0.50:8088")
|
||||
mock_exit.assert_called_once_with(1)
|
||||
|
||||
|
||||
# ── dry_run / debug output ────────────────────────────────────────────────────
|
||||
|
||||
def test_dry_run_naffiche_que_json(capsys):
|
||||
payload = {"hostname": "pve1", "disks": [], "os": "debian"}
|
||||
inventaire.print_json(payload)
|
||||
captured = capsys.readouterr()
|
||||
parsed = json.loads(captured.out)
|
||||
assert parsed["hostname"] == "pve1"
|
||||
|
||||
def test_dry_run_nenvoie_pas(capsys):
|
||||
payload = {"hostname": "pve1", "disks": []}
|
||||
with patch.object(inventaire, "post_to_api") as mock_post:
|
||||
inventaire.print_json(payload)
|
||||
mock_post.assert_not_called()
|
||||
Reference in New Issue
Block a user