feat(server): handlers REST + transport MQTT
Ajout des handlers HTTP (agents, métriques historique, config agent/serveur, icônes upload/get) et du client MQTT serveur avec subscribe automatique et PushConfig. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/user/nanometrics/server/db"
|
||||
)
|
||||
|
||||
func AgentsHandler(database *db.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
agents, err := database.GetAgents()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(agents)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/user/nanometrics/server/db"
|
||||
"github.com/user/nanometrics/server/models"
|
||||
)
|
||||
|
||||
func AgentConfigHandler(database *db.DB, pushConfig func(agentID string, cfg *models.AgentConfig)) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
|
||||
if len(parts) < 4 {
|
||||
http.Error(w, "invalid path", 400)
|
||||
return
|
||||
}
|
||||
agentID := parts[2]
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
cfg, err := database.GetAgentConfig(agentID)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
if cfg == nil {
|
||||
cfg = &models.AgentConfig{}
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(cfg)
|
||||
|
||||
case http.MethodPut:
|
||||
var cfg models.AgentConfig
|
||||
if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
if err := database.UpsertAgentConfig(agentID, &cfg); err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
if pushConfig != nil {
|
||||
go pushConfig(agentID, &cfg)
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
|
||||
default:
|
||||
http.Error(w, "method not allowed", 405)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ServerConfigHandler(database *db.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
cfg, err := database.GetServerConfig()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(cfg)
|
||||
|
||||
case http.MethodPut:
|
||||
var cfg models.ServerConfig
|
||||
if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
if err := database.SetServerConfig(cfg); err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
|
||||
default:
|
||||
http.Error(w, "method not allowed", 405)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package handlers_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/user/nanometrics/server/db"
|
||||
"github.com/user/nanometrics/server/handlers"
|
||||
"github.com/user/nanometrics/server/models"
|
||||
)
|
||||
|
||||
func testDB(t *testing.T) *db.DB {
|
||||
d, _ := db.Open(":memory:")
|
||||
t.Cleanup(func() { d.Close() })
|
||||
return d
|
||||
}
|
||||
|
||||
func TestServerConfigGetPut(t *testing.T) {
|
||||
d := testDB(t)
|
||||
h := handlers.ServerConfigHandler(d)
|
||||
|
||||
r := httptest.NewRequest(http.MethodGet, "/api/config", nil)
|
||||
w := httptest.NewRecorder()
|
||||
h(w, r)
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("GET status: %d", w.Code)
|
||||
}
|
||||
var got models.ServerConfig
|
||||
json.NewDecoder(w.Body).Decode(&got)
|
||||
if got.TileMinWidth != 220 {
|
||||
t.Errorf("tile_min_width défaut: %d", got.TileMinWidth)
|
||||
}
|
||||
|
||||
cfg := models.DefaultServerConfig()
|
||||
cfg.TileMinWidth = 300
|
||||
body, _ := json.Marshal(cfg)
|
||||
r2 := httptest.NewRequest(http.MethodPut, "/api/config", bytes.NewReader(body))
|
||||
w2 := httptest.NewRecorder()
|
||||
h(w2, r2)
|
||||
if w2.Code != 204 {
|
||||
t.Fatalf("PUT status: %d", w2.Code)
|
||||
}
|
||||
|
||||
r3 := httptest.NewRequest(http.MethodGet, "/api/config", nil)
|
||||
w3 := httptest.NewRecorder()
|
||||
h(w3, r3)
|
||||
var got2 models.ServerConfig
|
||||
json.NewDecoder(w3.Body).Decode(&got2)
|
||||
if got2.TileMinWidth != 300 {
|
||||
t.Errorf("tile_min_width après PUT: %d", got2.TileMinWidth)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentsHandler(t *testing.T) {
|
||||
d := testDB(t)
|
||||
h := handlers.AgentsHandler(d)
|
||||
|
||||
r := httptest.NewRequest(http.MethodGet, "/api/agents", nil)
|
||||
w := httptest.NewRecorder()
|
||||
h(w, r)
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("status: %d", w.Code)
|
||||
}
|
||||
var agents []models.Agent
|
||||
json.NewDecoder(w.Body).Decode(&agents)
|
||||
if agents == nil {
|
||||
// tableau vide attendu, pas d'erreur
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"image"
|
||||
_ "image/jpeg"
|
||||
"image/png"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/disintegration/imaging"
|
||||
"github.com/user/nanometrics/server/db"
|
||||
)
|
||||
|
||||
const maxIconSize = 128
|
||||
|
||||
func IconUploadHandler(database *db.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", 405)
|
||||
return
|
||||
}
|
||||
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
|
||||
if len(parts) < 4 {
|
||||
http.Error(w, "invalid path", 400)
|
||||
return
|
||||
}
|
||||
agentID := parts[2]
|
||||
|
||||
r.ParseMultipartForm(2 << 20)
|
||||
file, header, err := r.FormFile("icon")
|
||||
if err != nil {
|
||||
http.Error(w, "fichier manquant", 400)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
mime := header.Header.Get("Content-Type")
|
||||
if mime == "" {
|
||||
mime = "image/png"
|
||||
}
|
||||
|
||||
if strings.Contains(mime, "svg") {
|
||||
var buf bytes.Buffer
|
||||
buf.ReadFrom(file)
|
||||
if err := database.SaveIcon(agentID, buf.Bytes(), "image/svg+xml"); err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
img, _, err := image.Decode(file)
|
||||
if err != nil {
|
||||
http.Error(w, "image invalide", 400)
|
||||
return
|
||||
}
|
||||
resized := imaging.Fit(img, maxIconSize, maxIconSize, imaging.Lanczos)
|
||||
var buf bytes.Buffer
|
||||
if err := png.Encode(&buf, resized); err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
if err := database.SaveIcon(agentID, buf.Bytes(), "image/png"); err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
func IconGetHandler(database *db.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
|
||||
if len(parts) < 4 {
|
||||
http.Error(w, "invalid path", 400)
|
||||
return
|
||||
}
|
||||
agentID := parts[2]
|
||||
data, mime, err := database.GetIcon(agentID)
|
||||
if err != nil {
|
||||
http.Error(w, "not found", 404)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", mime)
|
||||
w.Write(data)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/user/nanometrics/server/db"
|
||||
)
|
||||
|
||||
func MetricsHistoryHandler(database *db.DB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
|
||||
if len(parts) < 4 {
|
||||
http.Error(w, "invalid path", 400)
|
||||
return
|
||||
}
|
||||
agentID := parts[2]
|
||||
|
||||
now := time.Now().Unix()
|
||||
from := now - 3600
|
||||
to := now
|
||||
|
||||
if v := r.URL.Query().Get("from"); v != "" {
|
||||
if n, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||
from = n
|
||||
}
|
||||
}
|
||||
if v := r.URL.Query().Get("to"); v != "" {
|
||||
if n, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||
to = n
|
||||
}
|
||||
}
|
||||
|
||||
history, err := database.GetMetricsHistory(agentID, from, to)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 500)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(history)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/user/nanometrics/server/models"
|
||||
)
|
||||
|
||||
type MQTTClient struct {
|
||||
client mqtt.Client
|
||||
topicBase string
|
||||
}
|
||||
|
||||
func StartMQTT(broker, topicBase string, handler func(*models.AgentMetrics)) (*MQTTClient, error) {
|
||||
mc := &MQTTClient{topicBase: topicBase}
|
||||
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(broker).
|
||||
SetClientID("nanometrics-server").
|
||||
SetAutoReconnect(true).
|
||||
SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Printf("[mqtt] connecté à %s", broker)
|
||||
topic := fmt.Sprintf("%s/+/metrics", topicBase)
|
||||
if tok := c.Subscribe(topic, 0, nil); tok.Wait() && tok.Error() != nil {
|
||||
log.Printf("[mqtt] subscribe error: %v", tok.Error())
|
||||
}
|
||||
}).
|
||||
SetConnectionLostHandler(func(c mqtt.Client, err error) {
|
||||
log.Printf("[mqtt] connexion perdue: %v", err)
|
||||
}).
|
||||
SetDefaultPublishHandler(func(_ mqtt.Client, msg mqtt.Message) {
|
||||
var m models.AgentMetrics
|
||||
if err := json.Unmarshal(msg.Payload(), &m); err != nil {
|
||||
log.Printf("[mqtt] JSON invalide sur %s: %v", msg.Topic(), err)
|
||||
return
|
||||
}
|
||||
if m.Hostname != "" {
|
||||
handler(&m)
|
||||
}
|
||||
})
|
||||
|
||||
mc.client = mqtt.NewClient(opts)
|
||||
if tok := mc.client.Connect(); tok.Wait() && tok.Error() != nil {
|
||||
return nil, fmt.Errorf("mqtt connect: %w", tok.Error())
|
||||
}
|
||||
return mc, nil
|
||||
}
|
||||
|
||||
func (mc *MQTTClient) PushConfig(hostname string, cfg *models.AgentConfig) error {
|
||||
topic := fmt.Sprintf("%s/%s/config", mc.topicBase, hostname)
|
||||
data, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tok := mc.client.Publish(topic, 1, false, data)
|
||||
tok.Wait()
|
||||
return tok.Error()
|
||||
}
|
||||
Reference in New Issue
Block a user