feat(agent): déconnexion propre sur SIGTERM/SIGINT

- Capture SIGTERM et SIGINT via libc::signal → AtomicBool RUNNING
- La boucle principale s'arrête proprement à la prochaine itération
- Envoi d'un paquet status:offline via UDP avant de quitter
- MQTT : publish status offline + disconnect() pour déconnexion gracieuse
  (le last_will reste actif pour les déconnexions brutales)
- payload.rs: #[serde(default)] sur version pour compatibilité descendante

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Gilles Soulier
2026-05-22 22:34:55 +02:00
parent 3933301cff
commit 9e77d961f5
4 changed files with 40 additions and 1 deletions
+33 -1
View File
@@ -2,6 +2,13 @@ use nanometrics_agent::{config, metrics, payload, transport};
use sysinfo::{Components, Networks, System}; use sysinfo::{Components, Networks, System};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::atomic::{AtomicBool, Ordering};
static RUNNING: AtomicBool = AtomicBool::new(true);
extern "C" fn handle_signal(_: libc::c_int) {
RUNNING.store(false, Ordering::Relaxed);
}
fn get_local_ip() -> String { fn get_local_ip() -> String {
use std::net::UdpSocket; use std::net::UdpSocket;
@@ -49,12 +56,17 @@ fn main() {
None None
}; };
unsafe {
libc::signal(libc::SIGTERM, handle_signal as libc::sighandler_t);
libc::signal(libc::SIGINT, handle_signal as libc::sighandler_t);
}
let mut last_slow = Instant::now(); let mut last_slow = Instant::now();
let mut last_medium = Instant::now(); let mut last_medium = Instant::now();
let mut first_medium = true; let mut first_medium = true;
let mut first_slow = true; let mut first_slow = true;
loop { while RUNNING.load(Ordering::Relaxed) {
let now = Instant::now(); let now = Instant::now();
while let Ok(transport::mqtt::MqttIncoming::ConfigUpdate(data)) = incoming_rx.try_recv() { while let Ok(transport::mqtt::MqttIncoming::ConfigUpdate(data)) = incoming_rx.try_recv() {
@@ -134,4 +146,24 @@ fn main() {
std::thread::sleep(Duration::from_secs(2)); std::thread::sleep(Duration::from_secs(2));
} }
// Déconnexion propre : notifier le serveur avant de quitter
let offline = serde_json::to_string(&payload::AgentMetrics {
hostname: hostname.clone(),
ip: ip.clone(),
status: "offline".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
..Default::default()
}).unwrap_or_default();
if let Some(ref udp) = udp_sender {
udp.send(&offline);
}
if let Some(ref client) = mqtt_client {
transport::mqtt::publish_status(
client, &cfg.protocols.mqtt.topic_base, &hostname, "offline",
);
std::thread::sleep(Duration::from_millis(200)); // laisser le temps au broker de recevoir
let _ = client.disconnect();
}
} }
+1
View File
@@ -5,6 +5,7 @@ pub struct AgentMetrics {
pub hostname: String, pub hostname: String,
pub ip: String, pub ip: String,
pub status: String, pub status: String,
#[serde(default)]
pub version: String, pub version: String,
pub cpu_percent: Option<f32>, pub cpu_percent: Option<f32>,
pub memory_used: Option<u64>, pub memory_used: Option<u64>,
+5
View File
@@ -78,3 +78,8 @@ pub fn publish_metrics(client: &Client, topic_base: &str, hostname: &str, json:
let topic = format!("{}/{}/metrics", topic_base, hostname); let topic = format!("{}/{}/metrics", topic_base, hostname);
let _ = client.publish(topic, QoS::AtMostOnce, false, json); let _ = client.publish(topic, QoS::AtMostOnce, false, json);
} }
pub fn publish_status(client: &Client, topic_base: &str, hostname: &str, status: &str) {
let topic = format!("{}/{}/status", topic_base, hostname);
let _ = client.publish(topic, QoS::AtLeastOnce, true, status);
}
+1
View File
@@ -18,6 +18,7 @@ fn test_serialize_json_complet() {
temperature: None, temperature: None,
smart: None, smart: None,
status: "online".to_string(), status: "online".to_string(),
version: "0.0.0".to_string(),
}; };
let json = serde_json::to_string(&m).unwrap(); let json = serde_json::to_string(&m).unwrap();
assert!(json.contains("\"hostname\":\"srv-01\"")); assert!(json.contains("\"hostname\":\"srv-01\""));