Mirror of https://github.com/nicotsx/ironmount.git, synced 2025-12-10 12:10:51 +01:00
feat: mirror repositories (#95)
* feat: mirror repositories
* feat: mirror backup repositories
* chore: address PR feedback
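Note: the diff below adds three mirror-related methods to backupsService. As a rough, non-authoritative sketch of how a caller might use them (the import path and the literal repository id are assumptions for illustration, not taken from this commit):

import { backupsService } from "./backups.service"; // path assumed for illustration

const configureMirrors = async (scheduleId: number) => {
  // Replace the schedule's mirror list. The primary repository is rejected,
  // unknown repositories return 404, and incompatible backends are refused.
  await backupsService.updateMirrors(scheduleId, {
    mirrors: [{ repositoryId: "repo-offsite", enabled: true }], // hypothetical repository id
  });

  // List the configured mirrors, each joined with its repository row.
  const mirrors = await backupsService.getMirrors(scheduleId);

  // Check which other repositories could serve as mirrors for this schedule.
  const compatibility = await backupsService.getMirrorCompatibility(scheduleId);

  return { mirrors, compatibility };
};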
@@ -3,15 +3,16 @@ import cron from "node-cron";
 import { CronExpressionParser } from "cron-parser";
 import { NotFoundError, BadRequestError, ConflictError } from "http-errors-enhanced";
 import { db } from "../../db/db";
-import { backupSchedulesTable, repositoriesTable, volumesTable } from "../../db/schema";
+import { backupSchedulesTable, backupScheduleMirrorsTable, repositoriesTable, volumesTable } from "../../db/schema";
 import { restic } from "../../utils/restic";
 import { logger } from "../../utils/logger";
 import { getVolumePath } from "../volumes/helpers";
-import type { CreateBackupScheduleBody, UpdateBackupScheduleBody } from "./backups.dto";
+import type { CreateBackupScheduleBody, UpdateBackupScheduleBody, UpdateScheduleMirrorsBody } from "./backups.dto";
 import { toMessage } from "../../utils/errors";
 import { serverEvents } from "../../core/events";
 import { notificationsService } from "../notifications/notifications.service";
 import { repoMutex } from "../../core/repository-mutex";
+import { checkMirrorCompatibility, getIncompatibleMirrorError } from "~/server/utils/backend-compatibility";

 const runningBackups = new Map<number, AbortController>();

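The UpdateScheduleMirrorsBody type imported above is defined in backups.dto, which is not part of this diff. Judging only from how updateMirrors consumes data.mirrors further down, it plausibly has a shape like the following sketch (field names inferred, not confirmed):

// Hypothetical shape only; the real DTO in backups.dto may be a zod schema or carry more fields.
type UpdateScheduleMirrorsBody = {
  mirrors: {
    repositoryId: string; // repository ids are strings elsewhere in this diff
    enabled: boolean;     // a mirror can be configured but temporarily disabled
  }[];
};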
@@ -266,19 +267,25 @@ const executeBackup = async (scheduleId: number, manual = false) => {
     void runForget(schedule.id);
   }

+  copyToMirrors(scheduleId, repository, schedule.retentionPolicy).catch((error) => {
+    logger.error(`Background mirror copy failed for schedule ${scheduleId}: ${toMessage(error)}`);
+  });
+
+  const finalStatus = exitCode === 0 ? "success" : "warning";
+
   const nextBackupAt = calculateNextRun(schedule.cronExpression);
   await db
     .update(backupSchedulesTable)
     .set({
       lastBackupAt: Date.now(),
-      lastBackupStatus: exitCode === 0 ? "success" : "warning",
+      lastBackupStatus: finalStatus,
       lastBackupError: null,
       nextBackupAt: nextBackupAt,
       updatedAt: Date.now(),
     })
     .where(eq(backupSchedulesTable.id, scheduleId));

-  if (exitCode !== 0) {
+  if (finalStatus === "warning") {
     logger.warn(`Backup completed with warnings for volume ${volume.name} to repository ${repository.name}`);
   } else {
     logger.info(`Backup completed successfully for volume ${volume.name} to repository ${repository.name}`);
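Worth noting in the hunk above: copyToMirrors is intentionally not awaited, so the backup's own status and nextBackupAt are recorded regardless of how the mirror copies go; a rejected background copy only produces a log line. A generic sketch of that fire-and-forget pattern (names here are illustrative, not from the codebase):

// Run a background task without letting its failure affect the caller.
const runInBackground = (task: Promise<unknown>, label: string) => {
  task.catch((error) => {
    // Log and swallow: the surrounding request/backup has already completed.
    console.error(`${label} failed: ${error instanceof Error ? error.message : String(error)}`);
  });
};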
@@ -288,11 +295,11 @@ const executeBackup = async (scheduleId: number, manual = false) => {
     scheduleId,
     volumeName: volume.name,
     repositoryName: repository.name,
-    status: exitCode === 0 ? "success" : "warning",
+    status: finalStatus,
   });

   notificationsService
-    .sendBackupNotification(scheduleId, exitCode === 0 ? "success" : "warning", {
+    .sendBackupNotification(scheduleId, finalStatus === "success" ? "success" : "warning", {
       volumeName: volume.name,
       repositoryName: repository.name,
     })
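The next hunk reads and writes backupScheduleMirrorsTable, whose schema is not included in this commit view. Based on the columns touched below (scheduleId, repositoryId, enabled, lastCopyAt, lastCopyStatus, lastCopyError), a rough Drizzle sketch could look like this; the dialect, SQL names, and column types are assumptions:

import { sqliteTable, integer, text } from "drizzle-orm/sqlite-core";

// Sketch only; the real table lives in ../../db/schema and may differ.
export const backupScheduleMirrorsTable = sqliteTable("backup_schedule_mirrors", {
  id: integer("id").primaryKey({ autoIncrement: true }),
  scheduleId: integer("schedule_id").notNull(),
  repositoryId: text("repository_id").notNull(),
  enabled: integer("enabled", { mode: "boolean" }).notNull().default(true),
  lastCopyAt: integer("last_copy_at"),      // Date.now() epoch milliseconds
  lastCopyStatus: text("last_copy_status"), // "success" | "error"
  lastCopyError: text("last_copy_error"),
});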
@@ -421,6 +428,174 @@ const runForget = async (scheduleId: number) => {
   logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
 };

+const getMirrors = async (scheduleId: number) => {
+  const schedule = await db.query.backupSchedulesTable.findFirst({
+    where: eq(backupSchedulesTable.id, scheduleId),
+  });
+
+  if (!schedule) {
+    throw new NotFoundError("Backup schedule not found");
+  }
+
+  const mirrors = await db.query.backupScheduleMirrorsTable.findMany({
+    where: eq(backupScheduleMirrorsTable.scheduleId, scheduleId),
+    with: { repository: true },
+  });
+
+  return mirrors;
+};
+
+const updateMirrors = async (scheduleId: number, data: UpdateScheduleMirrorsBody) => {
+  const schedule = await db.query.backupSchedulesTable.findFirst({
+    where: eq(backupSchedulesTable.id, scheduleId),
+    with: { repository: true },
+  });
+
+  if (!schedule) {
+    throw new NotFoundError("Backup schedule not found");
+  }
+
+  for (const mirror of data.mirrors) {
+    if (mirror.repositoryId === schedule.repositoryId) {
+      throw new BadRequestError("Cannot add the primary repository as a mirror");
+    }
+
+    const repo = await db.query.repositoriesTable.findFirst({
+      where: eq(repositoriesTable.id, mirror.repositoryId),
+    });
+
+    if (!repo) {
+      throw new NotFoundError(`Repository ${mirror.repositoryId} not found`);
+    }
+
+    const compatibility = await checkMirrorCompatibility(schedule.repository.config, repo.config, repo.id);
+
+    if (!compatibility.compatible) {
+      throw new BadRequestError(
+        getIncompatibleMirrorError(repo.name, schedule.repository.config.backend, repo.config.backend),
+      );
+    }
+  }
+
+  await db.delete(backupScheduleMirrorsTable).where(eq(backupScheduleMirrorsTable.scheduleId, scheduleId));
+
+  if (data.mirrors.length > 0) {
+    await db.insert(backupScheduleMirrorsTable).values(
+      data.mirrors.map((mirror) => ({
+        scheduleId,
+        repositoryId: mirror.repositoryId,
+        enabled: mirror.enabled,
+      })),
+    );
+  }
+
+  return getMirrors(scheduleId);
+};
+
+const copyToMirrors = async (
+  scheduleId: number,
+  sourceRepository: { id: string; config: (typeof repositoriesTable.$inferSelect)["config"] },
+  retentionPolicy: (typeof backupSchedulesTable.$inferSelect)["retentionPolicy"],
+) => {
+  const mirrors = await db.query.backupScheduleMirrorsTable.findMany({
+    where: eq(backupScheduleMirrorsTable.scheduleId, scheduleId),
+    with: { repository: true },
+  });
+
+  const enabledMirrors = mirrors.filter((m) => m.enabled);
+
+  if (enabledMirrors.length === 0) {
+    return;
+  }
+
+  logger.info(
+    `[Background] Copying snapshots to ${enabledMirrors.length} mirror repositories for schedule ${scheduleId}`,
+  );
+
+  for (const mirror of enabledMirrors) {
+    try {
+      logger.info(`[Background] Copying to mirror repository: ${mirror.repository.name}`);
+
+      serverEvents.emit("mirror:started", {
+        scheduleId,
+        repositoryId: mirror.repositoryId,
+        repositoryName: mirror.repository.name,
+      });
+
+      const releaseSource = await repoMutex.acquireShared(sourceRepository.id, `mirror_source:${scheduleId}`);
+      const releaseMirror = await repoMutex.acquireShared(mirror.repository.id, `mirror:${scheduleId}`);
+
+      try {
+        await restic.copy(sourceRepository.config, mirror.repository.config, { tag: scheduleId.toString() });
+      } finally {
+        releaseSource();
+        releaseMirror();
+      }
+
+      if (retentionPolicy) {
+        const releaseForget = await repoMutex.acquireExclusive(mirror.repository.id, `forget:mirror:${scheduleId}`);
+
+        try {
+          logger.info(`[Background] Applying retention policy to mirror repository: ${mirror.repository.name}`);
+          await restic.forget(mirror.repository.config, retentionPolicy, { tag: scheduleId.toString() });
+        } finally {
+          releaseForget();
+        }
+      }
+
+      await db
+        .update(backupScheduleMirrorsTable)
+        .set({ lastCopyAt: Date.now(), lastCopyStatus: "success", lastCopyError: null })
+        .where(eq(backupScheduleMirrorsTable.id, mirror.id));
+
+      logger.info(`[Background] Successfully copied to mirror repository: ${mirror.repository.name}`);
+
+      serverEvents.emit("mirror:completed", {
+        scheduleId,
+        repositoryId: mirror.repositoryId,
+        repositoryName: mirror.repository.name,
+        status: "success",
+      });
+    } catch (error) {
+      const errorMessage = toMessage(error);
+      logger.error(`[Background] Failed to copy to mirror repository ${mirror.repository.name}: ${errorMessage}`);
+
+      await db
+        .update(backupScheduleMirrorsTable)
+        .set({ lastCopyAt: Date.now(), lastCopyStatus: "error", lastCopyError: errorMessage })
+        .where(eq(backupScheduleMirrorsTable.id, mirror.id));
+
+      serverEvents.emit("mirror:completed", {
+        scheduleId,
+        repositoryId: mirror.repositoryId,
+        repositoryName: mirror.repository.name,
+        status: "error",
+        error: errorMessage,
+      });
+    }
+  }
+};
+
+const getMirrorCompatibility = async (scheduleId: number) => {
+  const schedule = await db.query.backupSchedulesTable.findFirst({
+    where: eq(backupSchedulesTable.id, scheduleId),
+    with: { repository: true },
+  });
+
+  if (!schedule) {
+    throw new NotFoundError("Backup schedule not found");
+  }
+
+  const allRepositories = await db.query.repositoriesTable.findMany();
+  const repos = allRepositories.filter((repo) => repo.id !== schedule.repositoryId);
+
+  const compatibility = await Promise.all(
+    repos.map((repo) => checkMirrorCompatibility(schedule.repository.config, repo.config, repo.id)),
+  );
+
+  return compatibility;
+};
+
 export const backupsService = {
   listSchedules,
   getSchedule,
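A note on the locking in copyToMirrors above: the restic copy holds shared locks on both the source and the mirror (presumably because copying only reads the source and adds data to the mirror, so it can coexist with other readers), while the follow-up forget takes an exclusive lock on the mirror because it prunes that repository's snapshots. The core/repository-mutex module is not part of this diff; from the call sites, its surface is roughly the following sketch, which is an assumption rather than the actual implementation:

// Inferred from the repoMutex.acquireShared(...) / acquireExclusive(...) calls above.
// Both resolve to a release callback that the caller invokes in a finally block.
interface RepoMutex {
  acquireShared(repositoryId: string, label: string): Promise<() => void>;
  acquireExclusive(repositoryId: string, label: string): Promise<() => void>;
}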
@@ -432,4 +607,7 @@ export const backupsService = {
   getScheduleForVolume,
   stopBackup,
   runForget,
+  getMirrors,
+  updateMirrors,
+  getMirrorCompatibility,
 };
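Finally, copyToMirrors reports progress through serverEvents ("mirror:started" / "mirror:completed") rather than through the backup status itself. Assuming serverEvents is an EventEmitter-style bus (its definition in core/events is not in this diff), a consumer could subscribe roughly as in this sketch; the payload fields match the emit calls above, but the .on signature is an assumption:

import { serverEvents } from "../../core/events"; // path as imported in the diff above

// Sketch of a consumer; serverEvents.on(...) is assumed to exist alongside .emit(...).
serverEvents.on("mirror:started", (payload: { scheduleId: number; repositoryId: string; repositoryName: string }) => {
  console.log(`Mirror copy started for schedule ${payload.scheduleId} -> ${payload.repositoryName}`);
});

serverEvents.on(
  "mirror:completed",
  (payload: {
    scheduleId: number;
    repositoryId: string;
    repositoryName: string;
    status: "success" | "error";
    error?: string;
  }) => {
    console.log(`Mirror copy ${payload.status} for ${payload.repositoryName}${payload.error ? `: ${payload.error}` : ""}`);
  },
);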