From fd3a52716467b5e92b8d733712098b9a65c4702e Mon Sep 17 00:00:00 2001 From: Nicolas Meienberger Date: Sat, 8 Nov 2025 11:06:26 +0100 Subject: [PATCH] feat: sse events --- apps/client/app/hooks/use-server-events.ts | 118 ++++++++++++++++++ apps/client/app/root.tsx | 3 + apps/server/package.json | 1 + apps/server/src/core/events.ts | 19 +++ apps/server/src/index.ts | 2 + .../src/modules/backups/backups.service.ts | 21 ++++ .../src/modules/events/events.controller.ts | 81 ++++++++++++ .../repositories/repositories.controller.ts | 4 +- .../src/modules/volumes/volume.controller.ts | 2 +- .../src/modules/volumes/volume.service.ts | 14 ++- bun.lock | 3 + 11 files changed, 263 insertions(+), 5 deletions(-) create mode 100644 apps/client/app/hooks/use-server-events.ts create mode 100644 apps/server/src/core/events.ts create mode 100644 apps/server/src/modules/events/events.controller.ts diff --git a/apps/client/app/hooks/use-server-events.ts b/apps/client/app/hooks/use-server-events.ts new file mode 100644 index 0000000..8869dca --- /dev/null +++ b/apps/client/app/hooks/use-server-events.ts @@ -0,0 +1,118 @@ +import { useEffect, useRef } from "react"; +import { useQueryClient } from "@tanstack/react-query"; + +type ServerEventType = + | "connected" + | "heartbeat" + | "backup:started" + | "backup:completed" + | "volume:mounted" + | "volume:unmounted" + | "volume:updated"; + +interface BackupEvent { + scheduleId: number; + volumeName: string; + repositoryName: string; + status?: "success" | "error"; +} + +interface VolumeEvent { + volumeName: string; +} + +type EventHandler = (data: unknown) => void; + +/** + * Hook to listen to Server-Sent Events (SSE) from the backend + * Automatically handles cache invalidation for backup and volume events + */ +export function useServerEvents() { + const queryClient = useQueryClient(); + const eventSourceRef = useRef(null); + const handlersRef = useRef>>(new Map()); + + useEffect(() => { + const eventSource = new EventSource("/api/v1/events"); + eventSourceRef.current = eventSource; + + eventSource.addEventListener("connected", () => { + console.log("[SSE] Connected to server events"); + }); + + eventSource.addEventListener("heartbeat", () => {}); + + eventSource.addEventListener("backup:started", (e) => { + const data = JSON.parse(e.data) as BackupEvent; + console.log("[SSE] Backup started:", data); + + handlersRef.current.get("backup:started")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.addEventListener("backup:completed", (e) => { + const data = JSON.parse(e.data) as BackupEvent; + console.log("[SSE] Backup completed:", data); + + queryClient.invalidateQueries(); + queryClient.refetchQueries(); + + handlersRef.current.get("backup:completed")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.addEventListener("volume:mounted", (e) => { + const data = JSON.parse(e.data) as VolumeEvent; + console.log("[SSE] Volume mounted:", data); + + handlersRef.current.get("volume:mounted")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.addEventListener("volume:unmounted", (e) => { + const data = JSON.parse(e.data) as VolumeEvent; + console.log("[SSE] Volume unmounted:", data); + + handlersRef.current.get("volume:unmounted")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.addEventListener("volume:updated", (e) => { + const data = JSON.parse(e.data) as VolumeEvent; + console.log("[SSE] Volume updated:", data); + + queryClient.invalidateQueries(); + + handlersRef.current.get("volume:updated")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.onerror = (error) => { + console.error("[SSE] Connection error:", error); + }; + + return () => { + console.log("[SSE] Disconnecting from server events"); + eventSource.close(); + eventSourceRef.current = null; + }; + }, [queryClient]); + + const addEventListener = (event: ServerEventType, handler: EventHandler) => { + if (!handlersRef.current.has(event)) { + handlersRef.current.set(event, new Set()); + } + handlersRef.current.get(event)?.add(handler); + + return () => { + handlersRef.current.get(event)?.delete(handler); + }; + }; + + return { addEventListener }; +} diff --git a/apps/client/app/root.tsx b/apps/client/app/root.tsx index 0d58509..7b238c4 100644 --- a/apps/client/app/root.tsx +++ b/apps/client/app/root.tsx @@ -5,6 +5,7 @@ import { Toaster } from "~/components/ui/sonner"; import type { Route } from "./+types/root"; import "./app.css"; import { client } from "./api-client/client.gen"; +import { useServerEvents } from "./hooks/use-server-events"; client.setConfig({ baseUrl: "/", @@ -63,6 +64,8 @@ export function Layout({ children }: { children: React.ReactNode }) { } export default function App() { + useServerEvents(); + return ; } diff --git a/apps/server/package.json b/apps/server/package.json index cd41ca5..fba9f71 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -20,6 +20,7 @@ "http-errors-enhanced": "^3.0.2", "node-cron": "^4.2.1", "slugify": "^1.6.6", + "tiny-typed-emitter": "^2.1.0", "winston": "^3.17.0" }, "devDependencies": { diff --git a/apps/server/src/core/events.ts b/apps/server/src/core/events.ts new file mode 100644 index 0000000..ba462ce --- /dev/null +++ b/apps/server/src/core/events.ts @@ -0,0 +1,19 @@ +import { EventEmitter } from "node:events"; +import type { TypedEmitter } from "tiny-typed-emitter"; + +/** + * Event payloads for the SSE system + */ +interface ServerEvents { + "backup:started": (data: { scheduleId: number; volumeName: string; repositoryName: string }) => void; + "backup:completed": (data: { scheduleId: number; volumeName: string; repositoryName: string; status: "success" | "error" }) => void; + "volume:mounted": (data: { volumeName: string }) => void; + "volume:unmounted": (data: { volumeName: string }) => void; + "volume:updated": (data: { volumeName: string }) => void; +} + +/** + * Global event emitter for server-side events + * Use this to emit events that should be broadcasted to connected clients via SSE + */ +export const serverEvents = new EventEmitter() as TypedEmitter; diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 50a8386..70f6dae 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -14,6 +14,7 @@ import { repositoriesController } from "./modules/repositories/repositories.cont import { systemController } from "./modules/system/system.controller"; import { volumeController } from "./modules/volumes/volume.controller"; import { backupScheduleController } from "./modules/backups/backups.controller"; +import { eventsController } from "./modules/events/events.controller"; import { handleServiceError } from "./utils/errors"; import { logger } from "./utils/logger"; @@ -44,6 +45,7 @@ const app = new Hono() .route("/api/v1/repositories", repositoriesController.use(requireAuth)) .route("/api/v1/backups", backupScheduleController.use(requireAuth)) .route("/api/v1/system", systemController.use(requireAuth)) + .route("/api/v1/events", eventsController.use(requireAuth)) .get("/assets/*", serveStatic({ root: "./assets/frontend" })) .get("/images/*", serveStatic({ root: "./assets/frontend" })) .get("*", serveStatic({ path: "./assets/frontend/index.html" })); diff --git a/apps/server/src/modules/backups/backups.service.ts b/apps/server/src/modules/backups/backups.service.ts index cb62035..2c783fc 100644 --- a/apps/server/src/modules/backups/backups.service.ts +++ b/apps/server/src/modules/backups/backups.service.ts @@ -9,6 +9,7 @@ import { logger } from "../../utils/logger"; import { getVolumePath } from "../volumes/helpers"; import type { CreateBackupScheduleBody, UpdateBackupScheduleBody } from "./backups.dto"; import { toMessage } from "../../utils/errors"; +import { serverEvents } from "../../core/events"; const calculateNextRun = (cronExpression: string): number => { try { @@ -181,6 +182,12 @@ const executeBackup = async (scheduleId: number, manual = false) => { logger.info(`Starting backup for volume ${volume.name} to repository ${repository.name}`); + serverEvents.emit("backup:started", { + scheduleId, + volumeName: volume.name, + repositoryName: repository.name, + }); + await db .update(backupSchedulesTable) .set({ lastBackupStatus: "in_progress", updatedAt: Date.now() }) @@ -224,6 +231,13 @@ const executeBackup = async (scheduleId: number, manual = false) => { .where(eq(backupSchedulesTable.id, scheduleId)); logger.info(`Backup completed successfully for volume ${volume.name} to repository ${repository.name}`); + + serverEvents.emit("backup:completed", { + scheduleId, + volumeName: volume.name, + repositoryName: repository.name, + status: "success", + }); } catch (error) { logger.error(`Backup failed for volume ${volume.name} to repository ${repository.name}: ${toMessage(error)}`); @@ -237,6 +251,13 @@ const executeBackup = async (scheduleId: number, manual = false) => { }) .where(eq(backupSchedulesTable.id, scheduleId)); + serverEvents.emit("backup:completed", { + scheduleId, + volumeName: volume.name, + repositoryName: repository.name, + status: "error", + }); + throw error; } }; diff --git a/apps/server/src/modules/events/events.controller.ts b/apps/server/src/modules/events/events.controller.ts new file mode 100644 index 0000000..f28e4cf --- /dev/null +++ b/apps/server/src/modules/events/events.controller.ts @@ -0,0 +1,81 @@ +import { Hono } from "hono"; +import { streamSSE } from "hono/streaming"; +import { logger } from "../../utils/logger"; +import { serverEvents } from "../../core/events"; + +export const eventsController = new Hono().get("/", (c) => { + logger.info("Client connected to SSE endpoint"); + + return streamSSE(c, async (stream) => { + await stream.writeSSE({ + data: JSON.stringify({ type: "connected", timestamp: Date.now() }), + event: "connected", + }); + + const onBackupStarted = (data: { scheduleId: number; volumeName: string; repositoryName: string }) => { + stream.writeSSE({ + data: JSON.stringify(data), + event: "backup:started", + }); + }; + + const onBackupCompleted = (data: { + scheduleId: number; + volumeName: string; + repositoryName: string; + status: "success" | "error"; + }) => { + stream.writeSSE({ + data: JSON.stringify(data), + event: "backup:completed", + }); + }; + + const onVolumeMounted = (data: { volumeName: string }) => { + stream.writeSSE({ + data: JSON.stringify(data), + event: "volume:mounted", + }); + }; + + const onVolumeUnmounted = (data: { volumeName: string }) => { + stream.writeSSE({ + data: JSON.stringify(data), + event: "volume:unmounted", + }); + }; + + const onVolumeUpdated = (data: { volumeName: string }) => { + stream.writeSSE({ + data: JSON.stringify(data), + event: "volume:updated", + }); + }; + + serverEvents.on("backup:started", onBackupStarted); + serverEvents.on("backup:completed", onBackupCompleted); + serverEvents.on("volume:mounted", onVolumeMounted); + serverEvents.on("volume:unmounted", onVolumeUnmounted); + serverEvents.on("volume:updated", onVolumeUpdated); + + let keepAlive = true; + + stream.onAbort(() => { + logger.info("Client disconnected from SSE endpoint"); + keepAlive = false; + serverEvents.off("backup:started", onBackupStarted); + serverEvents.off("backup:completed", onBackupCompleted); + serverEvents.off("volume:mounted", onVolumeMounted); + serverEvents.off("volume:unmounted", onVolumeUnmounted); + serverEvents.off("volume:updated", onVolumeUpdated); + }); + + while (keepAlive) { + await stream.writeSSE({ + data: JSON.stringify({ timestamp: Date.now() }), + event: "heartbeat", + }); + await stream.sleep(30000); + } + }); +}); diff --git a/apps/server/src/modules/repositories/repositories.controller.ts b/apps/server/src/modules/repositories/repositories.controller.ts index 7c9c3cf..ac7b495 100644 --- a/apps/server/src/modules/repositories/repositories.controller.ts +++ b/apps/server/src/modules/repositories/repositories.controller.ts @@ -73,7 +73,7 @@ export const repositoriesController = new Hono() }; }); - c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60"); + // c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60"); return c.json(snapshots, 200); }) @@ -108,7 +108,7 @@ export const repositoriesController = new Hono() const result = await repositoriesService.listSnapshotFiles(name, snapshotId, path); - c.header("Cache-Control", "max-age=300, stale-while-revalidate=600"); + // c.header("Cache-Control", "max-age=300, stale-while-revalidate=600"); return c.json(result, 200); }, diff --git a/apps/server/src/modules/volumes/volume.controller.ts b/apps/server/src/modules/volumes/volume.controller.ts index b1a0177..448d3b8 100644 --- a/apps/server/src/modules/volumes/volume.controller.ts +++ b/apps/server/src/modules/volumes/volume.controller.ts @@ -120,7 +120,7 @@ export const volumeController = new Hono() path: result.path, }; - c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60"); + // c.header("Cache-Control", "public, max-age=10, stale-while-revalidate=60"); return c.json(response, 200); }) diff --git a/apps/server/src/modules/volumes/volume.service.ts b/apps/server/src/modules/volumes/volume.service.ts index 1db9add..b7562bf 100644 --- a/apps/server/src/modules/volumes/volume.service.ts +++ b/apps/server/src/modules/volumes/volume.service.ts @@ -15,6 +15,7 @@ import { createVolumeBackend } from "../backends/backend"; import type { UpdateVolumeBody } from "./volume.dto"; import { getVolumePath } from "./helpers"; import { logger } from "../../utils/logger"; +import { serverEvents } from "../../core/events"; const listVolumes = async () => { const volumes = await db.query.volumesTable.findMany({}); @@ -88,6 +89,10 @@ const mountVolume = async (name: string) => { .set({ status, lastError: error ?? null, lastHealthCheck: Date.now() }) .where(eq(volumesTable.name, name)); + if (status === "mounted") { + serverEvents.emit("volume:mounted", { volumeName: name }); + } + return { error, status }; }; @@ -105,6 +110,10 @@ const unmountVolume = async (name: string) => { await db.update(volumesTable).set({ status }).where(eq(volumesTable.name, name)); + if (status === "unmounted") { + serverEvents.emit("volume:unmounted", { volumeName: name }); + } + return { error, status }; }; @@ -165,6 +174,8 @@ const updateVolume = async (name: string, volumeData: UpdateVolumeBody) => { .update(volumesTable) .set({ status, lastError: error ?? null, lastHealthCheck: Date.now() }) .where(eq(volumesTable.name, name)); + + serverEvents.emit("volume:updated", { volumeName: name }); } return { volume: updated }; @@ -277,8 +288,7 @@ const listFiles = async (name: string, subPath?: string) => { } // For directory volumes, use the configured path directly - const volumePath = - volume.config.backend === "directory" ? volume.config.path : getVolumePath(volume.name); + const volumePath = volume.config.backend === "directory" ? volume.config.path : getVolumePath(volume.name); const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath; diff --git a/bun.lock b/bun.lock index ba3f823..b529419 100644 --- a/bun.lock +++ b/bun.lock @@ -81,6 +81,7 @@ "http-errors-enhanced": "^3.0.2", "node-cron": "^4.2.1", "slugify": "^1.6.6", + "tiny-typed-emitter": "^2.1.0", "winston": "^3.17.0", }, "devDependencies": { @@ -1351,6 +1352,8 @@ "tiny-invariant": ["tiny-invariant@1.3.3", "", {}, "sha512-+FbBPE1o9QAYvviau/qC5SE3caw21q3xkvWKBtja5vgqOWIHHJ3ioaq1VPfn/Szqctz2bU/oYeKd9/z5BL+PVg=="], + "tiny-typed-emitter": ["tiny-typed-emitter@2.1.0", "", {}, "sha512-qVtvMxeXbVej0cQWKqVSSAHmKZEHAvxdF8HEUBFWts8h+xEo5m/lEiPakuyZ3BnCBjOD8i24kzNOiOLLgsSxhA=="], + "tinyexec": ["tinyexec@0.3.2", "", {}, "sha512-KQQR9yN7R5+OSwaK0XQoj22pwHoTlgYqmUscPYoknOoWCWfj/5/ABTMRi69FrKU5ffPVh5QcFikpWJI/P1ocHA=="], "tinyglobby": ["tinyglobby@0.2.15", "", { "dependencies": { "fdir": "^6.5.0", "picomatch": "^4.0.3" } }, "sha512-j2Zq4NyQYG5XMST4cbs02Ak8iJUdxRM0XI5QyxXuZOzKOINmWurp3smXu3y5wDcJrptwpSjgXHzIQxR0omXljQ=="],