refactor(backend): better job scheduling pattern

This commit is contained in:
Nicolas Meienberger
2025-10-25 17:46:01 +02:00
parent 47ff720adb
commit 2202ad3247
6 changed files with 399 additions and 69 deletions

View File

@@ -0,0 +1,44 @@
import cron, { type ScheduledTask } from "node-cron";
import { logger } from "../utils/logger";
export abstract class Job {
abstract run(): Promise<unknown>;
}
type JobConstructor = new () => Job;
class SchedulerClass {
private tasks: ScheduledTask[] = [];
async start() {
logger.info("Scheduler started");
}
build(JobClass: JobConstructor) {
const job = new JobClass();
return {
schedule: (cronExpression: string) => {
const task = cron.schedule(cronExpression, async () => {
try {
await job.run();
} catch (error) {
logger.error(`Job ${JobClass.name} failed:`, error);
}
});
this.tasks.push(task);
logger.info(`Scheduled job ${JobClass.name} with cron: ${cronExpression}`);
},
};
}
async stop() {
for (const task of this.tasks) {
task.stop();
}
this.tasks = [];
logger.info("Scheduler stopped");
}
}
export const Scheduler = new SchedulerClass();

View File

@@ -0,0 +1,49 @@
import { Job } from "../core/scheduler";
import path from "node:path";
import fs from "node:fs/promises";
import { volumeService } from "../modules/volumes/volume.service";
import { readMountInfo } from "../utils/mountinfo";
import { getVolumePath } from "../modules/volumes/helpers";
import { logger } from "../utils/logger";
import { executeUnmount } from "../modules/backends/utils/backend-utils";
import { toMessage } from "../utils/errors";
import { VOLUME_MOUNT_BASE } from "../core/constants";
export class CleanupDanglingMountsJob extends Job {
async run() {
const allVolumes = await volumeService.listVolumes();
const allSystemMounts = await readMountInfo();
for (const mount of allSystemMounts) {
if (mount.mountPoint.includes("ironmount") && mount.mountPoint.endsWith("_data")) {
const matchingVolume = allVolumes.find((v) => getVolumePath(v.name) === mount.mountPoint);
if (!matchingVolume) {
logger.info(`Found dangling mount at ${mount.mountPoint}, attempting to unmount...`);
await executeUnmount(mount.mountPoint);
await fs.rmdir(path.dirname(mount.mountPoint)).catch((err) => {
logger.warn(
`Failed to remove dangling mount directory ${path.dirname(mount.mountPoint)}: ${toMessage(err)}`,
);
});
}
}
}
const allIronmountDirs = await fs.readdir(VOLUME_MOUNT_BASE).catch(() => []);
for (const dir of allIronmountDirs) {
const volumePath = getVolumePath(dir);
const matchingVolume = allVolumes.find((v) => getVolumePath(v.name) === volumePath);
if (!matchingVolume) {
const fullPath = path.join(VOLUME_MOUNT_BASE, dir);
logger.info(`Found dangling mount directory at ${fullPath}, attempting to remove...`);
await fs.rmdir(fullPath, { recursive: true }).catch((err) => {
logger.warn(`Failed to remove dangling mount directory ${fullPath}: ${toMessage(err)}`);
});
}
}
return { done: true, timestamp: new Date() };
}
}

View File

@@ -0,0 +1,25 @@
import { Job } from "../core/scheduler";
import { volumeService } from "../modules/volumes/volume.service";
import { logger } from "../utils/logger";
import { db } from "../db/db";
import { eq, or } from "drizzle-orm";
import { volumesTable } from "../db/schema";
export class VolumeHealthCheckJob extends Job {
async run() {
logger.debug("Running health check for all volumes...");
const volumes = await db.query.volumesTable.findMany({
where: or(eq(volumesTable.status, "mounted"), eq(volumesTable.status, "error")),
});
for (const volume of volumes) {
const { status } = await volumeService.checkHealth(volume.name);
if (status === "error" && volume.autoRemount) {
await volumeService.mountVolume(volume.name);
}
}
return { done: true, timestamp: new Date() };
}
}

View File

@@ -1,42 +0,0 @@
import fs from "node:fs/promises";
import path from "node:path";
import { VOLUME_MOUNT_BASE } from "../../core/constants";
import { toMessage } from "../../utils/errors";
import { logger } from "../../utils/logger";
import { readMountInfo } from "../../utils/mountinfo";
import { executeUnmount } from "../backends/utils/backend-utils";
import { getVolumePath } from "../volumes/helpers";
import { volumeService } from "../volumes/volume.service";
export const cleanupDanglingMounts = async () => {
const allVolumes = await volumeService.listVolumes();
const allSystemMounts = await readMountInfo();
for (const mount of allSystemMounts) {
if (mount.mountPoint.includes("ironmount") && mount.mountPoint.endsWith("_data")) {
const matchingVolume = allVolumes.find((v) => getVolumePath(v.name) === mount.mountPoint);
if (!matchingVolume) {
logger.info(`Found dangling mount at ${mount.mountPoint}, attempting to unmount...`);
await executeUnmount(mount.mountPoint);
await fs.rmdir(path.dirname(mount.mountPoint)).catch((err) => {
logger.warn(`Failed to remove dangling mount directory ${path.dirname(mount.mountPoint)}: ${toMessage(err)}`);
});
}
}
}
const allIronmountDirs = await fs.readdir(VOLUME_MOUNT_BASE).catch(() => []);
for (const dir of allIronmountDirs) {
const volumePath = getVolumePath(dir);
const matchingVolume = allVolumes.find((v) => getVolumePath(v.name) === volumePath);
if (!matchingVolume) {
const fullPath = path.join(VOLUME_MOUNT_BASE, dir);
logger.info(`Found dangling mount directory at ${fullPath}, attempting to remove...`);
await fs.rmdir(fullPath, { recursive: true }).catch((err) => {
logger.warn(`Failed to remove dangling mount directory ${fullPath}: ${toMessage(err)}`);
});
}
}
};

View File

@@ -1,19 +1,19 @@
import { Scheduler } from "../../core/scheduler";
import { and, eq, or } from "drizzle-orm";
import { getTasks, schedule } from "node-cron";
import { db } from "../../db/db";
import { volumesTable } from "../../db/schema";
import { logger } from "../../utils/logger";
import { restic } from "../../utils/restic";
import { volumeService } from "../volumes/volume.service";
import { cleanupDanglingMounts } from "./cleanup";
import { CleanupDanglingMountsJob } from "../../jobs/cleanup-dangling";
import { VolumeHealthCheckJob } from "../../jobs/healthchecks";
export const startup = async () => {
await Scheduler.start();
await restic.ensurePassfile().catch((err) => {
logger.error(`Error ensuring restic passfile exists: ${err.message}`);
});
cleanupDanglingMounts().catch((err) => {
logger.error(`Error during startup cleanup of dangling mounts: ${err.message}`);
});
const volumes = await db.query.volumesTable.findMany({
where: or(
@@ -28,26 +28,6 @@ export const startup = async () => {
});
}
const existingTasks = getTasks();
existingTasks.forEach(async (task) => await task.destroy());
schedule("0 * * * *", async () => {
logger.debug("Running hourly cleanup of dangling mounts...");
await cleanupDanglingMounts();
});
schedule("* * * * *", async () => {
logger.debug("Running health check for all volumes...");
const volumes = await db.query.volumesTable.findMany({
where: or(eq(volumesTable.status, "mounted"), eq(volumesTable.status, "error")),
});
for (const volume of volumes) {
const { status } = await volumeService.checkHealth(volume.name);
if (status === "error" && volume.autoRemount) {
await volumeService.mountVolume(volume.name);
}
}
});
Scheduler.build(CleanupDanglingMountsJob).schedule("0 * * * *");
Scheduler.build(VolumeHealthCheckJob).schedule("* * * * *");
};