diff --git a/agent/src/transport/mqtt.rs b/agent/src/transport/mqtt.rs index d4a5ed2..d647b0f 100644 --- a/agent/src/transport/mqtt.rs +++ b/agent/src/transport/mqtt.rs @@ -1,22 +1,82 @@ -use crate::config::MqttConfig; +use rumqttc::{Client, LastWill, MqttOptions, QoS, Event, Packet, Transport}; use std::sync::mpsc::Sender; +use std::time::Duration; +use crate::config::MqttConfig; pub enum MqttIncoming { ConfigUpdate(Vec), } -pub struct MqttClient; +pub fn start( + hostname: &str, + cfg: &MqttConfig, + incoming_tx: Sender, +) -> Client { + let status_topic = format!("{}/{}/status", cfg.topic_base, hostname); + let config_topic = format!("{}/{}/config", cfg.topic_base, hostname); -impl MqttClient { - pub fn clone(&self) -> Self { - MqttClient + let mut opts = MqttOptions::new( + format!("nanometrics-{}", hostname), + &cfg.host, + cfg.port, + ); + opts.set_keep_alive(Duration::from_secs(30)); + // Forcer TCP non-TLS (use-native-tls est activé en feature mais le broker est sans TLS) + opts.set_transport(Transport::Tcp); + + if cfg.last_will { + opts.set_last_will(LastWill::new( + &status_topic, + "offline", + QoS::AtLeastOnce, + true, + )); } -} -pub fn start(_hostname: &str, _cfg: &MqttConfig, _tx: Sender) -> rumqttc::Client { - let opts = rumqttc::MqttOptions::new("stub", "localhost", 1883); - let (client, _conn) = rumqttc::Client::new(opts, 1); + let (client, mut connection) = Client::new(opts, 16); + + let client_clone = client.clone(); + let config_topic_clone = config_topic.clone(); + let status_topic_clone = status_topic.clone(); + let birth = cfg.birth_message; + + std::thread::spawn(move || { + for event in connection.iter() { + match event { + Ok(Event::Incoming(Packet::ConnAck(_))) => { + if birth { + let _ = client_clone.publish( + &status_topic_clone, + QoS::AtLeastOnce, + true, + "online", + ); + } + let _ = client_clone.subscribe( + &config_topic_clone, + QoS::AtLeastOnce, + ); + } + Ok(Event::Incoming(Packet::Publish(p))) => { + if p.topic == config_topic_clone { + let _ = incoming_tx.send(MqttIncoming::ConfigUpdate( + p.payload.to_vec(), + )); + } + } + Err(e) => { + eprintln!("[mqtt] erreur: {}", e); + std::thread::sleep(Duration::from_secs(5)); + } + _ => {} + } + } + }); + client } -pub fn publish_metrics(_client: &rumqttc::Client, _base: &str, _hostname: &str, _json: &str) {} +pub fn publish_metrics(client: &Client, topic_base: &str, hostname: &str, json: &str) { + let topic = format!("{}/{}/metrics", topic_base, hostname); + let _ = client.publish(topic, QoS::AtMostOnce, false, json); +}