refactor(repositories): add a locking mechanism for restic operations (#94)

* refactor(repositories): add a locking mechanism for restic operations

* fix: add missing lock in list repositories
This commit is contained in:
Nico
2025-12-01 19:47:21 +01:00
committed by GitHub
parent 999850dab8
commit 9b46737852
14 changed files with 346 additions and 126 deletions

View File

@@ -11,6 +11,7 @@ import type { CreateBackupScheduleBody, UpdateBackupScheduleBody } from "./backu
import { toMessage } from "../../utils/errors";
import { serverEvents } from "../../core/events";
import { notificationsService } from "../notifications/notifications.service";
import { repoMutex } from "../../core/repository-mutex";
const runningBackups = new Map<number, AbortController>();
@@ -241,21 +242,33 @@ const executeBackup = async (scheduleId: number, manual = false) => {
backupOptions.include = schedule.includePatterns;
}
const { exitCode } = await restic.backup(repository.config, volumePath, {
...backupOptions,
compressionMode: repository.compressionMode ?? "auto",
onProgress: (progress) => {
serverEvents.emit("backup:progress", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
...progress,
});
},
});
const releaseBackupLock = await repoMutex.acquireShared(repository.id, `backup:${volume.name}`);
let exitCode: number;
try {
const result = await restic.backup(repository.config, volumePath, {
...backupOptions,
compressionMode: repository.compressionMode ?? "auto",
onProgress: (progress) => {
serverEvents.emit("backup:progress", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
...progress,
});
},
});
exitCode = result.exitCode;
} finally {
releaseBackupLock();
}
if (schedule.retentionPolicy) {
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
const releaseForgetLock = await repoMutex.acquireExclusive(repository.id, `forget:${volume.name}`);
try {
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
} finally {
releaseForgetLock();
}
}
const nextBackupAt = calculateNextRun(schedule.cronExpression);
@@ -403,7 +416,13 @@ const runForget = async (scheduleId: number) => {
}
logger.info(`Manually running retention policy (forget) for schedule ${scheduleId}`);
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
const releaseLock = await repoMutex.acquireExclusive(repository.id, `forget:manual:${scheduleId}`);
try {
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
} finally {
releaseLock();
}
logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
};

View File

@@ -34,7 +34,7 @@ export const startup = async () => {
Scheduler.build(CleanupDanglingMountsJob).schedule("0 * * * *");
Scheduler.build(VolumeHealthCheckJob).schedule("*/30 * * * *");
Scheduler.build(RepositoryHealthCheckJob).schedule("0 12 * * *");
Scheduler.build(RepositoryHealthCheckJob).schedule("50 12 * * *");
Scheduler.build(BackupExecutionJob).schedule("* * * * *");
Scheduler.build(CleanupSessionsJob).schedule("0 0 * * *");
};

View File

@@ -8,6 +8,7 @@ import { toMessage } from "../../utils/errors";
import { generateShortId } from "../../utils/id";
import { restic } from "../../utils/restic";
import { cryptoUtils } from "../../utils/crypto";
import { repoMutex } from "../../core/repository-mutex";
import type { CompressionMode, OverwriteMode, RepositoryConfig } from "~/schemas/restic";
const listRepositories = async () => {
@@ -160,15 +161,20 @@ const listSnapshots = async (name: string, backupId?: string) => {
throw new NotFoundError("Repository not found");
}
let snapshots = [];
const releaseLock = await repoMutex.acquireShared(repository.id, "snapshots");
try {
let snapshots = [];
if (backupId) {
snapshots = await restic.snapshots(repository.config, { tags: [backupId.toString()] });
} else {
snapshots = await restic.snapshots(repository.config);
if (backupId) {
snapshots = await restic.snapshots(repository.config, { tags: [backupId.toString()] });
} else {
snapshots = await restic.snapshots(repository.config);
}
return snapshots;
} finally {
releaseLock();
}
return snapshots;
};
const listSnapshotFiles = async (name: string, snapshotId: string, path?: string) => {
@@ -180,22 +186,27 @@ const listSnapshotFiles = async (name: string, snapshotId: string, path?: string
throw new NotFoundError("Repository not found");
}
const result = await restic.ls(repository.config, snapshotId, path);
const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`);
try {
const result = await restic.ls(repository.config, snapshotId, path);
if (!result.snapshot) {
throw new NotFoundError("Snapshot not found or empty");
if (!result.snapshot) {
throw new NotFoundError("Snapshot not found or empty");
}
return {
snapshot: {
id: result.snapshot.id,
short_id: result.snapshot.short_id,
time: result.snapshot.time,
hostname: result.snapshot.hostname,
paths: result.snapshot.paths,
},
files: result.nodes,
};
} finally {
releaseLock();
}
return {
snapshot: {
id: result.snapshot.id,
short_id: result.snapshot.short_id,
time: result.snapshot.time,
hostname: result.snapshot.hostname,
paths: result.snapshot.paths,
},
files: result.nodes,
};
};
const restoreSnapshot = async (
@@ -220,14 +231,19 @@ const restoreSnapshot = async (
const target = options?.targetPath || "/";
const result = await restic.restore(repository.config, snapshotId, target, options);
const releaseLock = await repoMutex.acquireShared(repository.id, `restore:${snapshotId}`);
try {
const result = await restic.restore(repository.config, snapshotId, target, options);
return {
success: true,
message: "Snapshot restored successfully",
filesRestored: result.files_restored,
filesSkipped: result.files_skipped,
};
return {
success: true,
message: "Snapshot restored successfully",
filesRestored: result.files_restored,
filesSkipped: result.files_skipped,
};
} finally {
releaseLock();
}
};
const getSnapshotDetails = async (name: string, snapshotId: string) => {
@@ -239,14 +255,19 @@ const getSnapshotDetails = async (name: string, snapshotId: string) => {
throw new NotFoundError("Repository not found");
}
const snapshots = await restic.snapshots(repository.config);
const snapshot = snapshots.find((snap) => snap.id === snapshotId || snap.short_id === snapshotId);
const releaseLock = await repoMutex.acquireShared(repository.id, `snapshot_details:${snapshotId}`);
try {
const snapshots = await restic.snapshots(repository.config);
const snapshot = snapshots.find((snap) => snap.id === snapshotId || snap.short_id === snapshotId);
if (!snapshot) {
throw new NotFoundError("Snapshot not found");
if (!snapshot) {
throw new NotFoundError("Snapshot not found");
}
return snapshot;
} finally {
releaseLock();
}
return snapshot;
};
const checkHealth = async (repositoryId: string) => {
@@ -258,18 +279,23 @@ const checkHealth = async (repositoryId: string) => {
throw new NotFoundError("Repository not found");
}
const { hasErrors, error } = await restic.check(repository.config);
const releaseLock = await repoMutex.acquireExclusive(repository.id, "check");
try {
const { hasErrors, error } = await restic.check(repository.config);
await db
.update(repositoriesTable)
.set({
status: hasErrors ? "error" : "healthy",
lastChecked: Date.now(),
lastError: error,
})
.where(eq(repositoriesTable.id, repository.id));
await db
.update(repositoriesTable)
.set({
status: hasErrors ? "error" : "healthy",
lastChecked: Date.now(),
lastError: error,
})
.where(eq(repositoriesTable.id, repository.id));
return { lastError: error };
return { lastError: error };
} finally {
releaseLock();
}
};
const doctorRepository = async (name: string) => {
@@ -295,48 +321,51 @@ const doctorRepository = async (name: string) => {
error: unlockResult.error,
});
const checkResult = await restic.check(repository.config, { readData: false }).then(
(result) => result,
(error) => ({ success: false, output: null, error: toMessage(error), hasErrors: true }),
);
steps.push({
step: "check",
success: checkResult.success,
output: checkResult.output,
error: checkResult.error,
});
if (checkResult.hasErrors) {
const repairResult = await restic.repairIndex(repository.config).then(
(result) => ({ success: true, output: result.output, error: null }),
(error) => ({ success: false, output: null, error: toMessage(error) }),
);
steps.push({
step: "repair_index",
success: repairResult.success,
output: repairResult.output,
error: repairResult.error,
});
const recheckResult = await restic.check(repository.config, { readData: false }).then(
const releaseLock = await repoMutex.acquireExclusive(repository.id, "doctor");
try {
const checkResult = await restic.check(repository.config, { readData: false }).then(
(result) => result,
(error) => ({ success: false, output: null, error: toMessage(error), hasErrors: true }),
);
steps.push({
step: "recheck",
success: recheckResult.success,
output: recheckResult.output,
error: recheckResult.error,
step: "check",
success: checkResult.success,
output: checkResult.output,
error: checkResult.error,
});
if (checkResult.hasErrors) {
const repairResult = await restic.repairIndex(repository.config).then(
(result) => ({ success: true, output: result.output, error: null }),
(error) => ({ success: false, output: null, error: toMessage(error) }),
);
steps.push({
step: "repair_index",
success: repairResult.success,
output: repairResult.output,
error: repairResult.error,
});
const recheckResult = await restic.check(repository.config, { readData: false }).then(
(result) => result,
(error) => ({ success: false, output: null, error: toMessage(error), hasErrors: true }),
);
steps.push({
step: "recheck",
success: recheckResult.success,
output: recheckResult.output,
error: recheckResult.error,
});
}
} finally {
releaseLock();
}
const allSuccessful = steps.every((s) => s.success);
console.log("Doctor steps:", steps);
await db
.update(repositoriesTable)
.set({
@@ -361,7 +390,12 @@ const deleteSnapshot = async (name: string, snapshotId: string) => {
throw new NotFoundError("Repository not found");
}
await restic.deleteSnapshot(repository.config, snapshotId);
const releaseLock = await repoMutex.acquireExclusive(repository.id, `delete:${snapshotId}`);
try {
await restic.deleteSnapshot(repository.config, snapshotId);
} finally {
releaseLock();
}
};
const updateRepository = async (name: string, updates: { name?: string; compressionMode?: CompressionMode }) => {