Compare commits

...

4 Commits

Author SHA1 Message Date
Nicolas Meienberger
7ff38f0128 refactor: run forget commands in the background 2025-12-01 20:22:43 +01:00
Nicolas Meienberger
33e6f3773b fix: only lock shared tasks if an exclusive is running 2025-12-01 20:07:30 +01:00
Nicolas Meienberger
a91dede086 docs: bump version in readme 2025-12-01 19:48:55 +01:00
Nico
9b46737852 refactor(repositories): add a locking mechanism for restic operations (#94)
* refactor(repositories): add a locking mechanism for restic operations

* fix: add missing lock in list repositories
2025-12-01 19:47:21 +01:00
15 changed files with 348 additions and 132 deletions

View File

@@ -40,7 +40,7 @@ In order to run Zerobyte, you need to have Docker and Docker Compose installed o
```yaml
services:
zerobyte:
image: ghcr.io/nicotsx/zerobyte:v0.13
image: ghcr.io/nicotsx/zerobyte:v0.15
container_name: zerobyte
restart: unless-stopped
cap_add:
@@ -78,7 +78,7 @@ If you want to track a local directory on the same server where Zerobyte is runn
```diff
services:
zerobyte:
image: ghcr.io/nicotsx/zerobyte:v0.13
image: ghcr.io/nicotsx/zerobyte:v0.15
container_name: zerobyte
restart: unless-stopped
cap_add:
@@ -146,7 +146,7 @@ Zerobyte can use [rclone](https://rclone.org/) to support 40+ cloud storage prov
```diff
services:
zerobyte:
image: ghcr.io/nicotsx/zerobyte:v0.13
image: ghcr.io/nicotsx/zerobyte:v0.15
container_name: zerobyte
restart: unless-stopped
cap_add:
@@ -205,7 +205,7 @@ In order to enable this feature, you need to change your bind mount `/var/lib/ze
```diff
services:
zerobyte:
image: ghcr.io/nicotsx/zerobyte:v0.13
image: ghcr.io/nicotsx/zerobyte:v0.15
container_name: zerobyte
restart: unless-stopped
ports:
@@ -236,7 +236,7 @@ In order to enable this feature, you need to run Zerobyte with several items sha
```diff
services:
zerobyte:
image: ghcr.io/nicotsx/zerobyte:v0.13
image: ghcr.io/nicotsx/zerobyte:v0.15
container_name: zerobyte
restart: unless-stopped
cap_add:

View File

@@ -70,8 +70,6 @@ export default function ScheduleDetailsPage({ params, loaderData }: Route.Compon
const { data: schedule } = useQuery({
...getBackupScheduleOptions({ path: { scheduleId: params.id } }),
initialData: loaderData.schedule,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
const {

View File

@@ -33,8 +33,6 @@ export default function Backups({ loaderData }: Route.ComponentProps) {
const { data: schedules, isLoading } = useQuery({
...listBackupSchedulesOptions(),
initialData: loaderData,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
if (isLoading) {

View File

@@ -49,8 +49,6 @@ export default function Notifications({ loaderData }: Route.ComponentProps) {
const { data } = useQuery({
...listNotificationDestinationsOptions(),
initialData: loaderData,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
const filteredNotifications =

View File

@@ -50,8 +50,6 @@ export default function Repositories({ loaderData }: Route.ComponentProps) {
const { data } = useQuery({
...listRepositoriesOptions(),
initialData: loaderData,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
const filteredRepositories =

View File

@@ -64,8 +64,6 @@ export default function RepositoryDetailsPage({ loaderData }: Route.ComponentPro
const { data } = useQuery({
...getRepositoryOptions({ path: { name: loaderData.name } }),
initialData: loaderData,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
useEffect(() => {

View File

@@ -18,8 +18,6 @@ export const RepositorySnapshotsTabContent = ({ repository }: Props) => {
const { data, isFetching, failureReason } = useQuery({
...listSnapshotsOptions({ path: { name: repository.name } }),
refetchInterval: 10000,
refetchOnWindowFocus: true,
initialData: [],
});

View File

@@ -71,8 +71,6 @@ export default function VolumeDetails({ loaderData }: Route.ComponentProps) {
const { data } = useQuery({
...getVolumeOptions({ path: { name: name ?? "" } }),
initialData: loaderData,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
const { capabilities } = useSystemInfo();

View File

@@ -61,8 +61,6 @@ export default function Volumes({ loaderData }: Route.ComponentProps) {
const { data } = useQuery({
...listVolumesOptions(),
initialData: loaderData,
refetchInterval: 10000,
refetchOnWindowFocus: true,
});
const filteredVolumes =

View File

@@ -0,0 +1,180 @@
import { logger } from "../utils/logger";

export type LockType = "shared" | "exclusive";

interface LockHolder {
  id: string;
  operation: string;
  acquiredAt: number;
}

interface RepositoryLockState {
  sharedHolders: Map<string, LockHolder>;
  exclusiveHolder: LockHolder | null;
  waitQueue: Array<{
    type: LockType;
    operation: string;
    resolve: (lockId: string) => void;
  }>;
}

class RepositoryMutex {
  private locks = new Map<string, RepositoryLockState>();
  private lockIdCounter = 0;

  private getOrCreateState(repositoryId: string): RepositoryLockState {
    let state = this.locks.get(repositoryId);
    if (!state) {
      state = {
        sharedHolders: new Map(),
        exclusiveHolder: null,
        waitQueue: [],
      };
      this.locks.set(repositoryId, state);
    }
    return state;
  }

  private generateLockId(): string {
    return `lock_${++this.lockIdCounter}_${Date.now()}`;
  }

  private cleanupStateIfEmpty(repositoryId: string): void {
    const state = this.locks.get(repositoryId);
    if (state && state.sharedHolders.size === 0 && !state.exclusiveHolder && state.waitQueue.length === 0) {
      this.locks.delete(repositoryId);
    }
  }

  async acquireShared(repositoryId: string, operation: string): Promise<() => void> {
    const state = this.getOrCreateState(repositoryId);

    if (!state.exclusiveHolder) {
      const lockId = this.generateLockId();
      state.sharedHolders.set(lockId, {
        id: lockId,
        operation,
        acquiredAt: Date.now(),
      });
      return () => this.releaseShared(repositoryId, lockId);
    }

    logger.debug(
      `[Mutex] Waiting for shared lock on repo ${repositoryId}: ${operation} (exclusive held by: ${state.exclusiveHolder.operation})`,
    );

    const lockId = await new Promise<string>((resolve) => {
      state.waitQueue.push({ type: "shared", operation, resolve });
    });

    return () => this.releaseShared(repositoryId, lockId);
  }

  async acquireExclusive(repositoryId: string, operation: string): Promise<() => void> {
    const state = this.getOrCreateState(repositoryId);

    if (!state.exclusiveHolder && state.sharedHolders.size === 0 && state.waitQueue.length === 0) {
      const lockId = this.generateLockId();
      state.exclusiveHolder = {
        id: lockId,
        operation,
        acquiredAt: Date.now(),
      };
      return () => this.releaseExclusive(repositoryId, lockId);
    }

    logger.debug(
      `[Mutex] Waiting for exclusive lock on repo ${repositoryId}: ${operation} (shared: ${state.sharedHolders.size}, exclusive: ${state.exclusiveHolder ? "yes" : "no"}, queue: ${state.waitQueue.length})`,
    );

    const lockId = await new Promise<string>((resolve) => {
      state.waitQueue.push({ type: "exclusive", operation, resolve });
    });

    logger.debug(`[Mutex] Acquired exclusive lock for repo ${repositoryId}: ${operation} (${lockId})`);

    return () => this.releaseExclusive(repositoryId, lockId);
  }

  private releaseShared(repositoryId: string, lockId: string): void {
    const state = this.locks.get(repositoryId);
    if (!state) {
      return;
    }

    const holder = state.sharedHolders.get(lockId);
    if (!holder) {
      return;
    }

    state.sharedHolders.delete(lockId);

    const duration = Date.now() - holder.acquiredAt;
    logger.debug(`[Mutex] Released shared lock for repo ${repositoryId}: ${holder.operation} (held for ${duration}ms)`);

    this.processWaitQueue(repositoryId);
    this.cleanupStateIfEmpty(repositoryId);
  }

  private releaseExclusive(repositoryId: string, lockId: string): void {
    const state = this.locks.get(repositoryId);
    if (!state) {
      return;
    }

    if (!state.exclusiveHolder || state.exclusiveHolder.id !== lockId) {
      return;
    }

    const duration = Date.now() - state.exclusiveHolder.acquiredAt;
    logger.debug(
      `[Mutex] Released exclusive lock for repo ${repositoryId}: ${state.exclusiveHolder.operation} (held for ${duration}ms)`,
    );

    state.exclusiveHolder = null;

    this.processWaitQueue(repositoryId);
    this.cleanupStateIfEmpty(repositoryId);
  }

  private processWaitQueue(repositoryId: string): void {
    const state = this.locks.get(repositoryId);
    if (!state || state.waitQueue.length === 0) {
      return;
    }

    if (state.exclusiveHolder) {
      return;
    }

    const firstWaiter = state.waitQueue[0];

    if (firstWaiter.type === "exclusive") {
      if (state.sharedHolders.size === 0) {
        state.waitQueue.shift();
        const lockId = this.generateLockId();
        state.exclusiveHolder = {
          id: lockId,
          operation: firstWaiter.operation,
          acquiredAt: Date.now(),
        };
        firstWaiter.resolve(lockId);
      }
    } else {
      while (state.waitQueue.length > 0 && state.waitQueue[0].type === "shared") {
        const waiter = state.waitQueue.shift();
        if (!waiter) break;
        const lockId = this.generateLockId();
        state.sharedHolders.set(lockId, {
          id: lockId,
          operation: waiter.operation,
          acquiredAt: Date.now(),
        });
        waiter.resolve(lockId);
      }
    }
  }

  isLocked(repositoryId: string): boolean {
    const state = this.locks.get(repositoryId);
    if (!state) return false;
    return state.exclusiveHolder !== null || state.sharedHolders.size > 0;
  }
}

export const repoMutex = new RepositoryMutex();
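
As a quick illustration of how the services in this PR consume the mutex (not part of the diff; the repository id and operation labels are made up): callers receive a release function and invoke it in a `finally` block so the lock is dropped even when the restic call throws.

```typescript
// Illustrative usage only; the import path depends on the caller's location.
import { repoMutex } from "../core/repository-mutex";

async function listSnapshotsExample(repositoryId: string) {
  // Shared locks coexist with each other and only wait while an exclusive lock is held.
  const release = await repoMutex.acquireShared(repositoryId, "snapshots");
  try {
    // run a read-only restic command here
  } finally {
    release(); // always release, even if the restic call throws
  }
}

async function forgetExample(repositoryId: string, scheduleId: number) {
  // Exclusive locks wait until all shared holders drain and no one is queued ahead.
  const release = await repoMutex.acquireExclusive(repositoryId, `forget:${scheduleId}`);
  try {
    // run restic forget --prune here
  } finally {
    release();
  }
}
```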

View File

@@ -4,6 +4,7 @@ import { logger } from "../utils/logger";
import { db } from "../db/db";
import { eq, or } from "drizzle-orm";
import { repositoriesTable } from "../db/schema";
import { repoMutex } from "../core/repository-mutex";
export class RepositoryHealthCheckJob extends Job {
async run() {
@@ -14,6 +15,11 @@ export class RepositoryHealthCheckJob extends Job {
});
for (const repository of repositories) {
if (repoMutex.isLocked(repository.id)) {
logger.debug(`Skipping health check for repository ${repository.name}: currently locked`);
continue;
}
try {
await repositoriesService.checkHealth(repository.id);
} catch (error) {

View File

@@ -11,6 +11,7 @@ import type { CreateBackupScheduleBody, UpdateBackupScheduleBody } from "./backu
import { toMessage } from "../../utils/errors";
import { serverEvents } from "../../core/events";
import { notificationsService } from "../notifications/notifications.service";
import { repoMutex } from "../../core/repository-mutex";
const runningBackups = new Map<number, AbortController>();
@@ -241,21 +242,28 @@ const executeBackup = async (scheduleId: number, manual = false) => {
backupOptions.include = schedule.includePatterns;
}
const { exitCode } = await restic.backup(repository.config, volumePath, {
...backupOptions,
compressionMode: repository.compressionMode ?? "auto",
onProgress: (progress) => {
serverEvents.emit("backup:progress", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
...progress,
});
},
});
const releaseBackupLock = await repoMutex.acquireShared(repository.id, `backup:${volume.name}`);
let exitCode: number;
try {
const result = await restic.backup(repository.config, volumePath, {
...backupOptions,
compressionMode: repository.compressionMode ?? "auto",
onProgress: (progress) => {
serverEvents.emit("backup:progress", {
scheduleId,
volumeName: volume.name,
repositoryName: repository.name,
...progress,
});
},
});
exitCode = result.exitCode;
} finally {
releaseBackupLock();
}
if (schedule.retentionPolicy) {
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
void runForget(schedule.id);
}
const nextBackupAt = calculateNextRun(schedule.cronExpression);
@@ -402,8 +410,14 @@ const runForget = async (scheduleId: number) => {
throw new NotFoundError("Repository not found");
}
logger.info(`Manually running retention policy (forget) for schedule ${scheduleId}`);
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
logger.info(`running retention policy (forget) for schedule ${scheduleId}`);
const releaseLock = await repoMutex.acquireExclusive(repository.id, `forget:manual:${scheduleId}`);
try {
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.id.toString() });
} finally {
releaseLock();
}
logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
};
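
Because the retention pass now runs in the background and takes the exclusive lock, it cannot start while any backup still holds its shared lock on the same repository. A small standalone sketch of that ordering (the ids and labels are invented; the import path mirrors the one used in this file):

```typescript
import { repoMutex } from "../../core/repository-mutex";

async function orderingDemo() {
  // A backup holds a shared lock on the repository.
  const releaseBackup = await repoMutex.acquireShared("repo-1", "backup:volume-a");

  // Requesting the exclusive lock queues a waiter; the promise resolves
  // only once every shared holder has released.
  const pendingForgetLock = repoMutex.acquireExclusive("repo-1", "forget:42");

  releaseBackup(); // backup finishes, shared lock released

  const releaseForget = await pendingForgetLock; // now granted
  try {
    // restic forget --prune would run here
  } finally {
    releaseForget();
  }
}
```

Note that `isLocked` (used by the health-check job above) is only a non-blocking probe over the same state; it never queues.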

View File

@@ -34,7 +34,7 @@ export const startup = async () => {
Scheduler.build(CleanupDanglingMountsJob).schedule("0 * * * *");
Scheduler.build(VolumeHealthCheckJob).schedule("*/30 * * * *");
Scheduler.build(RepositoryHealthCheckJob).schedule("0 12 * * *");
Scheduler.build(RepositoryHealthCheckJob).schedule("50 12 * * *");
Scheduler.build(BackupExecutionJob).schedule("* * * * *");
Scheduler.build(CleanupSessionsJob).schedule("0 0 * * *");
};

View File

@@ -8,6 +8,7 @@ import { toMessage } from "../../utils/errors";
import { generateShortId } from "../../utils/id";
import { restic } from "../../utils/restic";
import { cryptoUtils } from "../../utils/crypto";
import { repoMutex } from "../../core/repository-mutex";
import type { CompressionMode, OverwriteMode, RepositoryConfig } from "~/schemas/restic";
const listRepositories = async () => {
@@ -160,15 +161,20 @@ const listSnapshots = async (name: string, backupId?: string) => {
throw new NotFoundError("Repository not found");
}
let snapshots = [];
const releaseLock = await repoMutex.acquireShared(repository.id, "snapshots");
try {
let snapshots = [];
if (backupId) {
snapshots = await restic.snapshots(repository.config, { tags: [backupId.toString()] });
} else {
snapshots = await restic.snapshots(repository.config);
if (backupId) {
snapshots = await restic.snapshots(repository.config, { tags: [backupId.toString()] });
} else {
snapshots = await restic.snapshots(repository.config);
}
return snapshots;
} finally {
releaseLock();
}
return snapshots;
};
const listSnapshotFiles = async (name: string, snapshotId: string, path?: string) => {
@@ -180,22 +186,27 @@ const listSnapshotFiles = async (name: string, snapshotId: string, path?: string
throw new NotFoundError("Repository not found");
}
const result = await restic.ls(repository.config, snapshotId, path);
const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`);
try {
const result = await restic.ls(repository.config, snapshotId, path);
if (!result.snapshot) {
throw new NotFoundError("Snapshot not found or empty");
if (!result.snapshot) {
throw new NotFoundError("Snapshot not found or empty");
}
return {
snapshot: {
id: result.snapshot.id,
short_id: result.snapshot.short_id,
time: result.snapshot.time,
hostname: result.snapshot.hostname,
paths: result.snapshot.paths,
},
files: result.nodes,
};
} finally {
releaseLock();
}
return {
snapshot: {
id: result.snapshot.id,
short_id: result.snapshot.short_id,
time: result.snapshot.time,
hostname: result.snapshot.hostname,
paths: result.snapshot.paths,
},
files: result.nodes,
};
};
const restoreSnapshot = async (
@@ -220,14 +231,19 @@ const restoreSnapshot = async (
const target = options?.targetPath || "/";
const result = await restic.restore(repository.config, snapshotId, target, options);
const releaseLock = await repoMutex.acquireShared(repository.id, `restore:${snapshotId}`);
try {
const result = await restic.restore(repository.config, snapshotId, target, options);
return {
success: true,
message: "Snapshot restored successfully",
filesRestored: result.files_restored,
filesSkipped: result.files_skipped,
};
return {
success: true,
message: "Snapshot restored successfully",
filesRestored: result.files_restored,
filesSkipped: result.files_skipped,
};
} finally {
releaseLock();
}
};
const getSnapshotDetails = async (name: string, snapshotId: string) => {
@@ -239,14 +255,19 @@ const getSnapshotDetails = async (name: string, snapshotId: string) => {
throw new NotFoundError("Repository not found");
}
const snapshots = await restic.snapshots(repository.config);
const snapshot = snapshots.find((snap) => snap.id === snapshotId || snap.short_id === snapshotId);
const releaseLock = await repoMutex.acquireShared(repository.id, `snapshot_details:${snapshotId}`);
try {
const snapshots = await restic.snapshots(repository.config);
const snapshot = snapshots.find((snap) => snap.id === snapshotId || snap.short_id === snapshotId);
if (!snapshot) {
throw new NotFoundError("Snapshot not found");
if (!snapshot) {
throw new NotFoundError("Snapshot not found");
}
return snapshot;
} finally {
releaseLock();
}
return snapshot;
};
const checkHealth = async (repositoryId: string) => {
@@ -258,18 +279,23 @@ const checkHealth = async (repositoryId: string) => {
throw new NotFoundError("Repository not found");
}
const { hasErrors, error } = await restic.check(repository.config);
const releaseLock = await repoMutex.acquireExclusive(repository.id, "check");
try {
const { hasErrors, error } = await restic.check(repository.config);
await db
.update(repositoriesTable)
.set({
status: hasErrors ? "error" : "healthy",
lastChecked: Date.now(),
lastError: error,
})
.where(eq(repositoriesTable.id, repository.id));
await db
.update(repositoriesTable)
.set({
status: hasErrors ? "error" : "healthy",
lastChecked: Date.now(),
lastError: error,
})
.where(eq(repositoriesTable.id, repository.id));
return { lastError: error };
return { lastError: error };
} finally {
releaseLock();
}
};
const doctorRepository = async (name: string) => {
@@ -295,48 +321,51 @@ const doctorRepository = async (name: string) => {
error: unlockResult.error,
});
const checkResult = await restic.check(repository.config, { readData: false }).then(
(result) => result,
(error) => ({ success: false, output: null, error: toMessage(error), hasErrors: true }),
);
steps.push({
step: "check",
success: checkResult.success,
output: checkResult.output,
error: checkResult.error,
});
if (checkResult.hasErrors) {
const repairResult = await restic.repairIndex(repository.config).then(
(result) => ({ success: true, output: result.output, error: null }),
(error) => ({ success: false, output: null, error: toMessage(error) }),
);
steps.push({
step: "repair_index",
success: repairResult.success,
output: repairResult.output,
error: repairResult.error,
});
const recheckResult = await restic.check(repository.config, { readData: false }).then(
const releaseLock = await repoMutex.acquireExclusive(repository.id, "doctor");
try {
const checkResult = await restic.check(repository.config, { readData: false }).then(
(result) => result,
(error) => ({ success: false, output: null, error: toMessage(error), hasErrors: true }),
);
steps.push({
step: "recheck",
success: recheckResult.success,
output: recheckResult.output,
error: recheckResult.error,
step: "check",
success: checkResult.success,
output: checkResult.output,
error: checkResult.error,
});
if (checkResult.hasErrors) {
const repairResult = await restic.repairIndex(repository.config).then(
(result) => ({ success: true, output: result.output, error: null }),
(error) => ({ success: false, output: null, error: toMessage(error) }),
);
steps.push({
step: "repair_index",
success: repairResult.success,
output: repairResult.output,
error: repairResult.error,
});
const recheckResult = await restic.check(repository.config, { readData: false }).then(
(result) => result,
(error) => ({ success: false, output: null, error: toMessage(error), hasErrors: true }),
);
steps.push({
step: "recheck",
success: recheckResult.success,
output: recheckResult.output,
error: recheckResult.error,
});
}
} finally {
releaseLock();
}
const allSuccessful = steps.every((s) => s.success);
console.log("Doctor steps:", steps);
await db
.update(repositoriesTable)
.set({
@@ -361,7 +390,12 @@ const deleteSnapshot = async (name: string, snapshotId: string) => {
throw new NotFoundError("Repository not found");
}
await restic.deleteSnapshot(repository.config, snapshotId);
const releaseLock = await repoMutex.acquireExclusive(repository.id, `delete:${snapshotId}`);
try {
await restic.deleteSnapshot(repository.config, snapshotId);
} finally {
releaseLock();
}
};
const updateRepository = async (name: string, updates: { name?: string; compressionMode?: CompressionMode }) => {

View File

@@ -200,8 +200,8 @@ const init = async (config: RepositoryConfig) => {
const env = await buildEnv(config);
const args = ["init", "--repo", repoUrl, "--json"];
addRepoSpecificArgs(args, config, env);
const args = ["init", "--repo", repoUrl];
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow();
await cleanupTemporaryKeys(config, env);
@@ -277,8 +277,7 @@ const backup = async (
}
}
addRepoSpecificArgs(args, config, env);
args.push("--json");
addCommonArgs(args, config, env);
const logData = throttle((data: string) => {
logger.info(data.trim());
@@ -404,8 +403,7 @@ const restore = async (
}
}
addRepoSpecificArgs(args, config, env);
args.push("--json");
addCommonArgs(args, config, env);
logger.debug(`Executing: restic ${args.join(" ")}`);
const res = await $`restic ${args}`.env(env).nothrow();
@@ -468,8 +466,7 @@ const snapshots = async (config: RepositoryConfig, options: { tags?: string[] }
}
}
addRepoSpecificArgs(args, config, env);
args.push("--json");
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow().quiet();
await cleanupTemporaryKeys(config, env);
@@ -518,8 +515,7 @@ const forget = async (config: RepositoryConfig, options: RetentionPolicy, extra:
}
args.push("--prune");
addRepoSpecificArgs(args, config, env);
args.push("--json");
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow();
await cleanupTemporaryKeys(config, env);
@@ -537,7 +533,7 @@ const deleteSnapshot = async (config: RepositoryConfig, snapshotId: string) => {
const env = await buildEnv(config);
const args: string[] = ["--repo", repoUrl, "forget", snapshotId, "--prune"];
addRepoSpecificArgs(args, config, env);
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow();
await cleanupTemporaryKeys(config, env);
@@ -581,13 +577,13 @@ const ls = async (config: RepositoryConfig, snapshotId: string, path?: string) =
const repoUrl = buildRepoUrl(config);
const env = await buildEnv(config);
const args: string[] = ["--repo", repoUrl, "ls", snapshotId, "--json", "--long"];
const args: string[] = ["--repo", repoUrl, "ls", snapshotId, "--long"];
if (path) {
args.push(path);
}
addRepoSpecificArgs(args, config, env);
addCommonArgs(args, config, env);
const res = await safeSpawn({ command: "restic", args, env });
await cleanupTemporaryKeys(config, env);
@@ -637,8 +633,8 @@ const unlock = async (config: RepositoryConfig) => {
const repoUrl = buildRepoUrl(config);
const env = await buildEnv(config);
const args = ["unlock", "--repo", repoUrl, "--remove-all", "--json"];
addRepoSpecificArgs(args, config, env);
const args = ["unlock", "--repo", repoUrl, "--remove-all"];
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow();
await cleanupTemporaryKeys(config, env);
@@ -662,7 +658,7 @@ const check = async (config: RepositoryConfig, options?: { readData?: boolean })
args.push("--read-data");
}
addRepoSpecificArgs(args, config, env);
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow();
await cleanupTemporaryKeys(config, env);
@@ -696,7 +692,7 @@ const repairIndex = async (config: RepositoryConfig) => {
const env = await buildEnv(config);
const args = ["repair", "index", "--repo", repoUrl];
addRepoSpecificArgs(args, config, env);
addCommonArgs(args, config, env);
const res = await $`restic ${args}`.env(env).nothrow();
await cleanupTemporaryKeys(config, env);
@@ -717,7 +713,9 @@ const repairIndex = async (config: RepositoryConfig) => {
};
};
const addRepoSpecificArgs = (args: string[], config: RepositoryConfig, env: Record<string, string>) => {
const addCommonArgs = (args: string[], config: RepositoryConfig, env: Record<string, string>) => {
args.push("--retry-lock", "1m", "--json");
if (config.backend === "sftp" && env._SFTP_SSH_ARGS) {
args.push("-o", `sftp.args=${env._SFTP_SSH_ARGS}`);
}
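
To make the effect of the rename concrete, here is a hypothetical caller showing the flags `addCommonArgs` appends (the repository path, backend literal, and env are placeholders, not values from the codebase):

```typescript
// Hypothetical example: what addCommonArgs adds to any restic invocation.
const env: Record<string, string> = {}; // buildEnv() would normally populate this
const args = ["check", "--repo", "/mnt/backups/example-repo"];

addCommonArgs(args, { backend: "local" } as RepositoryConfig, env);

// args is now:
//   ["check", "--repo", "/mnt/backups/example-repo", "--retry-lock", "1m", "--json"]
// i.e. the command: restic check --repo /mnt/backups/example-repo --retry-lock 1m --json
// For sftp backends with _SFTP_SSH_ARGS set, "-o sftp.args=..." is appended as well.
```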