feat: hub de sortie WebSocket avec buffer rejouable
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,31 @@
|
|||||||
|
// server/ws/outputHub.test.ts
|
||||||
|
import { describe, it, expect, vi } from "vitest";
|
||||||
|
import { OutputHub } from "./outputHub.js";
|
||||||
|
|
||||||
|
describe("OutputHub", () => {
|
||||||
|
it("rejoue le buffer aux nouveaux abonnés", () => {
|
||||||
|
const hub = new OutputHub();
|
||||||
|
hub.publish("m1", "ligne1");
|
||||||
|
hub.publish("m1", "ligne2");
|
||||||
|
const received: string[] = [];
|
||||||
|
hub.subscribe("m1", (c) => received.push(c));
|
||||||
|
expect(received).toEqual(["ligne1", "ligne2"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("diffuse les nouveaux chunks aux abonnés", () => {
|
||||||
|
const hub = new OutputHub();
|
||||||
|
const fn = vi.fn();
|
||||||
|
hub.subscribe("m1", fn);
|
||||||
|
hub.publish("m1", "x");
|
||||||
|
expect(fn).toHaveBeenCalledWith("x");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("vide le buffer avec clear()", () => {
|
||||||
|
const hub = new OutputHub();
|
||||||
|
hub.publish("m1", "old");
|
||||||
|
hub.clear("m1");
|
||||||
|
const received: string[] = [];
|
||||||
|
hub.subscribe("m1", (c) => received.push(c));
|
||||||
|
expect(received).toEqual([]);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
// server/ws/outputHub.ts
|
||||||
|
type Listener = (chunk: string) => void;
|
||||||
|
const MAX_BUFFER = 5000; // lignes/chunks gardés pour le rejeu
|
||||||
|
|
||||||
|
export class OutputHub {
|
||||||
|
private buffers = new Map<string, string[]>();
|
||||||
|
private listeners = new Map<string, Set<Listener>>();
|
||||||
|
|
||||||
|
publish(machineId: string, chunk: string): void {
|
||||||
|
const buf = this.buffers.get(machineId) ?? [];
|
||||||
|
buf.push(chunk);
|
||||||
|
if (buf.length > MAX_BUFFER) buf.shift();
|
||||||
|
this.buffers.set(machineId, buf);
|
||||||
|
this.listeners.get(machineId)?.forEach((l) => l(chunk));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** S'abonne et reçoit immédiatement le buffer existant (rejeu). */
|
||||||
|
subscribe(machineId: string, listener: Listener): () => void {
|
||||||
|
(this.buffers.get(machineId) ?? []).forEach((c) => listener(c));
|
||||||
|
const set = this.listeners.get(machineId) ?? new Set();
|
||||||
|
set.add(listener);
|
||||||
|
this.listeners.set(machineId, set);
|
||||||
|
return () => set.delete(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
clear(machineId: string): void {
|
||||||
|
this.buffers.delete(machineId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const outputHub = new OutputHub();
|
||||||
Reference in New Issue
Block a user