Files
zerobyte/app/server/modules/agents/__tests__/session.test.ts
Nico 33601dde24 feat(agent): add standalone agent runtime (#776)
* feat(agent): add standalone agent runtime

* fix(agent): add Bun and DOM types to agent tsconfig

* refactor: wrap backup error in a tagged effect error

* feat(controller): add agent manager and session handling

* feat(backups): execute backups through the agent

* fix(agent): harden disconnect and send-failure handling

* fix: rebase conflicts

* test: simplify mocks

* refactor: split agent runtime state

* fix(backup): keep old path when agent is disabled

* fix: pr feedbacks
2026-04-13 23:29:10 +02:00

199 lines
4.8 KiB
TypeScript

import { Effect, Exit, Scope } from "effect";
import { expect, test, vi } from "vitest";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
import { createAgentMessage } from "@zerobyte/contracts/agent-protocol";
import { createControllerAgentSession } from "../controller/session";
const createSocket = (overrides: Partial<Parameters<typeof createControllerAgentSession>[0]> = {}) => {
return {
data: { id: "connection-1", agentId: "local", organizationId: null, agentName: "Local Agent" },
send: vi.fn(() => 1),
close: vi.fn(),
...overrides,
};
};
const createSession = (handlers: Parameters<typeof createControllerAgentSession>[1] = {}, socket = createSocket()) => {
const scope = Effect.runSync(Scope.make());
try {
const session = Effect.runSync(Scope.extend(createControllerAgentSession(fromPartial(socket), handlers), scope));
return {
session,
run: () => {
Effect.runFork(Scope.extend(session.run, scope));
},
socket,
close: () => {
Effect.runSync(Scope.close(scope, Exit.succeed(undefined)));
},
closeAsync: () => {
return Effect.runPromise(Scope.close(scope, Exit.succeed(undefined)));
},
};
} catch (error) {
Effect.runSync(Scope.close(scope, Exit.fail(error)));
throw error;
}
};
test("close emits a synthetic backup.cancelled for a started backup", () => {
const onBackupCancelled = vi.fn();
const { session, close } = createSession({
onBackupCancelled,
});
Effect.runSync(
session.handleMessage(
createAgentMessage("backup.started", {
jobId: "job-1",
scheduleId: "schedule-1",
}),
),
);
close();
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledWith(
expect.objectContaining({
jobId: "job-1",
scheduleId: "schedule-1",
}),
);
});
test.each([
{
name: "backup.completed",
jobId: "job-1",
scheduleId: "schedule-1",
terminalMessage: createAgentMessage("backup.completed", {
jobId: "job-1",
scheduleId: "schedule-1",
exitCode: 0,
result: null,
}),
expectedCancelledCalls: 0,
},
{
name: "backup.failed",
jobId: "job-2",
scheduleId: "schedule-2",
terminalMessage: createAgentMessage("backup.failed", {
jobId: "job-2",
scheduleId: "schedule-2",
error: "backup failed",
}),
expectedCancelledCalls: 0,
},
{
name: "backup.cancelled",
jobId: "job-3",
scheduleId: "schedule-3",
terminalMessage: createAgentMessage("backup.cancelled", {
jobId: "job-3",
scheduleId: "schedule-3",
message: "Backup was cancelled",
}),
expectedCancelledCalls: 1,
},
])("close does not emit an extra synthetic backup.cancelled after $name", (testCase) => {
const onBackupCancelled = vi.fn();
const { session, close } = createSession({
onBackupCancelled,
});
Effect.runSync(
session.handleMessage(
createAgentMessage("backup.started", {
jobId: testCase.jobId,
scheduleId: testCase.scheduleId,
}),
),
);
Effect.runSync(session.handleMessage(testCase.terminalMessage));
close();
expect(onBackupCancelled).toHaveBeenCalledTimes(testCase.expectedCancelledCalls);
});
test("close emits a synthetic backup.cancelled for a queued backup", () => {
const onBackupCancelled = vi.fn();
const { session, close } = createSession({
onBackupCancelled,
});
Effect.runSync(
session.sendBackup({
jobId: "job-queued",
scheduleId: "schedule-queued",
organizationId: "org-1",
sourcePath: "/tmp/source",
repositoryConfig: {
backend: "local",
path: "/tmp/repository",
},
options: {},
runtime: {
password: "password",
cacheDir: "/tmp/cache",
passFile: "/tmp/pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
}),
);
close();
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenLastCalledWith(
expect.objectContaining({
jobId: "job-queued",
scheduleId: "schedule-queued",
}),
);
});
test("a dropped backup.cancel closes the session and emits a synthetic backup.cancelled", async () => {
const send = vi.fn(() => 0);
const socket = createSocket({ send, close: vi.fn() });
const onBackupCancelled = vi.fn();
const { session, run, closeAsync } = createSession({ onBackupCancelled }, socket);
try {
run();
Effect.runSync(
session.handleMessage(
createAgentMessage("backup.started", {
jobId: "job-1",
scheduleId: "schedule-1",
}),
),
);
Effect.runSync(
session.sendBackupCancel({
jobId: "job-1",
scheduleId: "schedule-1",
}),
);
await waitForExpect(() => {
expect(send).toHaveBeenCalledTimes(1);
expect(socket.close).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledWith(
expect.objectContaining({
jobId: "job-1",
scheduleId: "schedule-1",
}),
);
});
} finally {
await closeAsync();
}
});