mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-04-19 22:37:14 -04:00
* feat(agent): add standalone agent runtime * fix(agent): add Bun and DOM types to agent tsconfig * refactor: wrap backup error in a tagged effect error * feat(controller): add agent manager and session handling * feat(backups): execute backups through the agent * fix(agent): harden disconnect and send-failure handling * fix: rebase conflicts * test: simplify mocks * refactor: split agent runtime state * fix(backup): keep old path when agent is disabled * fix: pr feedbacks
199 lines
4.8 KiB
TypeScript
199 lines
4.8 KiB
TypeScript
import { Effect, Exit, Scope } from "effect";
|
|
import { expect, test, vi } from "vitest";
|
|
import waitForExpect from "wait-for-expect";
|
|
import { fromPartial } from "@total-typescript/shoehorn";
|
|
import { createAgentMessage } from "@zerobyte/contracts/agent-protocol";
|
|
import { createControllerAgentSession } from "../controller/session";
|
|
|
|
const createSocket = (overrides: Partial<Parameters<typeof createControllerAgentSession>[0]> = {}) => {
|
|
return {
|
|
data: { id: "connection-1", agentId: "local", organizationId: null, agentName: "Local Agent" },
|
|
send: vi.fn(() => 1),
|
|
close: vi.fn(),
|
|
...overrides,
|
|
};
|
|
};
|
|
|
|
const createSession = (handlers: Parameters<typeof createControllerAgentSession>[1] = {}, socket = createSocket()) => {
|
|
const scope = Effect.runSync(Scope.make());
|
|
|
|
try {
|
|
const session = Effect.runSync(Scope.extend(createControllerAgentSession(fromPartial(socket), handlers), scope));
|
|
|
|
return {
|
|
session,
|
|
run: () => {
|
|
Effect.runFork(Scope.extend(session.run, scope));
|
|
},
|
|
socket,
|
|
close: () => {
|
|
Effect.runSync(Scope.close(scope, Exit.succeed(undefined)));
|
|
},
|
|
closeAsync: () => {
|
|
return Effect.runPromise(Scope.close(scope, Exit.succeed(undefined)));
|
|
},
|
|
};
|
|
} catch (error) {
|
|
Effect.runSync(Scope.close(scope, Exit.fail(error)));
|
|
throw error;
|
|
}
|
|
};
|
|
|
|
test("close emits a synthetic backup.cancelled for a started backup", () => {
|
|
const onBackupCancelled = vi.fn();
|
|
const { session, close } = createSession({
|
|
onBackupCancelled,
|
|
});
|
|
|
|
Effect.runSync(
|
|
session.handleMessage(
|
|
createAgentMessage("backup.started", {
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
}),
|
|
),
|
|
);
|
|
|
|
close();
|
|
|
|
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
|
|
expect(onBackupCancelled).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
}),
|
|
);
|
|
});
|
|
|
|
test.each([
|
|
{
|
|
name: "backup.completed",
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
terminalMessage: createAgentMessage("backup.completed", {
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
exitCode: 0,
|
|
result: null,
|
|
}),
|
|
expectedCancelledCalls: 0,
|
|
},
|
|
{
|
|
name: "backup.failed",
|
|
jobId: "job-2",
|
|
scheduleId: "schedule-2",
|
|
terminalMessage: createAgentMessage("backup.failed", {
|
|
jobId: "job-2",
|
|
scheduleId: "schedule-2",
|
|
error: "backup failed",
|
|
}),
|
|
expectedCancelledCalls: 0,
|
|
},
|
|
{
|
|
name: "backup.cancelled",
|
|
jobId: "job-3",
|
|
scheduleId: "schedule-3",
|
|
terminalMessage: createAgentMessage("backup.cancelled", {
|
|
jobId: "job-3",
|
|
scheduleId: "schedule-3",
|
|
message: "Backup was cancelled",
|
|
}),
|
|
expectedCancelledCalls: 1,
|
|
},
|
|
])("close does not emit an extra synthetic backup.cancelled after $name", (testCase) => {
|
|
const onBackupCancelled = vi.fn();
|
|
const { session, close } = createSession({
|
|
onBackupCancelled,
|
|
});
|
|
|
|
Effect.runSync(
|
|
session.handleMessage(
|
|
createAgentMessage("backup.started", {
|
|
jobId: testCase.jobId,
|
|
scheduleId: testCase.scheduleId,
|
|
}),
|
|
),
|
|
);
|
|
Effect.runSync(session.handleMessage(testCase.terminalMessage));
|
|
close();
|
|
|
|
expect(onBackupCancelled).toHaveBeenCalledTimes(testCase.expectedCancelledCalls);
|
|
});
|
|
|
|
test("close emits a synthetic backup.cancelled for a queued backup", () => {
|
|
const onBackupCancelled = vi.fn();
|
|
const { session, close } = createSession({
|
|
onBackupCancelled,
|
|
});
|
|
|
|
Effect.runSync(
|
|
session.sendBackup({
|
|
jobId: "job-queued",
|
|
scheduleId: "schedule-queued",
|
|
organizationId: "org-1",
|
|
sourcePath: "/tmp/source",
|
|
repositoryConfig: {
|
|
backend: "local",
|
|
path: "/tmp/repository",
|
|
},
|
|
options: {},
|
|
runtime: {
|
|
password: "password",
|
|
cacheDir: "/tmp/cache",
|
|
passFile: "/tmp/pass",
|
|
defaultExcludes: [],
|
|
rcloneConfigFile: "/tmp/rclone.conf",
|
|
},
|
|
}),
|
|
);
|
|
|
|
close();
|
|
|
|
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
|
|
expect(onBackupCancelled).toHaveBeenLastCalledWith(
|
|
expect.objectContaining({
|
|
jobId: "job-queued",
|
|
scheduleId: "schedule-queued",
|
|
}),
|
|
);
|
|
});
|
|
|
|
test("a dropped backup.cancel closes the session and emits a synthetic backup.cancelled", async () => {
|
|
const send = vi.fn(() => 0);
|
|
const socket = createSocket({ send, close: vi.fn() });
|
|
const onBackupCancelled = vi.fn();
|
|
const { session, run, closeAsync } = createSession({ onBackupCancelled }, socket);
|
|
|
|
try {
|
|
run();
|
|
Effect.runSync(
|
|
session.handleMessage(
|
|
createAgentMessage("backup.started", {
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
}),
|
|
),
|
|
);
|
|
Effect.runSync(
|
|
session.sendBackupCancel({
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
}),
|
|
);
|
|
|
|
await waitForExpect(() => {
|
|
expect(send).toHaveBeenCalledTimes(1);
|
|
expect(socket.close).toHaveBeenCalledTimes(1);
|
|
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
|
|
expect(onBackupCancelled).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
jobId: "job-1",
|
|
scheduleId: "schedule-1",
|
|
}),
|
|
);
|
|
});
|
|
} finally {
|
|
await closeAsync();
|
|
}
|
|
});
|