From a85e5cbee28966554ef9ed81f9577da2f7405206 Mon Sep 17 00:00:00 2001 From: Gilles Soulier Date: Fri, 22 May 2026 11:39:58 +0200 Subject: [PATCH] feat(agent): transport MQTT birth/LWT/subscribe config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implémente le transport MQTT complet avec rumqttc 0.24 : - Birth message (online) au ConnAck + subscribe config topic - Last Will Testament (offline) configuré au connect - Réception config agent via topic {base}/{hostname}/config - publish_metrics helper pour envoyer les métriques - Transport::Tcp forcé pour broker sans TLS (malgré use-native-tls feature) Co-Authored-By: Claude Sonnet 4.6 --- agent/src/transport/mqtt.rs | 80 ++++++++++++++++++++++++++++++++----- 1 file changed, 70 insertions(+), 10 deletions(-) diff --git a/agent/src/transport/mqtt.rs b/agent/src/transport/mqtt.rs index d4a5ed2..d647b0f 100644 --- a/agent/src/transport/mqtt.rs +++ b/agent/src/transport/mqtt.rs @@ -1,22 +1,82 @@ -use crate::config::MqttConfig; +use rumqttc::{Client, LastWill, MqttOptions, QoS, Event, Packet, Transport}; use std::sync::mpsc::Sender; +use std::time::Duration; +use crate::config::MqttConfig; pub enum MqttIncoming { ConfigUpdate(Vec), } -pub struct MqttClient; +pub fn start( + hostname: &str, + cfg: &MqttConfig, + incoming_tx: Sender, +) -> Client { + let status_topic = format!("{}/{}/status", cfg.topic_base, hostname); + let config_topic = format!("{}/{}/config", cfg.topic_base, hostname); -impl MqttClient { - pub fn clone(&self) -> Self { - MqttClient + let mut opts = MqttOptions::new( + format!("nanometrics-{}", hostname), + &cfg.host, + cfg.port, + ); + opts.set_keep_alive(Duration::from_secs(30)); + // Forcer TCP non-TLS (use-native-tls est activé en feature mais le broker est sans TLS) + opts.set_transport(Transport::Tcp); + + if cfg.last_will { + opts.set_last_will(LastWill::new( + &status_topic, + "offline", + QoS::AtLeastOnce, + true, + )); } -} -pub fn start(_hostname: &str, _cfg: &MqttConfig, _tx: Sender) -> rumqttc::Client { - let opts = rumqttc::MqttOptions::new("stub", "localhost", 1883); - let (client, _conn) = rumqttc::Client::new(opts, 1); + let (client, mut connection) = Client::new(opts, 16); + + let client_clone = client.clone(); + let config_topic_clone = config_topic.clone(); + let status_topic_clone = status_topic.clone(); + let birth = cfg.birth_message; + + std::thread::spawn(move || { + for event in connection.iter() { + match event { + Ok(Event::Incoming(Packet::ConnAck(_))) => { + if birth { + let _ = client_clone.publish( + &status_topic_clone, + QoS::AtLeastOnce, + true, + "online", + ); + } + let _ = client_clone.subscribe( + &config_topic_clone, + QoS::AtLeastOnce, + ); + } + Ok(Event::Incoming(Packet::Publish(p))) => { + if p.topic == config_topic_clone { + let _ = incoming_tx.send(MqttIncoming::ConfigUpdate( + p.payload.to_vec(), + )); + } + } + Err(e) => { + eprintln!("[mqtt] erreur: {}", e); + std::thread::sleep(Duration::from_secs(5)); + } + _ => {} + } + } + }); + client } -pub fn publish_metrics(_client: &rumqttc::Client, _base: &str, _hostname: &str, _json: &str) {} +pub fn publish_metrics(client: &Client, topic_base: &str, hostname: &str, json: &str) { + let topic = format!("{}/{}/metrics", topic_base, hostname); + let _ = client.publish(topic, QoS::AtMostOnce, false, json); +}