29a63be483
Distingue volume verrouillé (Windows hibernate/Fast Startup) de ntfsresize absent. Champ offline_note dans le payload JSON. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
710 lines
26 KiB
Python
710 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
inventaire.py - Inventaire disques HDD/SSD/NVMe
|
|
Executer en root : sudo python3 inventaire.py [options]
|
|
Dependances : stdlib uniquement (Python 3.9+)
|
|
|
|
Lancement a distance :
|
|
curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py | sudo python3 -
|
|
"""
|
|
import argparse, json, os, re, subprocess, sys, urllib.request, urllib.error
|
|
from datetime import datetime, timezone
|
|
|
|
_verbose = [False] # mutable : _verbose[0] = True pour activer
|
|
|
|
def dprint(msg):
|
|
if _verbose[0]:
|
|
print(f"[debug] {msg}", file=sys.stderr)
|
|
|
|
|
|
# -- CLI ----------------------------------------------------------------------
|
|
|
|
def parse_args():
|
|
default_host = os.environ.get("MES_HDD_HOST", "10.0.0.50")
|
|
default_port = int(os.environ.get("MES_HDD_PORT", "8088"))
|
|
p = argparse.ArgumentParser(
|
|
description="Inventaire disques HDD/SSD/NVMe -> backend mes_hdd",
|
|
epilog=(
|
|
"Variables d'environnement :\n"
|
|
" MES_HDD_HOST Hote du serveur (defaut: 10.0.0.50)\n"
|
|
" MES_HDD_PORT Port du serveur (defaut: 8088)\n\n"
|
|
"Exemples :\n"
|
|
" sudo python3 inventaire.py\n"
|
|
" sudo python3 inventaire.py --dry-run\n"
|
|
" sudo python3 inventaire.py --debug --output /tmp/inv.json\n"
|
|
" sudo python3 inventaire.py --host 192.168.1.10 --port 9000\n"
|
|
" curl -fsSL https://git.maison43gil.com/gilles/mes_hdd/raw/branch/main/inventaire.py"
|
|
" | sudo MES_HDD_HOST=10.0.0.50 python3 -\n"
|
|
),
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
)
|
|
p.add_argument(
|
|
"--host", default=default_host,
|
|
help=f"Hote du serveur backend (env MES_HDD_HOST, defaut: {default_host})",
|
|
)
|
|
p.add_argument(
|
|
"--port", type=int, default=default_port,
|
|
help=f"Port du serveur backend (env MES_HDD_PORT, defaut: {default_port})",
|
|
)
|
|
p.add_argument(
|
|
"-n", "--dry-run", action="store_true",
|
|
help="Affiche le JSON collecte sans envoyer au serveur",
|
|
)
|
|
p.add_argument(
|
|
"--debug", action="store_true",
|
|
help="Mode verbeux : messages detailles + affiche JSON + envoie au serveur",
|
|
)
|
|
p.add_argument(
|
|
"--output", metavar="FICHIER",
|
|
help="Sauvegarde le JSON dans un fichier (en plus de l'envoi)",
|
|
)
|
|
return p.parse_args()
|
|
|
|
|
|
# -- Helpers ------------------------------------------------------------------
|
|
|
|
def run(cmd, default=None):
|
|
try:
|
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
|
if r.returncode != 0:
|
|
dprint(f"echec {r.returncode}: {' '.join(str(c) for c in cmd)}"
|
|
+ (f" -> {r.stderr.strip()}" if r.stderr.strip() else ""))
|
|
return default
|
|
return r.stdout.strip()
|
|
except Exception as e:
|
|
dprint(f"exception: {' '.join(str(c) for c in cmd)} -> {e}")
|
|
return default
|
|
|
|
def bytes_human(n):
|
|
if n is None:
|
|
return "?"
|
|
for unit in ("o", "Ko", "Mo", "Go", "To"):
|
|
if n < 1024:
|
|
return f"{n:.1f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} Po"
|
|
|
|
def print_json(payload):
|
|
print(json.dumps(payload, ensure_ascii=False, indent=2))
|
|
|
|
|
|
# -- Detection OS -------------------------------------------------------------
|
|
|
|
def detect_os():
|
|
info = {}
|
|
try:
|
|
with open("/etc/os-release") as f:
|
|
for line in f:
|
|
if "=" in line:
|
|
k, v = line.strip().split("=", 1)
|
|
info[k] = v.strip('"')
|
|
except FileNotFoundError:
|
|
pass
|
|
os_id = info.get("ID", "").lower()
|
|
variant = info.get("VARIANT_ID", "").lower()
|
|
version = info.get("VERSION_ID", "")
|
|
if os.path.isdir("/etc/pve") or variant == "proxmox":
|
|
return "proxmox", version
|
|
if os_id == "ubuntu":
|
|
return "ubuntu", version
|
|
return "debian", version
|
|
|
|
|
|
# -- Machine ------------------------------------------------------------------
|
|
|
|
def get_hostname():
|
|
return run(["hostname"], default="inconnu")
|
|
|
|
def get_ip():
|
|
out = run(["ip", "route", "get", "1.1.1.1"])
|
|
if out:
|
|
m = re.search(r"src\s+(\S+)", out)
|
|
if m:
|
|
return m.group(1)
|
|
out = run(["hostname", "-I"])
|
|
if out:
|
|
return out.split()[0]
|
|
return "inconnu"
|
|
|
|
|
|
# -- SMART --------------------------------------------------------------------
|
|
|
|
def _smart_unavailable(reason):
|
|
return {
|
|
"status": "unavailable", "label": "SMART indisponible", "detail": reason,
|
|
"temperature_c": None, "power_on_hours": None,
|
|
"reallocated_sectors": None, "pending_sectors": None,
|
|
"uncorrectable_sectors": None,
|
|
}
|
|
|
|
def _extract_sata_attr(output, name):
|
|
m = re.search(
|
|
rf"{name}\s+\S+\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s+\S+\s+(\d+)",
|
|
output,
|
|
)
|
|
return int(m.group(1)) if m else None
|
|
|
|
def _parse_sata_smart(out):
|
|
temp = (_extract_sata_attr(out, "Temperature_Celsius")
|
|
or _extract_sata_attr(out, "Airflow_Temperature_Cel"))
|
|
poh = _extract_sata_attr(out, "Power_On_Hours")
|
|
real = _extract_sata_attr(out, "Reallocated_Sector_Ct")
|
|
pend = _extract_sata_attr(out, "Current_Pending_Sector")
|
|
uncr = _extract_sata_attr(out, "Offline_Uncorrectable")
|
|
base = {"temperature_c": temp, "power_on_hours": poh,
|
|
"reallocated_sectors": real, "pending_sectors": pend,
|
|
"uncorrectable_sectors": uncr}
|
|
if "FAILED!" in out:
|
|
return {**base, "status": "fail", "label": "Défaillance probable",
|
|
"detail": "Prévoir le remplacement du disque"}
|
|
if "PASSED" in out or "Passed" in out:
|
|
issues = []
|
|
if real and real > 0: issues.append(f"{real} secteur(s) réalloué(s)")
|
|
if pend and pend > 0: issues.append(f"{pend} secteur(s) en attente")
|
|
if uncr and uncr > 0: issues.append(f"{uncr} secteur(s) non corrigeable(s)")
|
|
if issues:
|
|
return {**base, "status": "warn", "label": "Attention",
|
|
"detail": ", ".join(issues) + " — disque à surveiller"}
|
|
parts = []
|
|
if poh is not None: parts.append(f"{poh:,}h d'utilisation".replace(",", " "))
|
|
if temp is not None: parts.append(f"{temp}°C")
|
|
parts.append("aucun secteur défectueux")
|
|
return {**base, "status": "ok", "label": "Bon état", "detail": " · ".join(parts)}
|
|
return _smart_unavailable("résultat SMART non interprétable")
|
|
|
|
def _parse_nvme_smart(out):
|
|
m = re.search(r"Temperature:\s+(\d+)\s+Celsius", out)
|
|
temp = int(m.group(1)) if m else None
|
|
|
|
m = re.search(r"Power On Hours:\s+([\d,]+)", out)
|
|
poh = int(m.group(1).replace(",", "")) if m else None
|
|
|
|
m = re.search(r"Media and Data Integrity Errors:\s+(\d+)", out)
|
|
media_err = int(m.group(1)) if m else None
|
|
|
|
m = re.search(r"Available Spare:\s+(\d+)%", out)
|
|
spare = int(m.group(1)) if m else None
|
|
|
|
m = re.search(r"Percentage Used:\s+(\d+)%", out)
|
|
pct_used = int(m.group(1)) if m else None
|
|
|
|
base = {"temperature_c": temp, "power_on_hours": poh,
|
|
"reallocated_sectors": None, "pending_sectors": None,
|
|
"uncorrectable_sectors": media_err}
|
|
|
|
if "FAILED!" in out:
|
|
return {**base, "status": "fail", "label": "Défaillance probable",
|
|
"detail": "Prévoir le remplacement du disque"}
|
|
if "PASSED" in out or "Passed" in out:
|
|
issues = []
|
|
if spare is not None and spare < 20:
|
|
issues.append(f"espace de réserve faible ({spare}%)")
|
|
if pct_used is not None and pct_used > 80:
|
|
issues.append(f"usure avancée ({pct_used}% de durée de vie)")
|
|
if media_err and media_err > 0:
|
|
issues.append(f"{media_err} erreur(s) d'intégrité")
|
|
if issues:
|
|
return {**base, "status": "warn", "label": "Attention",
|
|
"detail": ", ".join(issues) + " — disque à surveiller"}
|
|
parts = []
|
|
if poh is not None: parts.append(f"{poh:,}h d'utilisation".replace(",", " "))
|
|
if temp is not None: parts.append(f"{temp}°C")
|
|
if pct_used is not None: parts.append(f"usure : {pct_used}%")
|
|
parts.append("aucune erreur NVMe détectée")
|
|
return {**base, "status": "ok", "label": "Bon état", "detail": " · ".join(parts)}
|
|
return _smart_unavailable("résultat SMART NVMe non interprétable")
|
|
|
|
def get_smart(dev):
|
|
dprint(f"lecture SMART {dev}...")
|
|
out = run(["smartctl", "-H", "-A", "-i", dev])
|
|
if out is None:
|
|
dprint(f"SMART {dev} : smartctl indisponible ou acces refuse")
|
|
return _smart_unavailable("smartctl absent ou acces refuse")
|
|
is_nvme = bool(re.search(r"NVMe|Transport protocol:\s*NVMe", out, re.IGNORECASE))
|
|
dprint(f"SMART {dev} : protocole {'NVMe' if is_nvme else 'SATA/SAS'}")
|
|
result = _parse_nvme_smart(out) if is_nvme else _parse_sata_smart(out)
|
|
dprint(f"SMART {dev} : {result['status']} - {result['detail']}")
|
|
return result
|
|
|
|
|
|
# -- Metadonnees disque -------------------------------------------------------
|
|
|
|
def get_by_id(dev_path):
|
|
out = run(["find", "/dev/disk/by-id", "-type", "l"])
|
|
if not out:
|
|
return None
|
|
for link in out.splitlines():
|
|
if "-part" in link:
|
|
continue
|
|
target = run(["readlink", "-f", link])
|
|
if target == dev_path:
|
|
return os.path.basename(link)
|
|
return None
|
|
|
|
def get_bus(dev_name):
|
|
if dev_name.startswith("nvme"):
|
|
return "nvme"
|
|
out = run(["udevadm", "info", "--query=property", f"--name=/dev/{dev_name}"])
|
|
if out:
|
|
m = re.search(r"^ID_BUS=(.+)$", out, re.MULTILINE)
|
|
if m:
|
|
return m.group(1).lower()
|
|
return "inconnu"
|
|
|
|
def disk_type(name, rota):
|
|
if name.startswith("nvme"):
|
|
return "NVMe"
|
|
if str(rota) == "1":
|
|
return "HDD"
|
|
if str(rota) == "0":
|
|
return "SSD"
|
|
return "inconnu"
|
|
|
|
|
|
# -- Espace disque (df) -------------------------------------------------------
|
|
|
|
def get_df_map():
|
|
dprint("lecture espace disques via df...")
|
|
out = run(["df", "--output=target,size,used,avail", "-B1"])
|
|
result = {}
|
|
if not out:
|
|
return result
|
|
for line in out.splitlines()[1:]:
|
|
parts = line.split()
|
|
if len(parts) < 4:
|
|
continue
|
|
try:
|
|
result[parts[0]] = {
|
|
"size_bytes": int(parts[1]),
|
|
"used_bytes": int(parts[2]),
|
|
"free_bytes": int(parts[3]),
|
|
}
|
|
except ValueError:
|
|
pass
|
|
dprint(f"df : {len(result)} point(s) de montage")
|
|
return result
|
|
|
|
|
|
# -- LVM ----------------------------------------------------------------------
|
|
|
|
def _lv_size_human(s):
|
|
if not s:
|
|
return "?"
|
|
s = s.strip()
|
|
unit_map = {"k": "Ko", "m": "Mo", "g": "Go", "t": "To"}
|
|
if s and s[-1].lower() in unit_map:
|
|
try:
|
|
return f"{float(s[:-1]):.1f} {unit_map[s[-1].lower()]}"
|
|
except ValueError:
|
|
pass
|
|
return s
|
|
|
|
def get_lvm_map():
|
|
dprint("detection LVM...")
|
|
pvs_out = run(["pvs", "--noheadings", "--reportformat", "json",
|
|
"-o", "pv_name,vg_name"])
|
|
if not pvs_out:
|
|
dprint("LVM : aucun volume physique detecte (pvs indisponible ou absent)")
|
|
return {}
|
|
try:
|
|
pvs = json.loads(pvs_out)["report"][0]["pv"]
|
|
except (json.JSONDecodeError, KeyError, IndexError):
|
|
return {}
|
|
lvs_by_vg = {}
|
|
lvs_out = run(["lvs", "--noheadings", "--reportformat", "json",
|
|
"-o", "lv_name,vg_name,lv_size,lv_path"])
|
|
if lvs_out:
|
|
try:
|
|
for lv in json.loads(lvs_out)["report"][0]["lv"]:
|
|
lvs_by_vg.setdefault(lv.get("vg_name", ""), []).append(lv)
|
|
except (json.JSONDecodeError, KeyError, IndexError):
|
|
pass
|
|
result = {}
|
|
for pv in pvs:
|
|
pv_name = pv.get("pv_name", "")
|
|
vg_name = pv.get("vg_name", "")
|
|
if pv_name and vg_name:
|
|
result[pv_name] = {
|
|
"vg_name": vg_name,
|
|
"logical_volumes": [
|
|
{"lv_name": lv.get("lv_name", ""),
|
|
"size_human": _lv_size_human(lv.get("lv_size", "")),
|
|
"used_human": None, "free_human": None,
|
|
"used_percent": None, "fstype": None, "mountpoint": None}
|
|
for lv in lvs_by_vg.get(vg_name, [])
|
|
],
|
|
}
|
|
dprint(f"LVM : {len(result)} volume(s) physique(s)")
|
|
return result
|
|
|
|
|
|
# -- /home users --------------------------------------------------------------
|
|
|
|
def get_home_users():
|
|
if not os.path.isdir("/home"):
|
|
dprint("/home absent")
|
|
return []
|
|
dprint("calcul taille /home par utilisateur...")
|
|
out = run(["du", "-d", "1", "-b", "/home"])
|
|
if out is None:
|
|
dprint("/home : echec de du, essai avec --max-depth...")
|
|
out = run(["du", "--max-depth=1", "-b", "/home"])
|
|
if out is None:
|
|
dprint("/home : impossible de calculer les tailles")
|
|
return None
|
|
entries = []
|
|
for line in out.splitlines():
|
|
parts = line.split("\t", 1)
|
|
if len(parts) != 2:
|
|
continue
|
|
path = parts[1].strip()
|
|
if path.rstrip("/") == "/home":
|
|
continue
|
|
try:
|
|
size = int(parts[0])
|
|
user = os.path.basename(path.rstrip("/"))
|
|
entries.append({
|
|
"user": user,
|
|
"size_bytes": size,
|
|
"size_human": bytes_human(size),
|
|
})
|
|
except ValueError:
|
|
continue
|
|
entries = sorted(entries, key=lambda x: x["size_bytes"], reverse=True)
|
|
dprint(f"/home : {len(entries)} utilisateur(s) : "
|
|
+ ", ".join(f"{e['user']} ({e['size_human']})" for e in entries))
|
|
return entries
|
|
|
|
|
|
# -- Proxmox ------------------------------------------------------------------
|
|
|
|
def get_proxmox_role(dev_name):
|
|
zpool_out = run(["zpool", "status", "-P"])
|
|
if zpool_out and f"/dev/{dev_name}" in zpool_out:
|
|
return "zfs_pool"
|
|
ceph_dir = "/var/lib/ceph/osd"
|
|
if os.path.isdir(ceph_dir):
|
|
try:
|
|
for entry in os.listdir(ceph_dir):
|
|
link = os.path.join(ceph_dir, entry, "block")
|
|
if os.path.islink(link) and dev_name in os.path.realpath(link):
|
|
return "ceph_osd"
|
|
except OSError:
|
|
pass
|
|
return None
|
|
|
|
|
|
# -- Espace offline (partitions non montées) ----------------------------------
|
|
|
|
def get_offline_space(dev, fstype):
|
|
"""Lit l'espace d'une partition non montée selon son type de filesystem."""
|
|
if fstype in ("ext2", "ext3", "ext4"):
|
|
return _ext4_space(dev)
|
|
if fstype == "ntfs":
|
|
return _ntfs_space(dev)
|
|
if fstype == "btrfs":
|
|
return _btrfs_space(dev)
|
|
return None
|
|
|
|
def _ext4_space(dev):
|
|
out = run(["tune2fs", "-l", dev])
|
|
if not out:
|
|
return None
|
|
vals = {}
|
|
for line in out.splitlines():
|
|
for key in ("Block count", "Free blocks", "Block size"):
|
|
if line.startswith(key + ":"):
|
|
try:
|
|
vals[key] = int(line.split(":", 1)[1].strip().replace(",", ""))
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
total = vals["Block count"] * vals["Block size"]
|
|
free = vals["Free blocks"] * vals["Block size"]
|
|
used = total - free
|
|
return {"size_bytes": total, "used_bytes": used, "free_bytes": free,
|
|
"used_percent": int(used / total * 100) if total else 0}
|
|
except KeyError:
|
|
return None
|
|
|
|
def _ntfs_space(dev):
|
|
try:
|
|
r = subprocess.run(
|
|
["ntfsresize", "--info", "--force", dev],
|
|
capture_output=True, text=True, timeout=15,
|
|
)
|
|
combined = r.stdout + r.stderr
|
|
if r.returncode != 0:
|
|
locked_markers = ("hibernated", "fast restart", "Windows is", "hiberfil",
|
|
"unclean", "Refusing to operate")
|
|
if any(m in combined for m in locked_markers):
|
|
note = "Volume NTFS verrouillé — Windows en veille ou Fast Startup actif"
|
|
else:
|
|
note = "Lecture NTFS impossible (ntfsresize indisponible ou erreur)"
|
|
dprint(f"NTFS {dev} : {note}")
|
|
return {"offline_note": note}
|
|
total = used = None
|
|
for line in combined.splitlines():
|
|
if "Current volume size:" in line:
|
|
m = re.search(r"(\d+)\s+bytes", line)
|
|
if m: total = int(m.group(1))
|
|
if "You might resize at" in line:
|
|
m = re.search(r"(\d+)\s+bytes", line)
|
|
if m: used = int(m.group(1))
|
|
if total and used:
|
|
return {"size_bytes": total, "used_bytes": used, "free_bytes": total - used,
|
|
"used_percent": int(used / total * 100) if total else 0}
|
|
return {"offline_note": "Taille NTFS non déterminée"}
|
|
except FileNotFoundError:
|
|
return {"offline_note": "ntfsresize non installé — sudo apt install ntfs-3g"}
|
|
except Exception as e:
|
|
return {"offline_note": f"Erreur lecture NTFS : {e}"}
|
|
|
|
def _btrfs_space(dev):
|
|
out = run(["btrfs", "filesystem", "show", dev])
|
|
if not out:
|
|
return None
|
|
# "Total devices 1 FS bytes used 10.00GiB"
|
|
m = re.search(r"FS bytes used\s+([\d.]+)(\w+)", out)
|
|
if m:
|
|
# approximation seulement, pas de total fiable sans montage
|
|
dprint(f"btrfs {dev} : espace utilisé approximatif")
|
|
return None # trop imprecis sans montage
|
|
|
|
|
|
# -- Construction des partitions ----------------------------------------------
|
|
|
|
def build_partitions(children, df_map, lvm_map, home_done):
|
|
parts = []
|
|
for child in (children or []):
|
|
if child.get("type") not in ("part", "lvm"):
|
|
continue
|
|
name = child.get("name", "")
|
|
fstype = child.get("fstype") or None
|
|
mountpoint = child.get("mountpoint") or None
|
|
|
|
if fstype == "squashfs": # snap Ubuntu - ignorer
|
|
continue
|
|
|
|
size_b = child.get("size")
|
|
part = {
|
|
"name": name,
|
|
"uuid": child.get("uuid") or None,
|
|
"fstype": fstype,
|
|
"size_bytes": size_b,
|
|
"size_human": bytes_human(size_b),
|
|
"used_bytes": None, "used_human": None,
|
|
"free_bytes": None, "free_human": None,
|
|
"used_percent": None,
|
|
"mountpoint": mountpoint,
|
|
"offline_note": None,
|
|
"home_users": None,
|
|
"lvm": None,
|
|
}
|
|
|
|
# Espace via df
|
|
if mountpoint and mountpoint in df_map:
|
|
df = df_map[mountpoint]
|
|
part["used_bytes"] = df["used_bytes"]
|
|
part["used_human"] = bytes_human(df["used_bytes"])
|
|
part["free_bytes"] = df["free_bytes"]
|
|
part["free_human"] = bytes_human(df["free_bytes"])
|
|
if df["size_bytes"] > 0:
|
|
part["used_percent"] = int(df["used_bytes"] / df["size_bytes"] * 100)
|
|
dprint(f" partition {name} montee sur {mountpoint} : "
|
|
f"{part['used_human']} / {part['size_human']} ({part['used_percent']}%)")
|
|
else:
|
|
dprint(f" partition {name} : fstype={fstype}, non montee")
|
|
if fstype in ("ext2", "ext3", "ext4", "ntfs", "btrfs"):
|
|
space = get_offline_space(f"/dev/{name}", fstype)
|
|
if space:
|
|
if "offline_note" in space:
|
|
part["offline_note"] = space["offline_note"]
|
|
dprint(f" partition {name} : {space['offline_note']}")
|
|
else:
|
|
part["used_bytes"] = space["used_bytes"]
|
|
part["used_human"] = bytes_human(space["used_bytes"])
|
|
part["free_bytes"] = space["free_bytes"]
|
|
part["free_human"] = bytes_human(space["free_bytes"])
|
|
part["used_percent"] = space["used_percent"]
|
|
dprint(f" partition {name} offline : "
|
|
f"{part['used_human']} / {part['size_human']} ({part['used_percent']}%)")
|
|
|
|
# /home users sur cette partition
|
|
if mountpoint == "/home" and not home_done[0]:
|
|
part["home_users"] = get_home_users()
|
|
home_done[0] = True
|
|
|
|
# LVM
|
|
dev_path = f"/dev/{name}"
|
|
if fstype == "LVM2_member" and dev_path in lvm_map:
|
|
lvm_info = lvm_map[dev_path]
|
|
lvs = [dict(lv) for lv in lvm_info["logical_volumes"]]
|
|
lv_children = child.get("children") or []
|
|
for lv_child in lv_children:
|
|
lv_mp = lv_child.get("mountpoint") or None
|
|
lv_name_raw = lv_child.get("name", "")
|
|
if not lv_mp:
|
|
continue
|
|
df = df_map.get(lv_mp)
|
|
for lv in lvs:
|
|
if lv["lv_name"] in lv_name_raw or lv_name_raw.endswith(lv["lv_name"]):
|
|
lv["mountpoint"] = lv_mp
|
|
lv["fstype"] = lv_child.get("fstype") or None
|
|
if df:
|
|
lv["used_human"] = bytes_human(df["used_bytes"])
|
|
lv["free_human"] = bytes_human(df["free_bytes"])
|
|
if df["size_bytes"] > 0:
|
|
lv["used_percent"] = int(
|
|
df["used_bytes"] / df["size_bytes"] * 100)
|
|
if lv_mp == "/home" and not home_done[0]:
|
|
lv["home_users"] = get_home_users()
|
|
home_done[0] = True
|
|
part["lvm"] = {"vg_name": lvm_info["vg_name"], "logical_volumes": lvs}
|
|
|
|
parts.append(part)
|
|
return parts
|
|
|
|
|
|
# -- Collecte principale ------------------------------------------------------
|
|
|
|
def collect():
|
|
os_type, os_version = detect_os()
|
|
dprint(f"OS detecte : {os_type} {os_version}")
|
|
|
|
hostname = get_hostname()
|
|
ip = get_ip()
|
|
dprint(f"Machine : {hostname} / {ip}")
|
|
|
|
df_map = get_df_map()
|
|
lvm_map = get_lvm_map()
|
|
|
|
dprint("liste des disques via lsblk...")
|
|
output = run([
|
|
"lsblk", "-J", "-b",
|
|
"-o", "NAME,TYPE,SIZE,MODEL,SERIAL,FSTYPE,MOUNTPOINT,ROTA,UUID,PKNAME",
|
|
])
|
|
if not output:
|
|
print("[inventaire] Impossible de lister les disques via lsblk", file=sys.stderr)
|
|
sys.exit(1)
|
|
try:
|
|
blk = json.loads(output)
|
|
except json.JSONDecodeError as e:
|
|
print(f"[inventaire] Erreur parsing lsblk JSON: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
disks = []
|
|
home_done = [False]
|
|
|
|
for blkdev in blk.get("blockdevices", []):
|
|
if blkdev.get("type") != "disk":
|
|
continue
|
|
name = blkdev.get("name", "")
|
|
dev_path = f"/dev/{name}"
|
|
model = (blkdev.get("model") or "inconnu").strip()
|
|
serial = (blkdev.get("serial") or "inconnu").strip()
|
|
size_b = blkdev.get("size") or 0
|
|
rota = blkdev.get("rota", "")
|
|
dtype = disk_type(name, rota)
|
|
|
|
dprint(f"disque {name} : {model} / serial {serial} / {dtype} / {bytes_human(size_b)}")
|
|
|
|
partitions = build_partitions(
|
|
blkdev.get("children") or [], df_map, lvm_map, home_done)
|
|
|
|
# /home sur partition racine si pas encore trouve
|
|
if not home_done[0]:
|
|
for p in partitions:
|
|
if p.get("mountpoint") == "/":
|
|
dprint("/home non dedie, calcul depuis la partition racine /")
|
|
p["home_users"] = get_home_users()
|
|
home_done[0] = True
|
|
break
|
|
|
|
disk = {
|
|
"device": name,
|
|
"path": dev_path,
|
|
"by_id": get_by_id(dev_path),
|
|
"model": model,
|
|
"serial": serial,
|
|
"type": dtype,
|
|
"capacity_bytes": size_b,
|
|
"capacity_human": bytes_human(size_b),
|
|
"bus": get_bus(name),
|
|
"smart": get_smart(dev_path),
|
|
"partitions": partitions,
|
|
}
|
|
if os_type == "proxmox":
|
|
role = get_proxmox_role(name)
|
|
if role:
|
|
disk["proxmox_role"] = role
|
|
dprint(f" role Proxmox : {role}")
|
|
|
|
disks.append(disk)
|
|
|
|
dprint(f"collecte terminee : {len(disks)} disque(s)")
|
|
return {
|
|
"hostname": hostname,
|
|
"ip": ip,
|
|
"os": os_type,
|
|
"os_version": os_version,
|
|
"collected_at": datetime.now(tz=timezone.utc).astimezone().isoformat(),
|
|
"disks": disks,
|
|
}
|
|
|
|
|
|
# -- HTTP POST ----------------------------------------------------------------
|
|
|
|
def post_to_api(payload, api_url):
|
|
url = f"{api_url.rstrip('/')}/api/ingest"
|
|
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
url, data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
body = json.loads(resp.read())
|
|
print(
|
|
f"[inventaire] OK - {body.get('accepted', '?')} disque(s) "
|
|
f"enregistre(s) pour {body.get('hostname', '?')}"
|
|
)
|
|
except urllib.error.HTTPError as e:
|
|
print(f"[inventaire] Erreur HTTP {e.code}: {e.read().decode()}", file=sys.stderr)
|
|
sys.exit(1)
|
|
except urllib.error.URLError as e:
|
|
print(f"[inventaire] Impossible de joindre {url}: {e.reason}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
# -- Entrypoint ---------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
if os.geteuid() != 0:
|
|
print("[inventaire] Ce script doit etre execute en root (sudo).", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
args = parse_args()
|
|
api_url = f"http://{args.host}:{args.port}"
|
|
|
|
_verbose[0] = args.debug or args.dry_run
|
|
|
|
print(f"[inventaire] Collecte en cours...", file=sys.stderr)
|
|
payload = collect()
|
|
|
|
if args.output:
|
|
with open(args.output, "w", encoding="utf-8") as f:
|
|
json.dump(payload, f, ensure_ascii=False, indent=2)
|
|
print(f"[inventaire] JSON sauvegarde : {args.output}", file=sys.stderr)
|
|
|
|
if args.dry_run:
|
|
print_json(payload)
|
|
print(f"\n[inventaire] --dry-run : aucune donnee envoyee (cible: {api_url})",
|
|
file=sys.stderr)
|
|
elif args.debug:
|
|
print_json(payload)
|
|
post_to_api(payload, api_url)
|
|
else:
|
|
post_to_api(payload, api_url)
|