feat(server): UDP listener, hub WebSocket, Gauges Prometheus
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/user/nanometrics/server/models"
|
||||
)
|
||||
|
||||
var (
|
||||
agentCPU = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_cpu_percent",
|
||||
Help: "Pourcentage CPU de l'agent",
|
||||
}, []string{"agent"})
|
||||
|
||||
agentMemUsed = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_memory_used_bytes",
|
||||
Help: "RAM utilisée en octets",
|
||||
}, []string{"agent"})
|
||||
|
||||
agentMemTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_memory_total_bytes",
|
||||
Help: "RAM totale en octets",
|
||||
}, []string{"agent"})
|
||||
|
||||
agentDiskUsed = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_disk_used_bytes",
|
||||
Help: "Disque utilisé en octets",
|
||||
}, []string{"agent"})
|
||||
|
||||
agentDiskTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_disk_total_bytes",
|
||||
Help: "Disque total en octets",
|
||||
}, []string{"agent"})
|
||||
|
||||
agentUptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_uptime_seconds",
|
||||
Help: "Uptime de l'agent en secondes",
|
||||
}, []string{"agent"})
|
||||
|
||||
agentStatus = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "nanometrics_agent_online",
|
||||
Help: "1 si l'agent est en ligne, 0 sinon",
|
||||
}, []string{"agent"})
|
||||
)
|
||||
|
||||
func Update(m *models.AgentMetrics) {
|
||||
l := prometheus.Labels{"agent": m.Hostname}
|
||||
if m.CPUPercent != nil {
|
||||
agentCPU.With(l).Set(*m.CPUPercent)
|
||||
}
|
||||
if m.MemoryUsed != nil {
|
||||
agentMemUsed.With(l).Set(float64(*m.MemoryUsed))
|
||||
}
|
||||
if m.MemoryTotal != nil {
|
||||
agentMemTotal.With(l).Set(float64(*m.MemoryTotal))
|
||||
}
|
||||
if m.HDDUsed != nil {
|
||||
agentDiskUsed.With(l).Set(float64(*m.HDDUsed))
|
||||
}
|
||||
if m.HDDTotal != nil {
|
||||
agentDiskTotal.With(l).Set(float64(*m.HDDTotal))
|
||||
}
|
||||
if m.Uptime != nil {
|
||||
agentUptime.With(l).Set(float64(*m.Uptime))
|
||||
}
|
||||
online := 0.0
|
||||
if m.Status == "online" {
|
||||
online = 1.0
|
||||
}
|
||||
agentStatus.With(l).Set(online)
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/user/nanometrics/server/models"
|
||||
)
|
||||
|
||||
func StartUDP(addr string, handler func(*models.AgentMetrics)) error {
|
||||
conn, err := net.ListenPacket("udp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("[udp] écoute sur %s", addr)
|
||||
go func() {
|
||||
buf := make([]byte, 65535)
|
||||
for {
|
||||
n, _, err := conn.ReadFrom(buf)
|
||||
if err != nil {
|
||||
log.Printf("[udp] erreur lecture: %v", err)
|
||||
continue
|
||||
}
|
||||
data := make([]byte, n)
|
||||
copy(data, buf[:n])
|
||||
go processUDP(data, handler)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func processUDP(data []byte, handler func(*models.AgentMetrics)) {
|
||||
var m models.AgentMetrics
|
||||
if err := json.Unmarshal(data, &m); err != nil {
|
||||
log.Printf("[udp] JSON invalide: %v", err)
|
||||
return
|
||||
}
|
||||
if m.Hostname == "" {
|
||||
return
|
||||
}
|
||||
handler(&m)
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package transport_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/user/nanometrics/server/models"
|
||||
"github.com/user/nanometrics/server/transport"
|
||||
)
|
||||
|
||||
func TestUDPReceive(t *testing.T) {
|
||||
received := make(chan *models.AgentMetrics, 1)
|
||||
err := transport.StartUDP("127.0.0.1:29999", func(m *models.AgentMetrics) {
|
||||
received <- m
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start UDP: %v", err)
|
||||
}
|
||||
|
||||
conn, err := net.Dial("udp", "127.0.0.1:29999")
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
cpu := 55.0
|
||||
m := models.AgentMetrics{Hostname: "test-01", IP: "127.0.0.1", Status: "online", CPUPercent: &cpu}
|
||||
data, _ := json.Marshal(m)
|
||||
conn.Write(data)
|
||||
|
||||
select {
|
||||
case got := <-received:
|
||||
if got.Hostname != "test-01" {
|
||||
t.Errorf("hostname: attendu test-01, eu %s", got.Hostname)
|
||||
}
|
||||
if got.CPUPercent == nil || *got.CPUPercent != 55.0 {
|
||||
t.Error("cpu_percent incorrect")
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Error("timeout: aucune métrique reçue")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
func Handler(hub *Hub) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ws] upgrade: %v", err)
|
||||
return
|
||||
}
|
||||
hub.Register(conn)
|
||||
defer func() {
|
||||
hub.Unregister(conn)
|
||||
conn.Close()
|
||||
}()
|
||||
for {
|
||||
if _, _, err := conn.ReadMessage(); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[*websocket.Conn]struct{}
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{clients: make(map[*websocket.Conn]struct{})}
|
||||
}
|
||||
|
||||
func (h *Hub) Register(conn *websocket.Conn) {
|
||||
h.mu.Lock()
|
||||
h.clients[conn] = struct{}{}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(conn *websocket.Conn) {
|
||||
h.mu.Lock()
|
||||
delete(h.clients, conn)
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *Hub) Broadcast(msg interface{}) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("[ws] marshal: %v", err)
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for conn := range h.clients {
|
||||
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
log.Printf("[ws] write: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) Count() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package websocket_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
wslib "github.com/gorilla/websocket"
|
||||
"github.com/user/nanometrics/server/websocket"
|
||||
)
|
||||
|
||||
func TestHubBroadcast(t *testing.T) {
|
||||
hub := websocket.NewHub()
|
||||
srv := httptest.NewServer(websocket.Handler(hub))
|
||||
defer srv.Close()
|
||||
|
||||
url := "ws" + strings.TrimPrefix(srv.URL, "http") + "/ws"
|
||||
conn, _, err := wslib.DefaultDialer.Dial(url, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
hub.Broadcast(map[string]string{"type": "test", "msg": "hello"})
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second))
|
||||
_, data, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("read: %v", err)
|
||||
}
|
||||
var got map[string]string
|
||||
json.Unmarshal(data, &got)
|
||||
if got["msg"] != "hello" {
|
||||
t.Errorf("attendu hello, eu %s", got["msg"])
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user