feat(scheduler): automatisations planifiées (cron) — tâche 5
- table schedules (migration 0007) + service scheduler (croner) : CRUD, runSchedule avec scope (all/liste), pool de concurrence et verrou par machine, mapping actions → refresh/metrics/docker_scan ; reloadSchedules au boot - worker = reloadSchedules (remplace le refresh 30 min en dur) - routes /api/schedules (CRUD + :id/run) ; cron invalide rejeté (validation croner) - UI Paramètres : onglet « Automatisations » (liste, activer/lancer/supprimer, création) tsc 0 · 113 tests · build OK · boot OK (migration 0007, CRUD vérifié). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,206 @@
|
||||
// server/services/scheduler.ts
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { Cron } from "croner";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { db, schema } from "../db/client.js";
|
||||
import { listMachines } from "./machines.js";
|
||||
import { refreshMachine } from "./refresh.js";
|
||||
import { collectMetrics } from "./machineMetrics.js";
|
||||
import { recordEvent } from "./machineState.js";
|
||||
|
||||
export type ScheduleAction = "apt_update_analyze" | "machine_metrics_simple" | "docker_scan";
|
||||
|
||||
export interface ScheduleScope {
|
||||
machineIds: "all" | string[];
|
||||
}
|
||||
|
||||
export interface ScheduleView {
|
||||
id: string;
|
||||
name: string;
|
||||
enabled: boolean;
|
||||
cron: string;
|
||||
timezone: string | null;
|
||||
scope: ScheduleScope;
|
||||
actions: ScheduleAction[];
|
||||
concurrency: number;
|
||||
lastRunAt: string | null;
|
||||
lastStatus: string | null;
|
||||
}
|
||||
|
||||
type ScheduleRow = typeof schema.schedules.$inferSelect;
|
||||
|
||||
function toView(r: ScheduleRow): ScheduleView {
|
||||
return {
|
||||
id: r.id,
|
||||
name: r.name,
|
||||
enabled: !!r.enabled,
|
||||
cron: r.cron,
|
||||
timezone: r.timezone,
|
||||
scope: JSON.parse(r.scopeJson) as ScheduleScope,
|
||||
actions: JSON.parse(r.actionsJson) as ScheduleAction[],
|
||||
concurrency: r.concurrency,
|
||||
lastRunAt: r.lastRunAt,
|
||||
lastStatus: r.lastStatus,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// CRUD
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export function listSchedules(): ScheduleView[] {
|
||||
return db.select().from(schema.schedules).all().map(toView);
|
||||
}
|
||||
|
||||
export function getSchedule(id: string): ScheduleView | null {
|
||||
const r = db.select().from(schema.schedules).where(eq(schema.schedules.id, id)).get();
|
||||
return r ? toView(r) : null;
|
||||
}
|
||||
|
||||
export interface ScheduleInput {
|
||||
name: string;
|
||||
cron: string;
|
||||
timezone?: string | null;
|
||||
enabled?: boolean;
|
||||
scope?: ScheduleScope;
|
||||
actions: ScheduleAction[];
|
||||
concurrency?: number;
|
||||
}
|
||||
|
||||
export function createSchedule(input: ScheduleInput): ScheduleView {
|
||||
// Valide l'expression cron (lève si invalide), sans planifier.
|
||||
new Cron(input.cron).stop();
|
||||
const id = randomUUID();
|
||||
const now = new Date().toISOString();
|
||||
db.insert(schema.schedules).values({
|
||||
id,
|
||||
name: input.name,
|
||||
enabled: input.enabled === false ? 0 : 1,
|
||||
cron: input.cron,
|
||||
timezone: input.timezone ?? "Europe/Paris",
|
||||
scopeJson: JSON.stringify(input.scope ?? { machineIds: "all" }),
|
||||
actionsJson: JSON.stringify(input.actions),
|
||||
concurrency: input.concurrency ?? 2,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}).run();
|
||||
reloadSchedules();
|
||||
return getSchedule(id)!;
|
||||
}
|
||||
|
||||
export function updateSchedule(id: string, input: Partial<ScheduleInput>): ScheduleView {
|
||||
const cur = db.select().from(schema.schedules).where(eq(schema.schedules.id, id)).get();
|
||||
if (!cur) throw new Error("Schedule introuvable");
|
||||
if (input.cron) new Cron(input.cron).stop(); // valide sans planifier
|
||||
db.update(schema.schedules).set({
|
||||
...(input.name !== undefined ? { name: input.name } : {}),
|
||||
...(input.enabled !== undefined ? { enabled: input.enabled ? 1 : 0 } : {}),
|
||||
...(input.cron !== undefined ? { cron: input.cron } : {}),
|
||||
...(input.timezone !== undefined ? { timezone: input.timezone } : {}),
|
||||
...(input.scope !== undefined ? { scopeJson: JSON.stringify(input.scope) } : {}),
|
||||
...(input.actions !== undefined ? { actionsJson: JSON.stringify(input.actions) } : {}),
|
||||
...(input.concurrency !== undefined ? { concurrency: input.concurrency } : {}),
|
||||
updatedAt: new Date().toISOString(),
|
||||
}).where(eq(schema.schedules.id, id)).run();
|
||||
reloadSchedules();
|
||||
return getSchedule(id)!;
|
||||
}
|
||||
|
||||
export function deleteSchedule(id: string): void {
|
||||
db.delete(schema.schedules).where(eq(schema.schedules.id, id)).run();
|
||||
reloadSchedules();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Exécution
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const locked = new Set<string>();
|
||||
|
||||
function resolveMachineIds(scope: ScheduleScope): string[] {
|
||||
const all = listMachines().map((m) => m.id);
|
||||
return scope.machineIds === "all" ? all : scope.machineIds.filter((id) => all.includes(id));
|
||||
}
|
||||
|
||||
async function runActionOnMachine(machineId: string, action: ScheduleAction): Promise<void> {
|
||||
if (action === "apt_update_analyze") {
|
||||
await refreshMachine(machineId);
|
||||
} else if (action === "machine_metrics_simple") {
|
||||
await collectMetrics(machineId);
|
||||
} else if (action === "docker_scan") {
|
||||
const { scanDockerStacks } = await import("./dockerScan.js");
|
||||
await scanDockerStacks(machineId);
|
||||
}
|
||||
}
|
||||
|
||||
/** Exécute un schedule : actions sur le périmètre, avec verrou par machine et concurrence. */
|
||||
export async function runSchedule(id: string): Promise<{ ran: number; errors: number }> {
|
||||
const sched = getSchedule(id);
|
||||
if (!sched) throw new Error("Schedule introuvable");
|
||||
const machineIds = resolveMachineIds(sched.scope);
|
||||
let ran = 0;
|
||||
let errors = 0;
|
||||
|
||||
const queue = [...machineIds];
|
||||
const worker = async () => {
|
||||
for (;;) {
|
||||
const machineId = queue.shift();
|
||||
if (!machineId) break;
|
||||
if (locked.has(machineId)) continue; // une action tourne déjà sur cette machine
|
||||
locked.add(machineId);
|
||||
try {
|
||||
for (const action of sched.actions) {
|
||||
await runActionOnMachine(machineId, action);
|
||||
}
|
||||
ran++;
|
||||
} catch (err) {
|
||||
errors++;
|
||||
recordEvent({
|
||||
machineId,
|
||||
eventType: "schedule_action_failed",
|
||||
severity: "warning",
|
||||
message: `Schedule « ${sched.name} » : ${(err as Error).message}`,
|
||||
});
|
||||
} finally {
|
||||
locked.delete(machineId);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const pool = Math.max(1, Math.min(sched.concurrency, machineIds.length || 1));
|
||||
await Promise.all(Array.from({ length: pool }, () => worker()));
|
||||
|
||||
db.update(schema.schedules)
|
||||
.set({ lastRunAt: new Date().toISOString(), lastStatus: errors ? `partial (${errors} err)` : "ok" })
|
||||
.where(eq(schema.schedules.id, id))
|
||||
.run();
|
||||
return { ran, errors };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Enregistrement croner
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let jobs: Cron[] = [];
|
||||
|
||||
export function reloadSchedules(): void {
|
||||
for (const j of jobs) j.stop();
|
||||
jobs = [];
|
||||
for (const s of listSchedules()) {
|
||||
if (!s.enabled) continue;
|
||||
try {
|
||||
jobs.push(
|
||||
new Cron(s.cron, { timezone: s.timezone ?? undefined, name: s.id }, () => {
|
||||
runSchedule(s.id).catch((err) => console.error(`[scheduler] ${s.name}:`, (err as Error).message));
|
||||
}),
|
||||
);
|
||||
} catch (err) {
|
||||
console.error(`[scheduler] cron invalide pour ${s.name}:`, (err as Error).message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function stopSchedules(): void {
|
||||
for (const j of jobs) j.stop();
|
||||
jobs = [];
|
||||
}
|
||||
Reference in New Issue
Block a user