#!/usr/bin/env python3
"""
inventaire.py - HDD/SSD/NVMe disk inventory.

Run as root:  sudo python3 inventaire.py [options]
Dependencies: standard library only (Python 3.9+)

Remote launch:
  curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py | sudo python3 -
"""

import argparse
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone

# Mutable debug flag: set _verbose[0] = True to enable dprint() output.
_verbose = [False]

# smartctl's exit status is a bitmask. Bits 0-1 mean the command line did not
# parse or the device could not be opened (no usable output). Bits 2-7 report
# disk/SMART conditions (failing disk, attributes below threshold, logged
# errors...) and are emitted together with a perfectly valid report.
_SMARTCTL_STATUS_BITS = 0xFC


def dprint(msg):
    """Print a debug message on stderr when verbose mode is enabled."""
    if _verbose[0]:
        print(f"[debug] {msg}", file=sys.stderr)


# -- CLI ----------------------------------------------------------------------

def parse_args():
    """Parse the command line.

    Defaults for --host/--port come from the MES_HDD_HOST / MES_HDD_PORT
    environment variables. Exits with status 2 and a short message when
    MES_HDD_PORT is not an integer (instead of an uncaught ValueError).
    """
    default_host = os.environ.get("MES_HDD_HOST", "10.0.0.50")
    raw_port = os.environ.get("MES_HDD_PORT", "8088")
    try:
        default_port = int(raw_port)
    except ValueError:
        print(f"[inventaire] MES_HDD_PORT invalide : {raw_port!r}", file=sys.stderr)
        sys.exit(2)

    p = argparse.ArgumentParser(
        description="Inventaire disques HDD/SSD/NVMe -> backend mes_hdd",
        epilog=(
            "Variables d'environnement :\n"
            " MES_HDD_HOST Hote du serveur (defaut: 10.0.0.50)\n"
            " MES_HDD_PORT Port du serveur (defaut: 8088)\n\n"
            "Exemples :\n"
            " sudo python3 inventaire.py\n"
            " sudo python3 inventaire.py --dry-run\n"
            " sudo python3 inventaire.py --debug --output /tmp/inv.json\n"
            " sudo python3 inventaire.py --host 192.168.1.10 --port 9000\n"
            " curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py"
            " | sudo MES_HDD_HOST=10.0.0.50 python3 -\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument(
        "--host",
        default=default_host,
        help=f"Hote du serveur backend (env MES_HDD_HOST, defaut: {default_host})",
    )
    p.add_argument(
        "--port",
        type=int,
        default=default_port,
        help=f"Port du serveur backend (env MES_HDD_PORT, defaut: {default_port})",
    )
    p.add_argument(
        "-n", "--dry-run",
        action="store_true",
        help="Affiche le JSON collecte sans envoyer au serveur",
    )
    p.add_argument(
        "--debug",
        action="store_true",
        help="Mode verbeux : messages detailles + affiche JSON + envoie au serveur",
    )
    p.add_argument(
        "--output",
        metavar="FICHIER",
        help="Sauvegarde le JSON dans un fichier (en plus de l'envoi)",
    )
    return p.parse_args()


# -- Helpers ------------------------------------------------------------------

def run(cmd, default=None, *, tolerated_exit_bits=0):
    """Run *cmd* (argument list) and return its stripped stdout.

    Returns *default* when the command cannot be started, times out (15 s)
    or exits with a status that is not accepted.

    tolerated_exit_bits: bitmask of exit-status bits that do NOT count as a
    failure. With the default of 0 only exit status 0 is accepted. Tools
    whose exit status is a bitmask of conditions (smartctl) need this to
    keep their output when a condition bit is set.
    """
    cmd_text = " ".join(str(c) for c in cmd)
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except Exception as e:
        # Deliberately broad: every caller treats external tools as
        # best-effort (missing binary, timeout, undecodable output...).
        dprint(f"exception: {cmd_text} -> {e}")
        return default
    # Negative statuses (killed by a signal) always keep high bits set after
    # masking, so they are never tolerated.
    if r.returncode & ~tolerated_exit_bits:
        stderr_text = r.stderr.strip()
        dprint(f"echec {r.returncode}: {cmd_text}"
               + (f" -> {stderr_text}" if stderr_text else ""))
        return default
    return r.stdout.strip()


def bytes_human(n):
    """Format a byte count with French binary units ("o", "Ko", ... "Po").

    Returns "?" when *n* is None.
    """
    if n is None:
        return "?"
    for unit in ("o", "Ko", "Mo", "Go", "To"):
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} Po"


def print_json(payload):
    """Pretty-print *payload* as JSON on stdout."""
    print(json.dumps(payload, ensure_ascii=False, indent=2))


# -- OS detection -------------------------------------------------------------

def detect_os():
    """Return (os_kind, version_id) read from /etc/os-release.

    os_kind is "proxmox" when /etc/pve exists or VARIANT_ID is "proxmox",
    "ubuntu" when ID is "ubuntu", and "debian" in every other case.
    """
    info = {}
    try:
        with open("/etc/os-release", encoding="utf-8", errors="replace") as f:
            for line in f:
                if "=" in line:
                    k, v = line.strip().split("=", 1)
                    # Values may be wrapped in double or single quotes.
                    info[k] = v.strip("\"'")
    except OSError:
        pass

    os_id = info.get("ID", "").lower()
    variant = info.get("VARIANT_ID", "").lower()
    version = info.get("VERSION_ID", "")

    if os.path.isdir("/etc/pve") or variant == "proxmox":
        return "proxmox", version
    if os_id == "ubuntu":
        return "ubuntu", version
    return "debian", version


# -- Machine ------------------------------------------------------------------

def get_hostname():
    """Return the output of `hostname`, or "inconnu" when it fails."""
    return run(["hostname"], default="inconnu")


def get_ip():
    """Return the source IP used to reach 1.1.1.1, else the first address
    listed by `hostname -I`, else "inconnu"."""
    out = run(["ip", "route", "get", "1.1.1.1"])
    if out:
        m = re.search(r"src\s+(\S+)", out)
        if m:
            return m.group(1)
    out = run(["hostname", "-I"])
    if out:
        return out.split()[0]
    return "inconnu"


# -- SMART --------------------------------------------------------------------

def _smart_unavailable(reason):
    """Build the SMART result used when no health data could be read."""
    return {
        "status": "unavailable",
        "label": "SMART indisponible",
        "detail": reason,
        "temperature_c": None,
        "power_on_hours": None,
        "reallocated_sectors": None,
        "pending_sectors": None,
        "uncorrectable_sectors": None,
    }


def _extract_sata_attr(output, name):
    """Return the leading integer of the raw value of attribute *name* in a
    `smartctl -A` table, or None when the attribute is not listed."""
    m = re.search(
        rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)",
        output,
    )
    return int(m.group(1)) if m else None


def _parse_sata_smart(out):
    """Turn smartctl output for an ATA disk into the SMART result dict.

    status is "fail" when the output contains "FAILED!", "warn" when the
    health check passed but reallocated/pending/uncorrectable sectors are
    reported, "ok" when it passed cleanly, and "unavailable" otherwise.
    """
    temp = (_extract_sata_attr(out, "Temperature_Celsius")
            or _extract_sata_attr(out, "Airflow_Temperature_Cel"))
    poh = _extract_sata_attr(out, "Power_On_Hours")
    real = _extract_sata_attr(out, "Reallocated_Sector_Ct")
    pend = _extract_sata_attr(out, "Current_Pending_Sector")
    uncr = _extract_sata_attr(out, "Offline_Uncorrectable")

    base = {"temperature_c": temp, "power_on_hours": poh,
            "reallocated_sectors": real, "pending_sectors": pend,
            "uncorrectable_sectors": uncr}

    if "FAILED!" in out:
        return {**base, "status": "fail", "label": "Défaillance probable",
                "detail": "Prévoir le remplacement du disque"}

    if "PASSED" in out or "Passed" in out:
        issues = []
        if real and real > 0:
            issues.append(f"{real} secteur(s) réalloué(s)")
        if pend and pend > 0:
            issues.append(f"{pend} secteur(s) en attente")
        if uncr and uncr > 0:
            issues.append(f"{uncr} secteur(s) non corrigeable(s)")
        if issues:
            return {**base, "status": "warn", "label": "Attention",
                    "detail": ", ".join(issues) + " — disque à surveiller"}
        parts = []
        if poh is not None:
            # Thousands separator rendered as a space.
            parts.append(f"{poh:,}h d'utilisation".replace(",", " "))
        if temp is not None:
            parts.append(f"{temp}°C")
        parts.append("aucun secteur défectueux")
        return {**base, "status": "ok", "label": "Bon état",
                "detail": " · ".join(parts)}

    return _smart_unavailable("résultat SMART non interprétable")


def _parse_nvme_smart(out):
    """Turn smartctl output for an NVMe device into the SMART result dict.

    A passed health check is downgraded to "warn" when the available spare
    is below 20 %, the percentage used is above 80 %, or media/data
    integrity errors are reported.
    """
    m = re.search(r"Temperature:\s+(\d+)\s+Celsius", out)
    temp = int(m.group(1)) if m else None
    m = re.search(r"Power On Hours:\s+([\d,]+)", out)
    poh = int(m.group(1).replace(",", "")) if m else None
    m = re.search(r"Media and Data Integrity Errors:\s+(\d+)", out)
    media_err = int(m.group(1)) if m else None
    m = re.search(r"Available Spare:\s+(\d+)%", out)
    spare = int(m.group(1)) if m else None
    m = re.search(r"Percentage Used:\s+(\d+)%", out)
    pct_used = int(m.group(1)) if m else None

    base = {"temperature_c": temp, "power_on_hours": poh,
            "reallocated_sectors": None, "pending_sectors": None,
            "uncorrectable_sectors": media_err}

    if "FAILED!" in out:
        return {**base, "status": "fail", "label": "Défaillance probable",
                "detail": "Prévoir le remplacement du disque"}

    if "PASSED" in out or "Passed" in out:
        issues = []
        if spare is not None and spare < 20:
            issues.append(f"espace de réserve faible ({spare}%)")
        if pct_used is not None and pct_used > 80:
            issues.append(f"usure avancée ({pct_used}% de durée de vie)")
        if media_err and media_err > 0:
            issues.append(f"{media_err} erreur(s) d'intégrité")
        if issues:
            return {**base, "status": "warn", "label": "Attention",
                    "detail": ", ".join(issues) + " — disque à surveiller"}
        parts = []
        if poh is not None:
            parts.append(f"{poh:,}h d'utilisation".replace(",", " "))
        if temp is not None:
            parts.append(f"{temp}°C")
        if pct_used is not None:
            parts.append(f"usure : {pct_used}%")
        parts.append("aucune erreur NVMe détectée")
        return {**base, "status": "ok", "label": "Bon état",
                "detail": " · ".join(parts)}

    return _smart_unavailable("résultat SMART NVMe non interprétable")


def get_smart(dev):
    """Read and interpret SMART health for device path *dev*.

    smartctl status bits 2-7 are tolerated: they are set precisely when the
    disk is failing or has logged errors, i.e. when the report matters most.
    Only a command-line error or a device that cannot be opened (bits 0-1)
    yields an "unavailable" result.
    """
    dprint(f"lecture SMART {dev}...")
    out = run(["smartctl", "-H", "-A", "-i", dev],
              tolerated_exit_bits=_SMARTCTL_STATUS_BITS)
    if out is None:
        dprint(f"SMART {dev} : smartctl indisponible ou acces refuse")
        return _smart_unavailable("smartctl absent ou acces refuse")

    is_nvme = bool(re.search(r"NVMe|Transport protocol:\s*NVMe", out, re.IGNORECASE))
    dprint(f"SMART {dev} : protocole {'NVMe' if is_nvme else 'SATA/SAS'}")
    result = _parse_nvme_smart(out) if is_nvme else _parse_sata_smart(out)
    dprint(f"SMART {dev} : {result['status']} - {result['detail']}")
    return result


# -- Disk metadata ------------------------------------------------------------

def get_by_id(dev_path):
    """Return the name of a /dev/disk/by-id symlink resolving to *dev_path*.

    Partition links (names containing "-part") are skipped. Links are
    examined in sorted order so the result is stable between runs. Returns
    None when the directory is unreadable or no link matches.
    """
    by_id_dir = "/dev/disk/by-id"
    try:
        names = sorted(os.listdir(by_id_dir))
    except OSError as e:
        dprint(f"by-id : {e}")
        return None
    for link_name in names:
        if "-part" in link_name:
            continue
        link = os.path.join(by_id_dir, link_name)
        if os.path.islink(link) and os.path.realpath(link) == dev_path:
            return link_name
    return None


def get_bus(dev_name):
    """Return the bus of *dev_name*: "nvme" for nvme* devices, else the
    lower-cased udev ID_BUS property, else "inconnu"."""
    if dev_name.startswith("nvme"):
        return "nvme"
    out = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"])
    if out:
        m = re.search(r"^ID_BUS=(.+)$", out, re.MULTILINE)
        if m:
            return m.group(1).lower()
    return "inconnu"


def disk_type(name, rota):
    """Classify a disk as "NVMe", "HDD", "SSD" or "inconnu" from its name
    and lsblk's ROTA value."""
    if name.startswith("nvme"):
        return "NVMe"
    # lsblk reports rota as an integer (0/1) or a boolean depending on version.
    if rota in (1, True, "1"):
        return "HDD"
    if rota in (0, False, "0"):
        return "SSD"
    return "inconnu"


# -- Disk space (df) ----------------------------------------------------------

def get_df_map():
    """Return {mount_point: {"size_bytes", "used_bytes", "free_bytes"}} for
    local filesystems, as reported by df. Empty dict when df fails."""
    dprint("lecture espace disques via df...")
    # --local skips network/NFS mounts, which can hang (e.g. on Proxmox).
    out = run(["df", "--output=target,size,used,avail", "-B1", "--local"])
    result = {}
    if not out:
        return result
    for line in out.splitlines()[1:]:
        # Split from the right: the three numeric columns never contain
        # whitespace, whereas the mount point (first column) may.
        fields = line.rsplit(None, 3)
        if len(fields) < 4:
            continue
        target, size, used, avail = fields
        try:
            result[target] = {
                "size_bytes": int(size),
                "used_bytes": int(used),
                "free_bytes": int(avail),
            }
        except ValueError:
            # Non-numeric columns: not a data row, skip it.
            continue
    dprint(f"df : {len(result)} point(s) de montage")
    return result


# -- LVM ----------------------------------------------------------------------

def _lv_size_human(s):
    """Convert an LVM size such as "10.00g" into "10.0 Go".

    Returns "?" for an empty value and the stripped input unchanged when it
    does not end with a k/m/g/t suffix preceded by a plain number.
    """
    if not s:
        return "?"
    s = s.strip()
    unit_map = {"k": "Ko", "m": "Mo", "g": "Go", "t": "To"}
    if s and s[-1].lower() in unit_map:
        try:
            return f"{float(s[:-1]):.1f} {unit_map[s[-1].lower()]}"
        except ValueError:
            pass
    return s


def _lv_dm_name(vg_name, lv_name):
    """Return the device-mapper name LVM gives to a logical volume: "VG-LV"
    with every hyphen inside either name doubled (this is the NAME lsblk
    shows for the volume)."""
    return f"{vg_name.replace('-', '--')}-{lv_name.replace('-', '--')}"


def get_lvm_map():
    """Return {pv_device_path: {"vg_name", "logical_volumes"}} built from
    the JSON reports of `pvs` and `lvs`. Empty dict when LVM is absent or
    its report cannot be parsed."""
    dprint("detection LVM...")
    pvs_out = run(["pvs", "--noheadings", "--reportformat", "json",
                   "-o", "pv_name,vg_name"])
    if not pvs_out:
        dprint("LVM : aucun volume physique detecte (pvs indisponible ou absent)")
        return {}

    try:
        pvs = json.loads(pvs_out)["report"][0]["pv"]
    except (json.JSONDecodeError, KeyError, IndexError):
        return {}

    lvs_by_vg = {}
    lvs_out = run(["lvs", "--noheadings", "--reportformat", "json",
                   "-o", "lv_name,vg_name,lv_size,lv_path,lv_dm_path,data_percent,lv_attr"])
    if lvs_out:
        try:
            for lv in json.loads(lvs_out)["report"][0]["lv"]:
                lvs_by_vg.setdefault(lv.get("vg_name", ""), []).append(lv)
        except (json.JSONDecodeError, KeyError, IndexError):
            # Keep whatever was grouped before the malformed entry.
            pass

    result = {}
    for pv in pvs:
        pv_name = pv.get("pv_name", "")
        vg_name = pv.get("vg_name", "")
        if pv_name and vg_name:
            result[pv_name] = {
                "vg_name": vg_name,
                "logical_volumes": [_format_lv(lv) for lv in lvs_by_vg.get(vg_name, [])],
            }
    dprint(f"LVM : {len(result)} volume(s) physique(s)")
    return result


def _format_lv(lv):
    """Build the payload entry for one `lvs` record.

    Mount-related fields are left as None here; build_partitions() fills
    them in for mounted volumes.
    """
    name = lv.get("lv_name", "")
    dm_path = lv.get("lv_dm_path", "") or lv.get("lv_path", "") or ""
    attr = lv.get("lv_attr", "")

    entry = {
        "lv_name": name,
        "size_human": _lv_size_human(lv.get("lv_size", "")),
        "used_human": None,
        "free_human": None,
        "used_percent": None,
        "fstype": None,
        "mountpoint": None,
        "thin_pool_used_percent": None,
    }

    # Thin pool: data_percent is the pool fill level.
    dp = lv.get("data_percent", "")
    if dp and dp.strip() not in ("", "0.00"):
        try:
            entry["thin_pool_used_percent"] = round(float(dp), 1)
            dprint(f" LV {name} thin pool : {entry['thin_pool_used_percent']}% utilise")
        except ValueError:
            pass

    # Unmounted ext LV -> offline read through tune2fs.
    # (attr[0] 'V'/'v' = thin volume, 't'/'T' = thin pool; both carry
    # data_percent.)
    is_pool = attr[:1].lower() in ("t",)
    if not is_pool and dm_path:
        space = _ext4_space(dm_path)
        if space:
            entry["used_human"] = bytes_human(space["used_bytes"])
            entry["free_human"] = bytes_human(space["free_bytes"])
            entry["used_percent"] = space["used_percent"]
            dprint(f" LV {name} offline : {entry['used_human']} utilise ({entry['used_percent']}%)")

    return entry


# -- /home users --------------------------------------------------------------

def get_home_users():
    """Return per-directory sizes under /home, largest first.

    Each entry is {"user", "size_bytes", "size_human"}. Returns [] when
    /home does not exist and None when `du` fails with both option
    spellings.
    """
    if not os.path.isdir("/home"):
        dprint("/home absent")
        return []

    dprint("calcul taille /home par utilisateur...")
    out = run(["du", "-d", "1", "-b", "/home"])
    if out is None:
        dprint("/home : echec de du, essai avec --max-depth...")
        out = run(["du", "--max-depth=1", "-b", "/home"])
    if out is None:
        dprint("/home : impossible de calculer les tailles")
        return None

    entries = []
    for line in out.splitlines():
        parts = line.split("\t", 1)
        if len(parts) != 2:
            continue
        path = parts[1].strip()
        if path.rstrip("/") == "/home":
            # Grand total line for /home itself.
            continue
        try:
            size = int(parts[0])
        except ValueError:
            continue
        entries.append({
            "user": os.path.basename(path.rstrip("/")),
            "size_bytes": size,
            "size_human": bytes_human(size),
        })

    entries.sort(key=lambda x: x["size_bytes"], reverse=True)
    dprint(f"/home : {len(entries)} utilisateur(s) : "
           + ", ".join(f"{e['user']} ({e['size_human']})" for e in entries))
    return entries


# -- Proxmox ------------------------------------------------------------------

def get_proxmox_role(dev_name):
    """Return "zfs_pool" when /dev/<dev_name> appears in `zpool status -P`,
    "ceph_osd" when a Ceph OSD block link resolves to a path containing
    *dev_name*, else None."""
    zpool_out = run(["zpool", "status", "-P"])
    if zpool_out and f"/dev/{dev_name}" in zpool_out:
        return "zfs_pool"

    ceph_dir = "/var/lib/ceph/osd"
    if os.path.isdir(ceph_dir):
        try:
            for entry in os.listdir(ceph_dir):
                link = os.path.join(ceph_dir, entry, "block")
                if os.path.islink(link) and dev_name in os.path.realpath(link):
                    return "ceph_osd"
        except OSError:
            pass
    return None


# -- Offline space (unmounted partitions) -------------------------------------

def get_offline_space(dev, fstype):
    """Read the usage of an unmounted partition according to its filesystem
    type. Returns None for unsupported types."""
    if fstype in ("ext2", "ext3", "ext4"):
        return _ext4_space(dev)
    if fstype == "ntfs":
        return _ntfs_space(dev)
    if fstype == "btrfs":
        return _btrfs_space(dev)
    return None


def _ext4_space(dev):
    """Compute size/used/free bytes of an ext2/3/4 filesystem from
    `tune2fs -l`. Returns None when tune2fs fails or a field is missing."""
    out = run(["tune2fs", "-l", dev])
    if not out:
        return None
    vals = {}
    for line in out.splitlines():
        for key in ("Block count", "Free blocks", "Block size"):
            if line.startswith(key + ":"):
                try:
                    vals[key] = int(line.split(":", 1)[1].strip().replace(",", ""))
                except ValueError:
                    pass
    try:
        total = vals["Block count"] * vals["Block size"]
        free = vals["Free blocks"] * vals["Block size"]
    except KeyError:
        return None
    used = total - free
    return {"size_bytes": total, "used_bytes": used, "free_bytes": free,
            "used_percent": int(used / total * 100) if total else 0}


def _ntfs_space(dev):
    """Estimate the usage of an unmounted NTFS volume with
    `ntfsresize --info`.

    Returns the usual space dict on success, otherwise
    {"offline_note": <reason>} explaining why usage is unknown.
    """
    try:
        r = subprocess.run(
            ["ntfsresize", "--info", "--force", dev],
            capture_output=True, text=True, timeout=15,
        )
        combined = r.stdout + r.stderr
        if r.returncode != 0:
            locked_markers = ("hibernated", "fast restart", "Windows is",
                              "hiberfil", "unclean", "Refusing to operate")
            if any(marker in combined for marker in locked_markers):
                note = "Volume NTFS verrouillé — Windows en veille ou Fast Startup actif"
            else:
                note = "Lecture NTFS impossible (ntfsresize indisponible ou erreur)"
            dprint(f"NTFS {dev} : {note}")
            return {"offline_note": note}
        total = used = None
        for line in combined.splitlines():
            if "Current volume size:" in line:
                m = re.search(r"(\d+)\s+bytes", line)
                if m:
                    total = int(m.group(1))
            if "You might resize at" in line:
                # Smallest size the volume could shrink to, used here as
                # the amount of space in use.
                m = re.search(r"(\d+)\s+bytes", line)
                if m:
                    used = int(m.group(1))
        if total and used:
            return {"size_bytes": total, "used_bytes": used,
                    "free_bytes": total - used,
                    "used_percent": int(used / total * 100) if total else 0}
        return {"offline_note": "Taille NTFS non déterminée"}
    except FileNotFoundError:
        return {"offline_note": "ntfsresize non installé — sudo apt install ntfs-3g"}
    except Exception as e:
        # Best-effort probe: report the problem in the payload instead of
        # aborting the whole inventory.
        return {"offline_note": f"Erreur lecture NTFS : {e}"}


def _btrfs_space(dev):
    """Probe an unmounted btrfs filesystem. Always returns None: the figure
    printed by `btrfs filesystem show` is too imprecise without a mount."""
    out = run(["btrfs", "filesystem", "show", dev])
    if not out:
        return None
    # Example: "Total devices 1 FS bytes used 10.00GiB"
    m = re.search(r"FS bytes used\s+([\d.]+)(\w+)", out)
    if m:
        # Approximation only; no reliable total without mounting.
        dprint(f"btrfs {dev} : espace utilisé approximatif")
    return None


# -- Partition construction ---------------------------------------------------

def build_partitions(children, df_map, lvm_map, home_done):
    """Build the payload entries for the children of one lsblk disk.

    children:  lsblk "children" list of the disk (may be None).
    df_map:    result of get_df_map().
    lvm_map:   result of get_lvm_map(); its dicts are copied, not mutated.
    home_done: one-element list used as a shared flag so that /home sizes
               are computed only once per inventory.

    Logical volumes are matched to lsblk children by their exact
    device-mapper name, so an LV named "data" is never confused with
    "data2" or "data_backup".
    """
    parts = []
    for child in (children or []):
        if child.get("type") not in ("part", "lvm"):
            continue
        name = child.get("name", "")
        fstype = child.get("fstype") or None
        mountpoint = child.get("mountpoint") or None

        if fstype == "squashfs":
            # Ubuntu snap images: ignore.
            continue

        size_b = child.get("size")
        part = {
            "name": name,
            "uuid": child.get("uuid") or None,
            "fstype": fstype,
            "size_bytes": size_b,
            "size_human": bytes_human(size_b),
            "used_bytes": None,
            "used_human": None,
            "free_bytes": None,
            "free_human": None,
            "used_percent": None,
            "mountpoint": mountpoint,
            "offline_note": None,
            "home_users": None,
            "lvm": None,
        }

        if mountpoint and mountpoint in df_map:
            # Mounted: usage comes from df.
            df = df_map[mountpoint]
            part["used_bytes"] = df["used_bytes"]
            part["used_human"] = bytes_human(df["used_bytes"])
            part["free_bytes"] = df["free_bytes"]
            part["free_human"] = bytes_human(df["free_bytes"])
            if df["size_bytes"] > 0:
                part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100)
            dprint(f" partition {name} montee sur {mountpoint} : "
                   f"{part['used_human']} / {part['size_human']} ({part['used_percent']}%)")
        else:
            dprint(f" partition {name} : fstype={fstype}, non montee")
            if fstype in ("ext2", "ext3", "ext4", "ntfs", "btrfs"):
                space = get_offline_space(f"/dev/{name}", fstype)
                if space:
                    if "offline_note" in space:
                        part["offline_note"] = space["offline_note"]
                        dprint(f" partition {name} : {space['offline_note']}")
                    else:
                        part["used_bytes"] = space["used_bytes"]
                        part["used_human"] = bytes_human(space["used_bytes"])
                        part["free_bytes"] = space["free_bytes"]
                        part["free_human"] = bytes_human(space["free_bytes"])
                        part["used_percent"] = space["used_percent"]
                        dprint(f" partition {name} offline : "
                               f"{part['used_human']} / {part['size_human']} ({part['used_percent']}%)")

        # /home users when /home is this very partition.
        if mountpoint == "/home" and not home_done[0]:
            part["home_users"] = get_home_users()
            home_done[0] = True

        # LVM physical volume: attach its volume group and logical volumes.
        dev_path = f"/dev/{name}"
        if fstype == "LVM2_member" and dev_path in lvm_map:
            lvm_info = lvm_map[dev_path]
            lvs = [dict(lv) for lv in lvm_info["logical_volumes"]]
            lvs_by_dm_name = {
                _lv_dm_name(lvm_info["vg_name"], lv["lv_name"]): lv for lv in lvs
            }
            for lv_child in child.get("children") or []:
                lv_mp = lv_child.get("mountpoint") or None
                if not lv_mp:
                    continue
                lv = lvs_by_dm_name.get(lv_child.get("name", ""))
                if lv is None:
                    continue
                lv["mountpoint"] = lv_mp
                lv["fstype"] = lv_child.get("fstype") or None
                df = df_map.get(lv_mp)
                if df:
                    lv["used_human"] = bytes_human(df["used_bytes"])
                    lv["free_human"] = bytes_human(df["free_bytes"])
                    if df["size_bytes"] > 0:
                        lv["used_percent"] = int(
                            df["used_bytes"] / df["size_bytes"] * 100)
                if lv_mp == "/home" and not home_done[0]:
                    lv["home_users"] = get_home_users()
                    home_done[0] = True
            part["lvm"] = {"vg_name": lvm_info["vg_name"], "logical_volumes": lvs}

        parts.append(part)
    return parts


# -- Main collection ----------------------------------------------------------

def collect():
    """Collect machine identity and every disk listed by lsblk into the
    payload dict sent to the backend.

    Exits the process with status 1 when lsblk fails or returns invalid
    JSON.
    """
    os_type, os_version = detect_os()
    dprint(f"OS detecte : {os_type} {os_version}")
    hostname = get_hostname()
    ip = get_ip()
    dprint(f"Machine : {hostname} / {ip}")
    df_map = get_df_map()
    lvm_map = get_lvm_map()

    dprint("liste des disques via lsblk...")
    output = run([
        "lsblk", "-J", "-b",
        "-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME",
    ])
    if not output:
        print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr)
        sys.exit(1)

    try:
        blk = json.loads(output)
    except json.JSONDecodeError as e:
        print(f"[inventaire] Erreur parsing lsblk JSON: {e}", file=sys.stderr)
        sys.exit(1)

    disks = []
    home_done = [False]

    for blkdev in blk.get("blockdevices", []):
        if blkdev.get("type") != "disk":
            continue

        name = blkdev.get("name", "")
        dev_path = f"/dev/{name}"
        model = (blkdev.get("model") or "inconnu").strip()
        serial = (blkdev.get("serial") or "inconnu").strip()
        size_b = blkdev.get("size") or 0
        rota = blkdev.get("rota", "")
        dtype = disk_type(name, rota)

        dprint(f"disque {name} : {model} / serial {serial} / {dtype} / {bytes_human(size_b)}")

        partitions = build_partitions(
            blkdev.get("children") or [], df_map, lvm_map, home_done)

        # No dedicated /home found yet: attach /home sizes to the root
        # partition if this disk holds it.
        if not home_done[0]:
            for p in partitions:
                if p.get("mountpoint") == "/":
                    dprint("/home non dedie, calcul depuis la partition racine /")
                    p["home_users"] = get_home_users()
                    home_done[0] = True
                    break

        disk = {
            "device": name,
            "path": dev_path,
            "by_id": get_by_id(dev_path),
            "model": model,
            "serial": serial,
            "type": dtype,
            "capacity_bytes": size_b,
            "capacity_human": bytes_human(size_b),
            "bus": get_bus(name),
            "smart": get_smart(dev_path),
            "partitions": partitions,
        }

        if os_type == "proxmox":
            role = get_proxmox_role(name)
            if role:
                disk["proxmox_role"] = role
                dprint(f" role Proxmox : {role}")

        disks.append(disk)

    dprint(f"collecte terminee : {len(disks)} disque(s)")
    return {
        "hostname": hostname,
        "ip": ip,
        "os": os_type,
        "os_version": os_version,
        "collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(),
        "disks": disks,
    }


# -- HTTP POST ----------------------------------------------------------------

def post_to_api(payload, api_url):
    """POST *payload* as JSON to <api_url>/api/ingest and print the outcome.

    Exits the process with status 1 on an HTTP error or when the server
    cannot be reached. A 2xx answer whose body is not a JSON object is
    still reported as a success, with "?" in place of the missing fields.
    """
    url = f"{api_url.rstrip('/')}/api/ingest"
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url, data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode(errors='replace')}",
              file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Read timeouts and connection resets are raised outside URLError.
        print(f"[inventaire] Impossible de joindre {url}: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        body = json.loads(raw)
    except ValueError:
        # Covers JSONDecodeError and undecodable bytes.
        body = None
    if not isinstance(body, dict):
        body = {}
    print(
        f"[inventaire] OK - {body.get('accepted', '?')} disque(s) "
        f"enregistre(s) pour {body.get('hostname', '?')}"
    )


# -- Entrypoint ---------------------------------------------------------------

def main():
    """Script entry point: parse options, collect, then save/print/send."""
    # Parse first so that --help and usage errors work without root.
    args = parse_args()

    if os.geteuid() != 0:
        print("[inventaire] Ce script doit etre execute en root (sudo).", file=sys.stderr)
        sys.exit(1)

    api_url = f"http://{args.host}:{args.port}"
    _verbose[0] = args.debug or args.dry_run

    print("[inventaire] Collecte en cours...", file=sys.stderr)
    payload = collect()

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        print(f"[inventaire] JSON sauvegarde : {args.output}", file=sys.stderr)

    if args.dry_run:
        print_json(payload)
        print(f"\n[inventaire] --dry-run : aucune donnee envoyee (cible: {api_url})",
              file=sys.stderr)
    elif args.debug:
        print_json(payload)
        post_to_api(payload, api_url)
    else:
        post_to_api(payload, api_url)


if __name__ == "__main__":
    main()