feat: sse events

This commit is contained in:
Nicolas Meienberger
2025-11-08 11:06:26 +01:00
parent 5b4b571581
commit fd3a527164
11 changed files with 263 additions and 5 deletions

View File

@@ -0,0 +1,118 @@
import { useEffect, useRef } from "react";
import { useQueryClient } from "@tanstack/react-query";
type ServerEventType =
| "connected"
| "heartbeat"
| "backup:started"
| "backup:completed"
| "volume:mounted"
| "volume:unmounted"
| "volume:updated";
interface BackupEvent {
scheduleId: number;
volumeName: string;
repositoryName: string;
status?: "success" | "error";
}
interface VolumeEvent {
volumeName: string;
}
type EventHandler = (data: unknown) => void;
/**
* Hook to listen to Server-Sent Events (SSE) from the backend
* Automatically handles cache invalidation for backup and volume events
*/
export function useServerEvents() {
const queryClient = useQueryClient();
const eventSourceRef = useRef<EventSource | null>(null);
const handlersRef = useRef<Map<ServerEventType, Set<EventHandler>>>(new Map());
useEffect(() => {
const eventSource = new EventSource("/api/v1/events");
eventSourceRef.current = eventSource;
eventSource.addEventListener("connected", () => {
console.log("[SSE] Connected to server events");
});
eventSource.addEventListener("heartbeat", () => {});
eventSource.addEventListener("backup:started", (e) => {
const data = JSON.parse(e.data) as BackupEvent;
console.log("[SSE] Backup started:", data);
handlersRef.current.get("backup:started")?.forEach((handler) => {
handler(data);
});
});
eventSource.addEventListener("backup:completed", (e) => {
const data = JSON.parse(e.data) as BackupEvent;
console.log("[SSE] Backup completed:", data);
queryClient.invalidateQueries();
queryClient.refetchQueries();
handlersRef.current.get("backup:completed")?.forEach((handler) => {
handler(data);
});
});
eventSource.addEventListener("volume:mounted", (e) => {
const data = JSON.parse(e.data) as VolumeEvent;
console.log("[SSE] Volume mounted:", data);
handlersRef.current.get("volume:mounted")?.forEach((handler) => {
handler(data);
});
});
eventSource.addEventListener("volume:unmounted", (e) => {
const data = JSON.parse(e.data) as VolumeEvent;
console.log("[SSE] Volume unmounted:", data);
handlersRef.current.get("volume:unmounted")?.forEach((handler) => {
handler(data);
});
});
eventSource.addEventListener("volume:updated", (e) => {
const data = JSON.parse(e.data) as VolumeEvent;
console.log("[SSE] Volume updated:", data);
queryClient.invalidateQueries();
handlersRef.current.get("volume:updated")?.forEach((handler) => {
handler(data);
});
});
eventSource.onerror = (error) => {
console.error("[SSE] Connection error:", error);
};
return () => {
console.log("[SSE] Disconnecting from server events");
eventSource.close();
eventSourceRef.current = null;
};
}, [queryClient]);
const addEventListener = (event: ServerEventType, handler: EventHandler) => {
if (!handlersRef.current.has(event)) {
handlersRef.current.set(event, new Set());
}
handlersRef.current.get(event)?.add(handler);
return () => {
handlersRef.current.get(event)?.delete(handler);
};
};
return { addEventListener };
}

View File

@@ -5,6 +5,7 @@ import { Toaster } from "~/components/ui/sonner";
import type { Route } from "./+types/root";
import "./app.css";
import { client } from "./api-client/client.gen";
import { useServerEvents } from "./hooks/use-server-events";
client.setConfig({
baseUrl: "/",
@@ -63,6 +64,8 @@ export function Layout({ children }: { children: React.ReactNode }) {
}
export default function App() {
useServerEvents();
return <Outlet />;
}

View File

@@ -20,6 +20,7 @@
"http-errors-enhanced": "^3.0.2",
"node-cron": "^4.2.1",
"slugify": "^1.6.6",
"tiny-typed-emitter": "^2.1.0",
"winston": "^3.17.0"
},
"devDependencies": {

View File

@@ -0,0 +1,19 @@
import { EventEmitter } from "node:events";
import type { TypedEmitter } from "tiny-typed-emitter";
/**
* Event payloads for the SSE system
*/
interface ServerEvents {
"backup:started": (data: { scheduleId: number; volumeName: string; repositoryName: string }) => void;
"backup:completed": (data: { scheduleId: number; volumeName: string; repositoryName: string; status: "success" | "error" }) => void;
"volume:mounted": (data: { volumeName: string }) => void;
"volume:unmounted": (data: { volumeName: string }) => void;
"volume:updated": (data: { volumeName: string }) => void;
}
/**
* Global event emitter for server-side events
* Use this to emit events that should be broadcasted to connected clients via SSE
*/
export const serverEvents = new EventEmitter() as TypedEmitter<ServerEvents>;

View File

@@ -14,6 +14,7 @@ import { repositoriesController } from "./modules/repositories/repositories.cont
import { systemController } from "./modules/system/system.controller";
import { volumeController } from "./modules/volumes/volume.controller";
import { backupScheduleController } from "./modules/backups/backups.controller";
import { eventsController } from "./modules/events/events.controller";
import { handleServiceError } from "./utils/errors";
import { logger } from "./utils/logger";
@@ -44,6 +45,7 @@ const app = new Hono()
.route("/api/v1/repositories", repositoriesController.use(requireAuth))
.route("/api/v1/backups", backupScheduleController.use(requireAuth))
.route("/api/v1/system", systemController.use(requireAuth))
.route("/api/v1/events", eventsController.use(requireAuth))
.get("/assets/*", serveStatic({ root: "./assets/frontend" }))
.get("/images/*", serveStatic({ root: "./assets/frontend" }))
.get("*", serveStatic({ path: "./assets/frontend/index.html" }));

View File

@@ -9,6 +9,7 @@ import { logger } from "../../utils/logger";
import { getVolumePath } from "../volumes/helpers";
import type { CreateBackupScheduleBody, UpdateBackupScheduleBody } from "./backups.dto";
import { toMessage } from "../../utils/errors";
import { serverEvents } from "../../core/events";
const calculateNextRun = (cronExpression: string): number => {
try {
@@ -181,6 +182,12 @@ const executeBackup = async (scheduleId: number, manual = false) => {
logger.info(`Starting backup for volume ${volume.name} to repository ${repository.name}`);
serverEvents.emit("backup:started", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
});
await db
.update(backupSchedulesTable)
.set({ lastBackupStatus: "in_progress", updatedAt: Date.now() })
@@ -224,6 +231,13 @@ const executeBackup = async (scheduleId: number, manual = false) => {
.where(eq(backupSchedulesTable.id, scheduleId));
logger.info(`Backup completed successfully for volume ${volume.name} to repository ${repository.name}`);
serverEvents.emit("backup:completed", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
status: "success",
});
} catch (error) {
logger.error(`Backup failed for volume ${volume.name} to repository ${repository.name}: ${toMessage(error)}`);
@@ -237,6 +251,13 @@ const executeBackup = async (scheduleId: number, manual = false) => {
})
.where(eq(backupSchedulesTable.id, scheduleId));
serverEvents.emit("backup:completed", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
status: "error",
});
throw error;
}
};

View File

@@ -0,0 +1,81 @@
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { logger } from "../../utils/logger";
import { serverEvents } from "../../core/events";
export const eventsController = new Hono().get("/", (c) => {
logger.info("Client connected to SSE endpoint");
return streamSSE(c, async (stream) => {
await stream.writeSSE({
data: JSON.stringify({ type: "connected", timestamp: Date.now() }),
event: "connected",
});
const onBackupStarted = (data: { scheduleId: number; volumeName: string; repositoryName: string }) => {
stream.writeSSE({
data: JSON.stringify(data),
event: "backup:started",
});
};
const onBackupCompleted = (data: {
scheduleId: number;
volumeName: string;
repositoryName: string;
status: "success" | "error";
}) => {
stream.writeSSE({
data: JSON.stringify(data),
event: "backup:completed",
});
};
const onVolumeMounted = (data: { volumeName: string }) => {
stream.writeSSE({
data: JSON.stringify(data),
event: "volume:mounted",
});
};
const onVolumeUnmounted = (data: { volumeName: string }) => {
stream.writeSSE({
data: JSON.stringify(data),
event: "volume:unmounted",
});
};
const onVolumeUpdated = (data: { volumeName: string }) => {
stream.writeSSE({
data: JSON.stringify(data),
event: "volume:updated",
});
};
serverEvents.on("backup:started", onBackupStarted);
serverEvents.on("backup:completed", onBackupCompleted);
serverEvents.on("volume:mounted", onVolumeMounted);
serverEvents.on("volume:unmounted", onVolumeUnmounted);
serverEvents.on("volume:updated", onVolumeUpdated);
let keepAlive = true;
stream.onAbort(() => {
logger.info("Client disconnected from SSE endpoint");
keepAlive = false;
serverEvents.off("backup:started", onBackupStarted);
serverEvents.off("backup:completed", onBackupCompleted);
serverEvents.off("volume:mounted", onVolumeMounted);
serverEvents.off("volume:unmounted", onVolumeUnmounted);
serverEvents.off("volume:updated", onVolumeUpdated);
});
while (keepAlive) {
await stream.writeSSE({
data: JSON.stringify({ timestamp: Date.now() }),
event: "heartbeat",
});
await stream.sleep(30000);
}
});
});

View File

@@ -73,7 +73,7 @@ export const repositoriesController = new Hono()
};
});
c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60");
// c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60");
return c.json<ListSnapshotsDto>(snapshots, 200);
})
@@ -108,7 +108,7 @@ export const repositoriesController = new Hono()
const result = await repositoriesService.listSnapshotFiles(name, snapshotId, path);
c.header("Cache-Control", "max-age=300, stale-while-revalidate=600");
// c.header("Cache-Control", "max-age=300, stale-while-revalidate=600");
return c.json<ListSnapshotFilesDto>(result, 200);
},

View File

@@ -120,7 +120,7 @@ export const volumeController = new Hono()
path: result.path,
};
c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60");
// c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60");
return c.json<ListFilesDto>(response, 200);
})

View File

@@ -15,6 +15,7 @@ import { createVolumeBackend } from "../backends/backend";
import type { UpdateVolumeBody } from "./volume.dto";
import { getVolumePath } from "./helpers";
import { logger } from "../../utils/logger";
import { serverEvents } from "../../core/events";
const listVolumes = async () => {
const volumes = await db.query.volumesTable.findMany({});
@@ -88,6 +89,10 @@ const mountVolume = async (name: string) => {
.set({ status, lastError: error ?? null, lastHealthCheck: Date.now() })
.where(eq(volumesTable.name, name));
if (status === "mounted") {
serverEvents.emit("volume:mounted", { volumeName: name });
}
return { error, status };
};
@@ -105,6 +110,10 @@ const unmountVolume = async (name: string) => {
await db.update(volumesTable).set({ status }).where(eq(volumesTable.name, name));
if (status === "unmounted") {
serverEvents.emit("volume:unmounted", { volumeName: name });
}
return { error, status };
};
@@ -165,6 +174,8 @@ const updateVolume = async (name: string, volumeData: UpdateVolumeBody) => {
.update(volumesTable)
.set({ status, lastError: error ?? null, lastHealthCheck: Date.now() })
.where(eq(volumesTable.name, name));
serverEvents.emit("volume:updated", { volumeName: name });
}
return { volume: updated };
@@ -277,8 +288,7 @@ const listFiles = async (name: string, subPath?: string) => {
}
// For directory volumes, use the configured path directly
const volumePath =
volume.config.backend === "directory" ? volume.config.path : getVolumePath(volume.name);
const volumePath = volume.config.backend === "directory" ? volume.config.path : getVolumePath(volume.name);
const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath;

View File

@@ -81,6 +81,7 @@
"http-errors-enhanced": "^3.0.2",
"node-cron": "^4.2.1",
"slugify": "^1.6.6",
"tiny-typed-emitter": "^2.1.0",
"winston": "^3.17.0",
},
"devDependencies": {
@@ -1351,6 +1352,8 @@
"tiny-invariant": ["tiny-invariant@1.3.3", "", {}, "sha512-+FbBPE1o9QAYvviau/qC5SE3caw21q3xkvWKBtja5vgqOWIHHJ3ioaq1VPfn/Szqctz2bU/oYeKd9/z5BL+PVg=="],
"tiny-typed-emitter": ["tiny-typed-emitter@2.1.0", "", {}, "sha512-qVtvMxeXbVej0cQWKqVSSAHmKZEHAvxdF8HEUBFWts8h+xEo5m/lEiPakuyZ3BnCBjOD8i24kzNOiOLLgsSxhA=="],
"tinyexec": ["tinyexec@0.3.2", "", {}, "sha512-KQQR9yN7R5+OSwaK0XQoj22pwHoTlgYqmUscPYoknOoWCWfj/5/ABTMRi69FrKU5ffPVh5QcFikpWJI/P1ocHA=="],
"tinyglobby": ["tinyglobby@0.2.15", "", { "dependencies": { "fdir": "^6.5.0", "picomatch": "^4.0.3" } }, "sha512-j2Zq4NyQYG5XMST4cbs02Ak8iJUdxRM0XI5QyxXuZOzKOINmWurp3smXu3y5wDcJrptwpSjgXHzIQxR0omXljQ=="],