Files
nano_metrics/agent/src/main.rs
T
Gilles Soulier 5ee8b66464 feat(v0.1.7): port iperf3 configurable + iperf3 docker sur port 5202
- config.toml: nouveau champ [server] iperf3_port (défaut 5201)
- network_info: iperf3 -p <port> utilise le port configuré
- docker-compose: iperf3 exposé sur 5202 (5201 occupé par linux_benchtools)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 06:47:15 +02:00

221 lines
8.0 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use nanometrics_agent::{config, metrics, payload, transport};
use sysinfo::{Components, Networks, System};
use std::time::{Duration, Instant};
use std::sync::mpsc;
use std::sync::atomic::{AtomicBool, Ordering};
static RUNNING: AtomicBool = AtomicBool::new(true);
extern "C" fn handle_signal(_: libc::c_int) {
RUNNING.store(false, Ordering::Relaxed);
}
fn get_local_ip(server_ip: &str) -> String {
use std::net::UdpSocket;
// Try server IP first (always reachable), then internet fallback
for target in &[format!("{}:80", server_ip), "8.8.8.8:80".to_string()] {
if let Ok(s) = UdpSocket::bind("0.0.0.0:0") {
if s.connect(target.as_str()).is_ok() {
if let Ok(addr) = s.local_addr() {
let ip = addr.ip().to_string();
if ip != "0.0.0.0" {
return ip;
}
}
}
}
}
"0.0.0.0".to_string()
}
fn apply_config_update(cfg: &mut config::Config, data: &[u8]) {
if let Ok(new_cfg) = serde_json::from_slice::<config::Config>(data) {
cfg.metrics = new_cfg.metrics;
eprintln!("[config] mis à jour depuis le serveur");
}
}
fn main() {
let cfg_path = std::env::args()
.nth(1)
.unwrap_or_else(|| "config.toml".to_string());
let mut cfg = config::load(std::path::Path::new(&cfg_path))
.expect("Impossible de charger config.toml");
let hostname = System::host_name().unwrap_or_else(|| "unknown".to_string());
let ip = get_local_ip(&cfg.server.ip);
let mut sys = System::new();
let mut networks = Networks::new_with_refreshed_list();
let mut components = Components::new_with_refreshed_list();
let udp_sender = if cfg.protocols.udp.enabled {
Some(transport::udp::UdpSender::new(&cfg.server.ip, cfg.server.port))
} else {
None
};
let (incoming_tx, incoming_rx) = mpsc::channel::<transport::mqtt::MqttIncoming>();
let mqtt_client = if cfg.protocols.mqtt.enabled {
Some(transport::mqtt::start(&hostname, &cfg.protocols.mqtt, incoming_tx))
} else {
None
};
unsafe {
libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
}
let mut last_slow = Instant::now();
let mut last_medium = Instant::now();
let mut first_medium = true;
let mut first_slow = true;
// Scheduler métriques lentes (startup + 1×/jour à l'heure configurée)
let slow_time: (u32, u32) = {
let parts: Vec<&str> = cfg.metrics.slow_daily_time.splitn(2, ':').collect();
let h = parts.first().and_then(|s| s.parse().ok()).unwrap_or(3u32);
let m = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0u32);
(h, m)
};
let mut slow_daily_done = false;
let mut slow_last_yday = metrics::network_info::current_yday().wrapping_sub(1);
// Collecte immédiate au démarrage
let mut startup_net: Option<Vec<payload::NetworkInterface>> = None;
let mut startup_hw: Option<payload::HardwareInfo> = None;
if cfg.metrics.network_info.udp || cfg.metrics.network_info.mqtt {
let ni = metrics::network_info::collect(&cfg.server.ip, cfg.server.iperf3_port);
if !ni.is_empty() { startup_net = Some(ni); }
}
if cfg.metrics.hardware_info.udp || cfg.metrics.hardware_info.mqtt {
startup_hw = metrics::hardware::collect();
}
while RUNNING.load(Ordering::Relaxed) {
let now = Instant::now();
while let Ok(transport::mqtt::MqttIncoming::ConfigUpdate(data)) = incoming_rx.try_recv() {
apply_config_update(&mut cfg, &data);
}
sys.refresh_cpu_usage();
sys.refresh_memory();
// Métriques lentes quotidiennes
let cur_yday = metrics::network_info::current_yday();
if cur_yday != slow_last_yday {
slow_last_yday = cur_yday;
slow_daily_done = false;
}
let mut daily_net: Option<Vec<payload::NetworkInterface>> = None;
let mut daily_hw: Option<payload::HardwareInfo> = None;
if !slow_daily_done {
let (ch, cm) = metrics::network_info::current_hhmm();
if ch == slow_time.0 && cm == slow_time.1 {
slow_daily_done = true;
if cfg.metrics.network_info.udp || cfg.metrics.network_info.mqtt {
let ni = metrics::network_info::collect(&cfg.server.ip, cfg.server.iperf3_port);
if !ni.is_empty() { daily_net = Some(ni); }
}
if cfg.metrics.hardware_info.udp || cfg.metrics.hardware_info.mqtt {
daily_hw = metrics::hardware::collect();
}
}
}
let mut m = payload::AgentMetrics {
hostname: hostname.clone(),
ip: ip.clone(),
status: "online".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
network_info: daily_net.or_else(|| startup_net.take()),
hardware_info: daily_hw.or_else(|| startup_hw.take()),
..Default::default()
};
if cfg.metrics.cpu.udp || cfg.metrics.cpu.mqtt {
m.cpu_percent = Some(metrics::cpu::get(&sys));
}
let (mem_used, mem_free, mem_total) = metrics::memory::get(&sys);
if cfg.metrics.memory.udp || cfg.metrics.memory.mqtt {
m.memory_used = Some(mem_used);
m.memory_free = Some(mem_free);
m.memory_total = Some(mem_total);
}
if first_medium || now.duration_since(last_medium).as_secs() >= 10 {
networks.refresh();
components.refresh();
let (rx, tx) = metrics::network::get(&networks);
if cfg.metrics.network.udp || cfg.metrics.network.mqtt {
m.network_rx = Some(rx);
m.network_tx = Some(tx);
}
if cfg.metrics.uptime.udp || cfg.metrics.uptime.mqtt {
m.uptime = Some(metrics::uptime::get());
}
if cfg.metrics.temperature.udp || cfg.metrics.temperature.mqtt {
m.temperature = metrics::temperature::get(&components);
}
last_medium = now;
first_medium = false;
}
if first_slow || now.duration_since(last_slow).as_secs() >= 60 {
if cfg.metrics.disk.udp || cfg.metrics.disk.mqtt {
let (used, free, total) = metrics::disk::get();
m.hdd_used = Some(used);
m.hdd_free = Some(free);
m.hdd_total = Some(total);
}
if cfg.metrics.smart.udp || cfg.metrics.smart.mqtt {
m.smart = metrics::smart::collect();
}
last_slow = now;
first_slow = false;
}
let json = serde_json::to_string(&m).expect("sérialisation JSON impossible");
if let Some(ref udp) = udp_sender {
udp.send(&json);
}
if let Some(ref client) = mqtt_client {
if cfg.protocols.mqtt.enabled {
transport::mqtt::publish_metrics(
client,
&cfg.protocols.mqtt.topic_base,
&hostname,
&json,
);
}
}
std::thread::sleep(Duration::from_secs(2));
}
// Déconnexion propre : notifier le serveur avant de quitter
let offline = serde_json::to_string(&payload::AgentMetrics {
hostname: hostname.clone(),
ip: ip.clone(),
status: "offline".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
..Default::default()
}).unwrap_or_default();
if let Some(ref udp) = udp_sender {
udp.send(&offline);
}
if let Some(ref client) = mqtt_client {
transport::mqtt::publish_status(
client, &cfg.protocols.mqtt.topic_base, &hostname, "offline",
);
std::thread::sleep(Duration::from_millis(200)); // laisser le temps au broker de recevoir
let _ = client.disconnect();
}
}