feat(agent): transport MQTT birth/LWT/subscribe config
Implémente le transport MQTT complet avec rumqttc 0.24 :
- Birth message (online) au ConnAck + subscribe config topic
- Last Will Testament (offline) configuré au connect
- Réception config agent via topic {base}/{hostname}/config
- publish_metrics helper pour envoyer les métriques
- Transport::Tcp forcé pour broker sans TLS (malgré use-native-tls feature)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+70
-10
@@ -1,22 +1,82 @@
|
||||
use crate::config::MqttConfig;
|
||||
use rumqttc::{Client, LastWill, MqttOptions, QoS, Event, Packet, Transport};
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::time::Duration;
|
||||
use crate::config::MqttConfig;
|
||||
|
||||
pub enum MqttIncoming {
|
||||
ConfigUpdate(Vec<u8>),
|
||||
}
|
||||
|
||||
pub struct MqttClient;
|
||||
pub fn start(
|
||||
hostname: &str,
|
||||
cfg: &MqttConfig,
|
||||
incoming_tx: Sender<MqttIncoming>,
|
||||
) -> Client {
|
||||
let status_topic = format!("{}/{}/status", cfg.topic_base, hostname);
|
||||
let config_topic = format!("{}/{}/config", cfg.topic_base, hostname);
|
||||
|
||||
impl MqttClient {
|
||||
pub fn clone(&self) -> Self {
|
||||
MqttClient
|
||||
let mut opts = MqttOptions::new(
|
||||
format!("nanometrics-{}", hostname),
|
||||
&cfg.host,
|
||||
cfg.port,
|
||||
);
|
||||
opts.set_keep_alive(Duration::from_secs(30));
|
||||
// Forcer TCP non-TLS (use-native-tls est activé en feature mais le broker est sans TLS)
|
||||
opts.set_transport(Transport::Tcp);
|
||||
|
||||
if cfg.last_will {
|
||||
opts.set_last_will(LastWill::new(
|
||||
&status_topic,
|
||||
"offline",
|
||||
QoS::AtLeastOnce,
|
||||
true,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(_hostname: &str, _cfg: &MqttConfig, _tx: Sender<MqttIncoming>) -> rumqttc::Client {
|
||||
let opts = rumqttc::MqttOptions::new("stub", "localhost", 1883);
|
||||
let (client, _conn) = rumqttc::Client::new(opts, 1);
|
||||
let (client, mut connection) = Client::new(opts, 16);
|
||||
|
||||
let client_clone = client.clone();
|
||||
let config_topic_clone = config_topic.clone();
|
||||
let status_topic_clone = status_topic.clone();
|
||||
let birth = cfg.birth_message;
|
||||
|
||||
std::thread::spawn(move || {
|
||||
for event in connection.iter() {
|
||||
match event {
|
||||
Ok(Event::Incoming(Packet::ConnAck(_))) => {
|
||||
if birth {
|
||||
let _ = client_clone.publish(
|
||||
&status_topic_clone,
|
||||
QoS::AtLeastOnce,
|
||||
true,
|
||||
"online",
|
||||
);
|
||||
}
|
||||
let _ = client_clone.subscribe(
|
||||
&config_topic_clone,
|
||||
QoS::AtLeastOnce,
|
||||
);
|
||||
}
|
||||
Ok(Event::Incoming(Packet::Publish(p))) => {
|
||||
if p.topic == config_topic_clone {
|
||||
let _ = incoming_tx.send(MqttIncoming::ConfigUpdate(
|
||||
p.payload.to_vec(),
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("[mqtt] erreur: {}", e);
|
||||
std::thread::sleep(Duration::from_secs(5));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
client
|
||||
}
|
||||
|
||||
pub fn publish_metrics(_client: &rumqttc::Client, _base: &str, _hostname: &str, _json: &str) {}
|
||||
pub fn publish_metrics(client: &Client, topic_base: &str, hostname: &str, json: &str) {
|
||||
let topic = format!("{}/{}/metrics", topic_base, hostname);
|
||||
let _ = client.publish(topic, QoS::AtMostOnce, false, json);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user