feat(agents): create agent registry and service (#849)

* feat(agents): create agent registry and service

* fix: mark agent offline only if the session was removed properly

* refactor: centralize agent backup lifecycle state

* refactor: simplify session management

* refactor: move effect / async boundary in one place

* chore: regen migration

* refactor: improve error handling

* chore: pr feedback
This commit is contained in:
Nico
2026-05-05 19:34:10 +02:00
committed by GitHub
parent e981211a2d
commit e65a135676
22 changed files with 3780 additions and 464 deletions

View File

@@ -0,0 +1,16 @@
CREATE TABLE `agents_table` (
`id` text PRIMARY KEY,
`organization_id` text,
`name` text NOT NULL,
`kind` text NOT NULL,
`status` text DEFAULT 'offline' NOT NULL,
`capabilities` text DEFAULT '{}' NOT NULL,
`last_seen_at` integer,
`last_ready_at` integer,
`created_at` integer DEFAULT (unixepoch() * 1000) NOT NULL,
`updated_at` integer DEFAULT (unixepoch() * 1000) NOT NULL,
CONSTRAINT `fk_agents_table_organization_id_organization_id_fk` FOREIGN KEY (`organization_id`) REFERENCES `organization`(`id`) ON DELETE CASCADE
);
--> statement-breakpoint
CREATE INDEX `agents_table_organization_id_idx` ON `agents_table` (`organization_id`);--> statement-breakpoint
CREATE INDEX `agents_table_status_idx` ON `agents_table` (`status`);

View File

File diff suppressed because it is too large Load Diff

View File

@@ -99,6 +99,7 @@ export const relations = defineRelations(schema, (r) => ({
users: r.many.usersTable({
alias: "usersTable_id_organization_id_via_member",
}),
agents: r.many.agentsTable(),
backupSchedules: r.many.backupSchedulesTable(),
notificationDestinations: r.many.notificationDestinationsTable(),
repositories: r.many.repositoriesTable(),
@@ -119,6 +120,13 @@ export const relations = defineRelations(schema, (r) => ({
optional: false,
}),
},
agentsTable: {
organization: r.one.organization({
from: r.agentsTable.organizationId,
to: r.organization.id,
optional: true,
}),
},
volumesTable: {
backupSchedules: r.many.backupSchedulesTable(),
organization: r.one.organization({

View File

@@ -198,6 +198,36 @@ export const ssoProvider = sqliteTable("sso_provider", {
.default(sql`(unixepoch() * 1000)`),
});
export type AgentKind = "local" | "remote";
export type AgentStatus = "offline" | "connecting" | "online" | "degraded";
export type AgentCapabilities = Record<string, unknown>;
export const agentsTable = sqliteTable(
"agents_table",
{
id: text("id").primaryKey(),
organizationId: text("organization_id").references(() => organization.id, { onDelete: "cascade" }),
name: text("name").notNull(),
kind: text("kind").$type<AgentKind>().notNull(),
status: text("status").$type<AgentStatus>().notNull().default("offline"),
capabilities: text("capabilities", { mode: "json" }).$type<AgentCapabilities>().notNull().default({}),
lastSeenAt: int("last_seen_at", { mode: "number" }),
lastReadyAt: int("last_ready_at", { mode: "number" }),
createdAt: int("created_at", { mode: "number" })
.notNull()
.default(sql`(unixepoch() * 1000)`),
updatedAt: int("updated_at", { mode: "number" })
.notNull()
.$onUpdate(() => Date.now())
.default(sql`(unixepoch() * 1000)`),
},
(table) => [
index("agents_table_organization_id_idx").on(table.organizationId),
index("agents_table_status_idx").on(table.status),
],
);
export type Agent = typeof agentsTable.$inferSelect;
/**
* Volumes Table
*/

View File

@@ -1,6 +1,7 @@
import { afterEach, expect, test, vi } from "vitest";
import waitForExpect from "wait-for-expect";
import { fromAny, fromPartial } from "@total-typescript/shoehorn";
import { Effect } from "effect";
import { agentManager, type ProcessWithAgentRuntime } from "../agents-manager";
import type { AgentManagerRuntime } from "../controller/server";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
@@ -20,8 +21,8 @@ afterEach(() => {
});
test("cancelBackup resolves a running backup when the cancel command cannot be delivered", async () => {
const sendBackup = vi.fn().mockResolvedValue(true);
const cancelBackup = vi.fn().mockResolvedValue(false);
const sendBackup = vi.fn(() => Effect.succeed(true));
const cancelBackup = vi.fn(() => Effect.succeed(false));
setAgentRuntime({ sendBackup, cancelBackup });
const resultPromise = agentManager.runBackup("local", {

View File

@@ -0,0 +1,235 @@
import { afterEach, expect, test, vi } from "vitest";
import { Effect } from "effect";
import { fromAny, fromPartial } from "@total-typescript/shoehorn";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import type { AgentManagerEvent } from "../controller/server";
import type { ProcessWithAgentRuntime } from "../agents-manager";
const controllerMock = vi.hoisted(() => ({
onEvent: null as null | ((event: AgentManagerEvent) => void),
sendBackup: vi.fn(),
cancelBackup: vi.fn(),
stop: vi.fn(),
}));
vi.mock("../controller/server", async () => {
const { Effect } = await import("effect");
return {
createAgentManagerRuntime: vi.fn((onEvent: (event: AgentManagerEvent) => void) => {
controllerMock.onEvent = onEvent;
return {
start: Effect.void,
stop: Effect.sync(controllerMock.stop),
sendBackup: controllerMock.sendBackup,
cancelBackup: controllerMock.cancelBackup,
};
}),
};
});
const processWithAgentRuntime = process as ProcessWithAgentRuntime;
const resetAgentRuntime = () => {
processWithAgentRuntime.__zerobyteAgentRuntime = {
agentManager: null,
localAgent: null,
isStoppingLocalAgent: false,
localAgentRestartTimeout: null,
activeBackupsByScheduleId: new Map(),
activeBackupScheduleIdsByJobId: new Map(),
};
};
const backupPayload = fromPartial<BackupRunPayload>({
jobId: "job-1",
scheduleId: "schedule-1",
});
afterEach(() => {
delete processWithAgentRuntime.__zerobyteAgentRuntime;
controllerMock.onEvent = null;
controllerMock.sendBackup.mockReset();
controllerMock.cancelBackup.mockReset();
controllerMock.stop.mockReset();
vi.resetModules();
vi.restoreAllMocks();
});
test("backup progress is delivered to the running backup callback", async () => {
resetAgentRuntime();
controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true));
const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager");
const onProgress = vi.fn();
await startAgentController();
const resultPromise = agentManager.runBackup("local", {
scheduleId: 42,
payload: backupPayload,
signal: new AbortController().signal,
onProgress,
});
controllerMock.onEvent?.({
type: "backup.progress",
agentId: "local",
agentName: "Local Agent",
payload: fromAny({ jobId: "job-1", scheduleId: "schedule-1", progress: { percentDone: 0.5 } }),
});
controllerMock.onEvent?.({
type: "backup.completed",
agentId: "local",
agentName: "Local Agent",
payload: { jobId: "job-1", scheduleId: "schedule-1", exitCode: 0, result: null },
});
await expect(resultPromise).resolves.toEqual({
status: "completed",
exitCode: 0,
result: null,
warningDetails: null,
});
expect(onProgress).toHaveBeenCalledWith({ percentDone: 0.5 });
await stopAgentController();
});
test("backup failed and cancelled events resolve the matching running backup", async () => {
resetAgentRuntime();
controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true));
const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager");
await startAgentController();
const failedPromise = agentManager.runBackup("local", {
scheduleId: 42,
payload: backupPayload,
signal: new AbortController().signal,
onProgress: vi.fn(),
});
controllerMock.onEvent?.({
type: "backup.failed",
agentId: "local",
agentName: "Local Agent",
payload: { jobId: "job-1", scheduleId: "schedule-1", error: "failed", errorDetails: "restic failed" },
});
await expect(failedPromise).resolves.toEqual({ status: "failed", error: "restic failed" });
const cancelledPromise = agentManager.runBackup("local", {
scheduleId: 43,
payload: fromPartial<BackupRunPayload>({ jobId: "job-2", scheduleId: "schedule-2" }),
signal: new AbortController().signal,
onProgress: vi.fn(),
});
controllerMock.onEvent?.({
type: "backup.cancelled",
agentId: "local",
agentName: "Local Agent",
payload: { jobId: "job-2", scheduleId: "schedule-2", message: "cancelled remotely" },
});
await expect(cancelledPromise).resolves.toEqual({ status: "cancelled", message: "cancelled remotely" });
await stopAgentController();
});
test("agent disconnect cancels only backups owned by that agent", async () => {
resetAgentRuntime();
controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true));
const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager");
await startAgentController();
const localPromise = agentManager.runBackup("local", {
scheduleId: 42,
payload: backupPayload,
signal: new AbortController().signal,
onProgress: vi.fn(),
});
const remotePromise = agentManager.runBackup("remote", {
scheduleId: 43,
payload: fromPartial<BackupRunPayload>({ jobId: "job-2", scheduleId: "schedule-2" }),
signal: new AbortController().signal,
onProgress: vi.fn(),
});
controllerMock.onEvent?.({ type: "agent.disconnected", agentId: "local", agentName: "Local Agent" });
controllerMock.onEvent?.({
type: "backup.completed",
agentId: "remote",
agentName: "Remote Agent",
payload: { jobId: "job-2", scheduleId: "schedule-2", exitCode: 0, result: null },
});
await expect(localPromise).resolves.toEqual({
status: "cancelled",
message: "The connection to the backup agent was lost. Restart the backup to ensure it completes.",
});
await expect(remotePromise).resolves.toEqual({
status: "completed",
exitCode: 0,
result: null,
warningDetails: null,
});
await stopAgentController();
});
test("runBackup returns unavailable and clears the active run when the command cannot be sent", async () => {
resetAgentRuntime();
controllerMock.sendBackup.mockImplementation(() => Effect.succeed(false));
const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager");
await startAgentController();
const result = await agentManager.runBackup("local", {
scheduleId: 42,
payload: backupPayload,
signal: new AbortController().signal,
onProgress: vi.fn(),
});
expect(result).toEqual({
status: "unavailable",
error: new Error("Failed to send backup command to agent local"),
});
await expect(agentManager.cancelBackup("local", 42)).resolves.toBe(false);
await stopAgentController();
});
test("runBackup rejects before sending when the abort signal is already aborted", async () => {
resetAgentRuntime();
controllerMock.sendBackup.mockImplementation(() => Effect.succeed(true));
const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager");
const abortController = new AbortController();
abortController.abort(new Error("cancelled before send"));
await startAgentController();
await expect(
agentManager.runBackup("local", {
scheduleId: 42,
payload: backupPayload,
signal: abortController.signal,
onProgress: vi.fn(),
}),
).rejects.toThrow("cancelled before send");
expect(controllerMock.sendBackup).not.toHaveBeenCalled();
await stopAgentController();
});
test("runBackup requests cancellation when the abort signal fires while sending", async () => {
resetAgentRuntime();
const abortController = new AbortController();
controllerMock.sendBackup.mockImplementation(() =>
Effect.sync(() => {
abortController.abort();
return true;
}),
);
controllerMock.cancelBackup.mockImplementation(() => Effect.succeed(false));
const { agentManager, startAgentController, stopAgentController } = await import("../agents-manager");
await startAgentController();
const result = await agentManager.runBackup("local", {
scheduleId: 42,
payload: backupPayload,
signal: abortController.signal,
onProgress: vi.fn(),
});
expect(result).toEqual({ status: "cancelled" });
expect(controllerMock.cancelBackup).toHaveBeenCalledWith("local", { jobId: "job-1", scheduleId: "schedule-1" });
await stopAgentController();
});

View File

@@ -0,0 +1,72 @@
import { beforeEach, expect, test } from "vitest";
import { db } from "~/server/db/db";
import { agentsTable } from "~/server/db/schema";
import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants";
import { agentsService } from "../agents.service";
beforeEach(async () => {
await db.delete(agentsTable);
});
test("ensureLocalAgent seeds the built-in local agent once", async () => {
await agentsService.ensureLocalAgent();
await agentsService.ensureLocalAgent();
const agents = await agentsService.listAgents();
expect(agents).toHaveLength(1);
});
test("markAgentConnecting creates and updates connection metadata", async () => {
await agentsService.markAgentConnecting({
agentId: "remote-agent",
organizationId: null,
agentName: "Remote Agent",
agentKind: "remote",
capabilities: { restic: true },
connectedAt: 1_000,
});
await agentsService.markAgentConnecting({
agentId: "remote-agent",
organizationId: null,
agentName: "Renamed Agent",
agentKind: "remote",
capabilities: { restic: true, webdav: true },
connectedAt: 2_000,
});
const agent = await agentsService.getAgent("remote-agent");
expect(agent).toMatchObject({
id: "remote-agent",
name: "Renamed Agent",
kind: "remote",
status: "connecting",
capabilities: { restic: true, webdav: true },
lastSeenAt: 2_000,
updatedAt: 2_000,
});
});
test("agent runtime status moves from connecting to online, seen, and offline", async () => {
await agentsService.markAgentConnecting({
agentId: LOCAL_AGENT_ID,
organizationId: null,
agentName: LOCAL_AGENT_NAME,
agentKind: LOCAL_AGENT_KIND,
connectedAt: 1_000,
});
await agentsService.markAgentOnline(LOCAL_AGENT_ID, 2_000);
await agentsService.markAgentSeen(LOCAL_AGENT_ID, 3_000);
await agentsService.markAgentOffline(LOCAL_AGENT_ID, 4_000);
const agent = await agentsService.getAgent(LOCAL_AGENT_ID);
expect(agent).toMatchObject({
id: LOCAL_AGENT_ID,
status: "offline",
lastSeenAt: 3_000,
lastReadyAt: 2_000,
updatedAt: 4_000,
});
});

View File

@@ -0,0 +1,280 @@
import { Effect } from "effect";
import { afterEach, expect, test, vi } from "vitest";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
import { createAgentMessage } from "@zerobyte/contracts/agent-protocol";
import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants";
const agentsServiceMocks = vi.hoisted(() => ({
markAgentConnecting: vi.fn(() => Promise.resolve()),
markAgentOnline: vi.fn(() => Promise.resolve()),
markAgentSeen: vi.fn(() => Promise.resolve()),
markAgentOffline: vi.fn(() => Promise.resolve()),
}));
const tokenMocks = vi.hoisted(() => ({
validateAgentToken: vi.fn(),
}));
vi.mock("../agents.service", () => ({
agentsService: agentsServiceMocks,
}));
vi.mock("../helpers/tokens", () => ({
validateAgentToken: tokenMocks.validateAgentToken,
}));
const createSocket = (id: string, agentId = LOCAL_AGENT_ID) => ({
data: {
id,
agentId,
organizationId: null,
agentName: agentId === LOCAL_AGENT_ID ? LOCAL_AGENT_NAME : `${LOCAL_AGENT_NAME} ${agentId}`,
agentKind: LOCAL_AGENT_KIND,
},
send: vi.fn(() => 1),
close: vi.fn(),
});
type CapturedFetch = NonNullable<Parameters<typeof Bun.serve>[0]["fetch"]>;
const invokeFetch = (fetch: CapturedFetch | undefined, request: Request, srv: Parameters<CapturedFetch>[1]) => {
if (!fetch) {
throw new Error("Bun.serve was not called with a fetch handler");
}
return Reflect.apply(fetch, fromPartial<ThisParameterType<CapturedFetch>>({}), [
request,
srv,
]) as ReturnType<CapturedFetch>;
};
const startRuntime = async (onEvent = vi.fn()) => {
const { createAgentManagerRuntime } = await import("../controller/server");
const runtime = createAgentManagerRuntime(onEvent);
await Effect.runPromise(runtime.start);
return { runtime, onEvent };
};
afterEach(() => {
vi.restoreAllMocks();
tokenMocks.validateAgentToken.mockReset();
vi.resetModules();
});
test("websocket fetch rejects requests without a bearer token", async () => {
const serve = vi
.spyOn(Bun, "serve")
.mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) }));
const { runtime } = await startRuntime();
const fetch = serve.mock.calls[0]?.[0].fetch;
const upgrade = vi.fn();
const srv = fromPartial<Parameters<NonNullable<typeof fetch>>[1]>({ upgrade });
const response = await invokeFetch(fetch, new Request("http://localhost:3001/agent"), srv);
await Effect.runPromise(runtime.stop);
expect(response?.status).toBe(401);
expect(await response?.text()).toBe("Missing token");
expect(upgrade).not.toHaveBeenCalled();
});
test("websocket fetch rejects invalid bearer tokens", async () => {
tokenMocks.validateAgentToken.mockResolvedValue(undefined);
const serve = vi
.spyOn(Bun, "serve")
.mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) }));
const { runtime } = await startRuntime();
const fetch = serve.mock.calls[0]?.[0].fetch;
const upgrade = vi.fn();
const srv = fromPartial<Parameters<NonNullable<typeof fetch>>[1]>({ upgrade });
const response = await invokeFetch(
fetch,
new Request("http://localhost:3001/agent", { headers: { authorization: "Bearer bad-token" } }),
srv,
);
await Effect.runPromise(runtime.stop);
expect(response?.status).toBe(401);
expect(await response?.text()).toBe("Invalid or revoked token");
expect(tokenMocks.validateAgentToken).toHaveBeenCalledWith("bad-token");
expect(upgrade).not.toHaveBeenCalled();
});
test("websocket fetch upgrades valid agent tokens with connection metadata", async () => {
tokenMocks.validateAgentToken.mockResolvedValue({
agentId: LOCAL_AGENT_ID,
organizationId: null,
agentName: LOCAL_AGENT_NAME,
agentKind: LOCAL_AGENT_KIND,
});
const serve = vi
.spyOn(Bun, "serve")
.mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) }));
const { runtime } = await startRuntime();
const fetch = serve.mock.calls[0]?.[0].fetch;
const upgrade = vi.fn(() => true);
const srv = fromPartial<Parameters<NonNullable<typeof fetch>>[1]>({ upgrade });
const response = await invokeFetch(
fetch,
new Request("http://localhost:3001/agent", { headers: { authorization: "Bearer valid-token" } }),
srv,
);
await Effect.runPromise(runtime.stop);
expect(response).toBeUndefined();
expect(tokenMocks.validateAgentToken).toHaveBeenCalledWith("valid-token");
expect(upgrade).toHaveBeenCalledWith(expect.any(Request), {
data: expect.objectContaining({
agentId: LOCAL_AGENT_ID,
organizationId: null,
agentName: LOCAL_AGENT_NAME,
agentKind: LOCAL_AGENT_KIND,
id: expect.any(String),
}),
});
});
test("websocket lifecycle updates agent connection status", async () => {
const stop = vi.fn(() => Promise.resolve());
const serve = vi.spyOn(Bun, "serve").mockReturnValue(fromPartial({ port: 3001, stop }));
const { runtime } = await startRuntime();
const websocket = serve.mock.calls[0]?.[0].websocket;
const socket = createSocket("connection-1");
await websocket?.open?.(fromPartial(socket));
await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }));
await websocket?.message?.(fromPartial(socket), createAgentMessage("heartbeat.pong", { sentAt: 123 }));
await websocket?.close?.(fromPartial(socket), 1000, "done");
await Effect.runPromise(runtime.stop);
expect(agentsServiceMocks.markAgentConnecting).toHaveBeenCalledWith({
agentId: LOCAL_AGENT_ID,
organizationId: null,
agentName: LOCAL_AGENT_NAME,
agentKind: LOCAL_AGENT_KIND,
});
expect(agentsServiceMocks.markAgentOnline).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number));
expect(agentsServiceMocks.markAgentSeen).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number));
expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith(LOCAL_AGENT_ID);
expect(stop).toHaveBeenCalledWith(true);
});
test("websocket open failure closes the upgraded socket", async () => {
agentsServiceMocks.markAgentConnecting.mockRejectedValueOnce(new Error("db unavailable"));
const serve = vi
.spyOn(Bun, "serve")
.mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) }));
const { runtime } = await startRuntime();
const websocket = serve.mock.calls[0]?.[0].websocket;
const socket = createSocket("connection-1");
await websocket?.open?.(fromPartial(socket));
expect(socket.close).toHaveBeenCalled();
await Effect.runPromise(runtime.stop);
});
test("shutdown closes all sessions and stops the server when marking one agent offline fails", async () => {
agentsServiceMocks.markAgentOffline.mockRejectedValueOnce(new Error("db unavailable"));
const stop = vi.fn(() => Promise.resolve());
const serve = vi.spyOn(Bun, "serve").mockReturnValue(fromPartial({ port: 3001, stop }));
const { runtime, onEvent } = await startRuntime(vi.fn());
const websocket = serve.mock.calls[0]?.[0].websocket;
const firstSocket = createSocket("connection-1", "agent-1");
const secondSocket = createSocket("connection-2", "agent-2");
await websocket?.open?.(fromPartial(firstSocket));
await websocket?.open?.(fromPartial(secondSocket));
await Effect.runPromise(runtime.stop);
expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith("agent-1");
expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith("agent-2");
expect(onEvent).toHaveBeenCalledWith(expect.objectContaining({ type: "agent.disconnected", agentId: "agent-1" }));
expect(onEvent).toHaveBeenCalledWith(expect.objectContaining({ type: "agent.disconnected", agentId: "agent-2" }));
expect(stop).toHaveBeenCalledWith(true);
});
test("closing a replaced connection reports disconnect without marking the active agent offline", async () => {
const serve = vi
.spyOn(Bun, "serve")
.mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) }));
const { runtime, onEvent } = await startRuntime(vi.fn());
const websocket = serve.mock.calls[0]?.[0].websocket;
const oldSocket = createSocket("connection-1");
const newSocket = createSocket("connection-2");
const offlineCallsBeforeClose = agentsServiceMocks.markAgentOffline.mock.calls.length;
await websocket?.open?.(fromPartial(oldSocket));
await websocket?.open?.(fromPartial(newSocket));
await websocket?.message?.(fromPartial(newSocket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }));
await websocket?.close?.(fromPartial(oldSocket), 1000, "replaced");
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({ type: "agent.disconnected", agentId: LOCAL_AGENT_ID }),
);
expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledTimes(offlineCallsBeforeClose);
expect(
await Effect.runPromise(
runtime.sendBackup(LOCAL_AGENT_ID, {
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {},
runtime: {
password: "password",
cacheDir: "/tmp/cache",
passFile: "/tmp/pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],
webhookTimeoutMs: 60_000,
}),
),
).toBe(true);
await Effect.runPromise(runtime.stop);
});
test("sendBackup is only delivered after the agent is ready", async () => {
const serve = vi
.spyOn(Bun, "serve")
.mockReturnValue(fromPartial({ port: 3001, stop: vi.fn(() => Promise.resolve()) }));
const { runtime } = await startRuntime();
const websocket = serve.mock.calls[0]?.[0].websocket;
const socket = createSocket("connection-1");
const payload = {
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {},
runtime: {
password: "password",
cacheDir: "/tmp/cache",
passFile: "/tmp/pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],
webhookTimeoutMs: 60_000,
};
await websocket?.open?.(fromPartial(socket));
await expect(Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, payload))).resolves.toBe(false);
await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }));
await expect(Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, payload))).resolves.toBe(true);
await waitForExpect(() => {
expect(socket.send).toHaveBeenCalledWith(expect.stringContaining('"type":"backup.run"'));
});
await Effect.runPromise(runtime.stop);
});

View File

@@ -1,29 +1,41 @@
import { Effect, Exit, Scope } from "effect";
import { Effect, Exit, Fiber, Scope } from "effect";
import { expect, test, vi } from "vitest";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
import { createAgentMessage } from "@zerobyte/contracts/agent-protocol";
import { createAgentMessage, type AgentMessage } from "@zerobyte/contracts/agent-protocol";
import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants";
import { createControllerAgentSession } from "../controller/session";
const createSocket = (overrides: Partial<Parameters<typeof createControllerAgentSession>[0]> = {}) => {
return {
data: { id: "connection-1", agentId: "local", organizationId: null, agentName: "Local Agent" },
data: {
id: "connection-1",
agentId: LOCAL_AGENT_ID,
organizationId: null,
agentName: LOCAL_AGENT_NAME,
agentKind: LOCAL_AGENT_KIND,
},
send: vi.fn(() => 1),
close: vi.fn(),
...overrides,
};
};
const createSession = (handlers: Parameters<typeof createControllerAgentSession>[1] = {}, socket = createSocket()) => {
const createSession = (
onEvent: Parameters<typeof createControllerAgentSession>[1] = () => Effect.void,
socket = createSocket(),
) => {
const scope = Effect.runSync(Scope.make());
try {
const session = Effect.runSync(Scope.extend(createControllerAgentSession(fromPartial(socket), handlers), scope));
const session = Effect.runSync(Scope.extend(createControllerAgentSession(fromPartial(socket), onEvent), scope));
return {
session,
run: () => {
Effect.runFork(Scope.extend(session.run, scope));
const fiber = Effect.runFork(Scope.extend(session.run, scope));
Effect.runSync(Scope.addFinalizer(scope, Fiber.interrupt(fiber)));
return fiber;
},
socket,
close: () => {
@@ -39,92 +51,33 @@ const createSession = (handlers: Parameters<typeof createControllerAgentSession>
}
};
test("close emits a synthetic backup.cancelled for a started backup", () => {
const onBackupCancelled = vi.fn();
const { session, close } = createSession({
onBackupCancelled,
});
test("closing the session scope interrupts the session runner", async () => {
const { run, closeAsync } = createSession();
const fiber = run();
Effect.runSync(
session.handleMessage(
createAgentMessage("backup.started", {
jobId: "job-1",
scheduleId: "schedule-1",
}),
),
);
await closeAsync();
const exit = await Effect.runPromise(Fiber.await(fiber).pipe(Effect.timeout("100 millis")));
expect(Exit.isInterrupted(exit)).toBe(true);
});
test("close reports a transport disconnect", () => {
const onEvent = vi.fn(() => Effect.void);
const { close } = createSession(onEvent);
close();
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledWith(
expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
jobId: "job-1",
scheduleId: "schedule-1",
type: "agent.disconnected",
}),
);
});
test.each([
{
name: "backup.completed",
jobId: "job-1",
scheduleId: "schedule-1",
terminalMessage: createAgentMessage("backup.completed", {
jobId: "job-1",
scheduleId: "schedule-1",
exitCode: 0,
result: null,
}),
expectedCancelledCalls: 0,
},
{
name: "backup.failed",
jobId: "job-2",
scheduleId: "schedule-2",
terminalMessage: createAgentMessage("backup.failed", {
jobId: "job-2",
scheduleId: "schedule-2",
error: "backup failed",
}),
expectedCancelledCalls: 0,
},
{
name: "backup.cancelled",
jobId: "job-3",
scheduleId: "schedule-3",
terminalMessage: createAgentMessage("backup.cancelled", {
jobId: "job-3",
scheduleId: "schedule-3",
message: "Backup was cancelled",
}),
expectedCancelledCalls: 1,
},
])("close does not emit an extra synthetic backup.cancelled after $name", (testCase) => {
const onBackupCancelled = vi.fn();
const { session, close } = createSession({
onBackupCancelled,
});
Effect.runSync(
session.handleMessage(
createAgentMessage("backup.started", {
jobId: testCase.jobId,
scheduleId: testCase.scheduleId,
}),
),
);
Effect.runSync(session.handleMessage(testCase.terminalMessage));
close();
expect(onBackupCancelled).toHaveBeenCalledTimes(testCase.expectedCancelledCalls);
});
test("close emits a synthetic backup.cancelled for a queued backup", () => {
const onBackupCancelled = vi.fn();
const { session, close } = createSession({
onBackupCancelled,
});
test("sendBackup only queues the transport message", () => {
const onEvent = vi.fn(() => Effect.void);
const { session, close } = createSession(onEvent);
Effect.runSync(
session.sendBackup({
@@ -152,31 +105,77 @@ test("close emits a synthetic backup.cancelled for a queued backup", () => {
close();
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenLastCalledWith(
expect.objectContaining({
jobId: "job-queued",
scheduleId: "schedule-queued",
}),
);
expect(onEvent).not.toHaveBeenCalledWith(expect.objectContaining({ type: "backup.cancelled" }));
});
test("a dropped backup.cancel closes the session and emits a synthetic backup.cancelled", async () => {
test("invalid inbound messages are ignored", () => {
const onEvent = vi.fn(() => Effect.void);
const { session, close } = createSession(onEvent);
Effect.runSync(session.handleMessage("not json"));
Effect.runSync(session.handleMessage(JSON.stringify({ type: "backup.progress", payload: {} })));
expect(onEvent).not.toHaveBeenCalled();
close();
});
test("agent.ready marks the session ready and forwards the event", () => {
const onEvent = vi.fn(() => Effect.void);
const { session, close } = createSession(onEvent);
expect(Effect.runSync(session.isReady())).toBe(false);
Effect.runSync(session.handleMessage(createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID })));
expect(Effect.runSync(session.isReady())).toBe(true);
expect(onEvent).toHaveBeenCalledWith({ type: "agent.ready", payload: { agentId: LOCAL_AGENT_ID } });
close();
});
test("backup agent messages are forwarded unchanged", () => {
const onEvent = vi.fn(() => Effect.void);
const { session, close } = createSession(onEvent);
const message = {
type: "backup.progress" as const,
payload: {
jobId: "job-1",
scheduleId: "schedule-1",
progress: {
message_type: "status" as const,
seconds_elapsed: 0,
seconds_remaining: 0,
percent_done: 0.5,
total_files: 0,
files_done: 0,
total_bytes: 0,
bytes_done: 0,
current_files: [],
},
},
} satisfies Extract<AgentMessage, { type: "backup.progress" }>;
Effect.runSync(session.handleMessage(createAgentMessage(message.type, message.payload)));
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: message.type,
payload: expect.objectContaining({
jobId: message.payload.jobId,
scheduleId: message.payload.scheduleId,
progress: expect.objectContaining(message.payload.progress),
}),
}),
);
close();
});
test("a dropped backup.cancel closes the session and reports a transport disconnect", async () => {
const send = vi.fn(() => 0);
const socket = createSocket({ send, close: vi.fn() });
const onBackupCancelled = vi.fn();
const { session, run, closeAsync } = createSession({ onBackupCancelled }, socket);
const onEvent = vi.fn(() => Effect.void);
const { session, run, closeAsync } = createSession(onEvent, socket);
try {
run();
Effect.runSync(
session.handleMessage(
createAgentMessage("backup.started", {
jobId: "job-1",
scheduleId: "schedule-1",
}),
),
);
Effect.runSync(
session.sendBackupCancel({
jobId: "job-1",
@@ -187,11 +186,10 @@ test("a dropped backup.cancel closes the session and emits a synthetic backup.ca
await waitForExpect(() => {
expect(send).toHaveBeenCalledTimes(1);
expect(socket.close).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledWith(
expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
jobId: "job-1",
scheduleId: "schedule-1",
type: "agent.disconnected",
}),
);
});

View File

@@ -1,7 +1,8 @@
import { logger } from "@zerobyte/core/node";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import { Effect } from "effect";
import { config } from "../../core/config";
import type { AgentBackupEventHandlers } from "./controller/server";
import { createAgentManagerRuntime, type AgentManagerEvent } from "./controller/server";
import { spawnLocalAgentProcess, stopLocalAgentProcess } from "./local/process";
import type { BackupExecutionProgress, BackupExecutionResult } from "./helpers/runtime-state";
import { createAgentRuntimeState } from "./helpers/runtime-state";
@@ -46,6 +47,17 @@ const resolveActiveBackupRun = (scheduleId: number, result: BackupExecutionResul
return true;
};
const cancelActiveBackupRunsForAgent = (agentId: string, message: string) => {
const activeBackupsByScheduleId = getActiveBackupsByScheduleId();
const matchingScheduleIds = [...activeBackupsByScheduleId.values()]
.filter((activeBackupRun) => activeBackupRun.agentId === agentId)
.map((activeBackupRun) => activeBackupRun.scheduleId);
for (const scheduleId of matchingScheduleIds) {
resolveActiveBackupRun(scheduleId, { status: "cancelled", message });
}
};
const getActiveBackupRun = (jobId: string, scheduleId: string, eventName: string, agentId: string) => {
const trackedScheduleId = getActiveBackupScheduleIdsByJobId().get(jobId);
if (trackedScheduleId === undefined) {
@@ -86,10 +98,12 @@ const requestBackupCancellation = async (agentId: string, scheduleId: number) =>
}
if (
await runtime.cancelBackup(agentId, {
jobId: activeBackupRun.jobId,
scheduleId: activeBackupRun.scheduleShortId,
})
await Effect.runPromise(
runtime.cancelBackup(agentId, {
jobId: activeBackupRun.jobId,
scheduleId: activeBackupRun.scheduleShortId,
}),
)
) {
return true;
}
@@ -98,68 +112,99 @@ const requestBackupCancellation = async (agentId: string, scheduleId: number) =>
return true;
};
const backupEventHandlers: AgentBackupEventHandlers = {
onBackupStarted: ({ agentId, payload }) => {
getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.started", agentId);
},
onBackupProgress: ({ agentId, payload }) => {
const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.progress", agentId);
if (!activeBackupRun) {
return;
const handleAgentManagerEvent = (event: AgentManagerEvent) => {
switch (event.type) {
case "agent.disconnected": {
cancelActiveBackupRunsForAgent(
event.agentId,
"The connection to the backup agent was lost. Restart the backup to ensure it completes.",
);
break;
}
activeBackupRun.onProgress(payload.progress);
},
onBackupCompleted: ({ agentId, payload }) => {
const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.completed", agentId);
if (!activeBackupRun) {
return;
case "backup.started": {
getActiveBackupRun(event.payload.jobId, event.payload.scheduleId, event.type, event.agentId);
break;
}
case "backup.progress": {
const activeBackupRun = getActiveBackupRun(
event.payload.jobId,
event.payload.scheduleId,
event.type,
event.agentId,
);
if (!activeBackupRun) {
break;
}
resolveActiveBackupRun(activeBackupRun.scheduleId, {
status: "completed",
exitCode: payload.exitCode,
result: payload.result,
warningDetails: payload.warningDetails ?? null,
});
},
onBackupFailed: ({ agentId, payload }) => {
const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.failed", agentId);
if (!activeBackupRun) {
return;
activeBackupRun.onProgress(event.payload.progress);
break;
}
case "backup.completed": {
const activeBackupRun = getActiveBackupRun(
event.payload.jobId,
event.payload.scheduleId,
event.type,
event.agentId,
);
if (!activeBackupRun) {
break;
}
resolveActiveBackupRun(activeBackupRun.scheduleId, {
status: "failed",
error: payload.errorDetails ?? payload.error,
});
},
onBackupCancelled: ({ agentId, payload }) => {
const activeBackupRun = getActiveBackupRun(payload.jobId, payload.scheduleId, "backup.cancelled", agentId);
if (!activeBackupRun) {
return;
resolveActiveBackupRun(activeBackupRun.scheduleId, {
status: "completed",
exitCode: event.payload.exitCode,
result: event.payload.result,
warningDetails: event.payload.warningDetails ?? null,
});
break;
}
case "backup.failed": {
const activeBackupRun = getActiveBackupRun(
event.payload.jobId,
event.payload.scheduleId,
event.type,
event.agentId,
);
if (!activeBackupRun) {
break;
}
resolveActiveBackupRun(activeBackupRun.scheduleId, {
status: "cancelled",
message: activeBackupRun.cancellationRequested ? undefined : payload.message,
});
},
resolveActiveBackupRun(activeBackupRun.scheduleId, {
status: "failed",
error: event.payload.errorDetails ?? event.payload.error,
});
break;
}
case "backup.cancelled": {
const activeBackupRun = getActiveBackupRun(
event.payload.jobId,
event.payload.scheduleId,
event.type,
event.agentId,
);
if (!activeBackupRun) {
break;
}
resolveActiveBackupRun(activeBackupRun.scheduleId, {
status: "cancelled",
message: activeBackupRun.cancellationRequested ? undefined : event.payload.message,
});
break;
}
}
};
export const startAgentController = async () => {
const runtime = getAgentRuntimeState();
if (runtime.agentManager) {
await runtime.agentManager.stop();
await Effect.runPromise(runtime.agentManager.stop);
runtime.agentManager = null;
}
const { createAgentManagerRuntime } = await import("./controller/server");
const nextAgentManager = createAgentManagerRuntime();
nextAgentManager.setBackupEventHandlers(backupEventHandlers);
await nextAgentManager.start();
const nextAgentManager = createAgentManagerRuntime(handleAgentManagerEvent);
await Effect.runPromise(nextAgentManager.start);
runtime.agentManager = nextAgentManager;
};
@@ -167,7 +212,9 @@ export const stopAgentController = async () => {
const runtime = getAgentRuntimeState();
const agentManagerRuntime = runtime.agentManager;
runtime.agentManager = null;
await agentManagerRuntime?.stop();
if (agentManagerRuntime) {
await Effect.runPromise(agentManagerRuntime.stop);
}
};
export const agentManager = {
@@ -186,6 +233,7 @@ export const agentManager = {
const completion = new Promise<BackupExecutionResult>((resolve) => {
getActiveBackupsByScheduleId().set(request.scheduleId, {
agentId,
scheduleId: request.scheduleId,
jobId: request.payload.jobId,
scheduleShortId: request.payload.scheduleId,
@@ -197,7 +245,7 @@ export const agentManager = {
});
try {
if (!(await runtime.sendBackup(agentId, request.payload))) {
if (!(await Effect.runPromise(runtime.sendBackup(agentId, request.payload)))) {
clearActiveBackupRun(request.scheduleId);
return {
status: "unavailable",

View File

@@ -0,0 +1,129 @@
import { eq } from "drizzle-orm";
import { db } from "../../db/db";
import { agentsTable, type Agent, type AgentCapabilities, type AgentKind } from "../../db/schema";
import { LOCAL_AGENT_CAPABILITIES, LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "./constants";
type AgentConnectionRegistration = {
agentId: string;
organizationId: string | null;
agentName: string;
agentKind: AgentKind;
capabilities?: AgentCapabilities;
connectedAt?: number;
};
const listAgents = async (organizationId?: string | null) => {
if (organizationId === undefined) {
return db.query.agentsTable.findMany({ orderBy: { createdAt: "asc" } });
}
if (organizationId === null) {
return db.query.agentsTable.findMany({
where: { organizationId: { isNull: true } },
orderBy: { createdAt: "asc" },
});
}
return db.query.agentsTable.findMany({
where: { organizationId },
orderBy: { createdAt: "asc" },
});
};
const getAgent = async (agentId: string) => {
return db.query.agentsTable.findFirst({ where: { id: agentId } });
};
const ensureLocalAgent = async () => {
const existing = await getAgent(LOCAL_AGENT_ID);
if (existing) {
return existing;
}
await db.insert(agentsTable).values({
id: LOCAL_AGENT_ID,
organizationId: null,
name: LOCAL_AGENT_NAME,
kind: LOCAL_AGENT_KIND,
status: "offline",
capabilities: LOCAL_AGENT_CAPABILITIES,
updatedAt: Date.now(),
});
return getAgent(LOCAL_AGENT_ID);
};
const markAgentConnecting = async (params: AgentConnectionRegistration) => {
const { agentId, organizationId, agentName, agentKind, capabilities, connectedAt = Date.now() } = params;
await db
.insert(agentsTable)
.values({
id: agentId,
organizationId,
name: agentName,
kind: agentKind,
status: "connecting",
capabilities: capabilities ?? {},
lastSeenAt: connectedAt,
updatedAt: connectedAt,
})
.onConflictDoUpdate({
target: agentsTable.id,
set: {
organizationId,
name: agentName,
kind: agentKind,
status: "connecting",
lastSeenAt: connectedAt,
updatedAt: connectedAt,
capabilities: capabilities ?? {},
},
});
return getAgent(agentId);
};
const updateAgentRuntime = async (agentId: string, values: Partial<Agent>) => {
const [updatedAgent] = await db.update(agentsTable).set(values).where(eq(agentsTable.id, agentId)).returning();
if (!updatedAgent) {
throw new Error(`Agent ${agentId} not found`);
}
return updatedAgent;
};
const markAgentOnline = async (agentId: string, readyAt = Date.now()) => {
return updateAgentRuntime(agentId, {
status: "online",
lastSeenAt: readyAt,
lastReadyAt: readyAt,
updatedAt: readyAt,
});
};
const markAgentSeen = async (agentId: string, seenAt = Date.now()) => {
return updateAgentRuntime(agentId, {
lastSeenAt: seenAt,
updatedAt: seenAt,
});
};
const markAgentOffline = async (agentId: string, disconnectedAt = Date.now()) => {
return updateAgentRuntime(agentId, {
status: "offline",
updatedAt: disconnectedAt,
});
};
export const agentsService = {
listAgents,
getAgent,
ensureLocalAgent,
markAgentConnecting,
markAgentOnline,
markAgentSeen,
markAgentOffline,
};

View File

@@ -0,0 +1,6 @@
import type { AgentCapabilities, AgentKind } from "../../db/schema";
export const LOCAL_AGENT_ID = "local";
export const LOCAL_AGENT_NAME = "Local Agent";
export const LOCAL_AGENT_KIND: AgentKind = "local";
export const LOCAL_AGENT_CAPABILITIES: AgentCapabilities = {};

View File

@@ -1,40 +1,35 @@
import { Data, Effect, Exit, Fiber, Scope } from "effect";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import type {
BackupCancelPayload,
BackupCancelledPayload,
BackupCompletedPayload,
BackupFailedPayload,
BackupProgressPayload,
BackupRunPayload,
BackupStartedPayload,
} from "@zerobyte/contracts/agent-protocol";
import { createControllerAgentSession, type AgentConnectionData, type ControllerAgentSession } from "./session";
import type { AgentMessage, BackupCancelPayload, BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import {
createControllerAgentSession,
type AgentConnectionData,
type ControllerAgentSession,
type ControllerAgentSessionEvent,
} from "./session";
import { agentsService } from "../agents.service";
import { validateAgentToken } from "../helpers/tokens";
type AgentBackupEventContext = {
type AgentEventContext = {
agentId: string;
agentName: string;
payload:
| BackupStartedPayload
| BackupProgressPayload
| BackupCompletedPayload
| BackupFailedPayload
| BackupCancelledPayload;
};
export type AgentBackupEventHandlers = {
onBackupStarted?: (context: AgentBackupEventContext & { payload: BackupStartedPayload }) => void;
onBackupProgress?: (context: AgentBackupEventContext & { payload: BackupProgressPayload }) => void;
onBackupCompleted?: (context: AgentBackupEventContext & { payload: BackupCompletedPayload }) => void;
onBackupFailed?: (context: AgentBackupEventContext & { payload: BackupFailedPayload }) => void;
onBackupCancelled?: (context: AgentBackupEventContext & { payload: BackupCancelledPayload }) => void;
};
type AgentBackupMessage = Extract<
AgentMessage,
{
type: "backup.started" | "backup.progress" | "backup.completed" | "backup.failed" | "backup.cancelled";
}
>;
export type AgentManagerEvent =
| (AgentEventContext & { type: "agent.disconnected" })
| (AgentEventContext & AgentBackupMessage);
type ControllerAgentSessionHandle = {
agentId: string;
session: ControllerAgentSession;
runFiber: Fiber.RuntimeFiber<void, never>;
scope: Scope.CloseableScope;
};
@@ -42,89 +37,163 @@ class StopAgentManagerServerError extends Data.TaggedError("StopAgentManagerServ
cause: unknown;
}> {}
export function createAgentManagerRuntime() {
export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) => void) {
let sessions = new Map<string, ControllerAgentSessionHandle>();
let backupHandlers: AgentBackupEventHandlers = {};
let runtimeScope: Scope.CloseableScope | null = null;
const closeSession = (sessionHandle: ControllerAgentSessionHandle) =>
Effect.gen(function* () {
yield* Fiber.interrupt(sessionHandle.runFiber);
yield* Scope.close(sessionHandle.scope, Exit.succeed(undefined));
yield* Effect.sync(() => {
if (sessions.get(sessionHandle.agentId) === sessionHandle) {
sessions.delete(sessionHandle.agentId);
}
});
});
const markAgentOfflineForShutdown = (agentId: string) =>
Effect.tryPromise({
try: () => agentsService.markAgentOffline(agentId),
catch: (error) => new StopAgentManagerServerError({ cause: error }),
}).pipe(
Effect.catchAll((error) =>
logger.effect.error(`Failed to mark agent ${agentId} offline during shutdown: ${toMessage(error)}`),
),
);
const closeAllSessions = Effect.gen(function* () {
const currentSessions = sessions;
sessions = new Map();
for (const sessionHandle of currentSessions.values()) {
const currentSessions = [...sessions.entries()];
for (const [agentId, sessionHandle] of currentSessions) {
yield* markAgentOfflineForShutdown(agentId);
yield* closeSession(sessionHandle);
}
sessions = new Map();
});
const getSessionHandle = (agentId: string) => sessions.get(agentId);
const getSession = (agentId: string) => getSessionHandle(agentId)?.session;
const createSessionHandlers = (ws: Bun.ServerWebSocket<AgentConnectionData>) => {
const agentId = ws.data.agentId;
const agentName = ws.data.agentName;
const handleSessionEvent = (params: { agentId: string; agentName: string; sessionId: string }) => {
const { agentId, agentName } = params;
return {
onBackupStarted: (payload: BackupStartedPayload) => {
backupHandlers.onBackupStarted?.({ agentId, agentName, payload });
},
onBackupProgress: (payload: BackupProgressPayload) => {
backupHandlers.onBackupProgress?.({ agentId, agentName, payload });
},
onBackupCompleted: (payload: BackupCompletedPayload) => {
backupHandlers.onBackupCompleted?.({ agentId, agentName, payload });
},
onBackupFailed: (payload: BackupFailedPayload) => {
backupHandlers.onBackupFailed?.({ agentId, agentName, payload });
},
onBackupCancelled: (payload: BackupCancelledPayload) => {
backupHandlers.onBackupCancelled?.({ agentId, agentName, payload });
},
return (event: ControllerAgentSessionEvent) => {
switch (event.type) {
case "agent.ready": {
const at = Date.now();
return Effect.promise(async () => {
await agentsService.markAgentOnline(agentId, at);
});
}
case "heartbeat.pong": {
const at = Date.now();
return Effect.promise(() => agentsService.markAgentSeen(agentId, at));
}
case "agent.disconnected": {
return Effect.sync(() => onEvent({ type: "agent.disconnected", agentId, agentName }));
}
default: {
return Effect.sync(() => onEvent({ ...event, agentId, agentName }));
}
}
};
};
const createSession = (ws: Bun.ServerWebSocket<AgentConnectionData>) => {
// Manual scope management because we are out of Effect
const scope = Effect.runSync(Scope.make());
const createSession = (ws: Bun.ServerWebSocket<AgentConnectionData>) =>
Effect.gen(function* () {
const scope = yield* Scope.make();
try {
const session = Effect.runSync(Scope.extend(createControllerAgentSession(ws, createSessionHandlers(ws)), scope));
const runFiber = Effect.runFork(Scope.extend(session.run, scope));
const session = yield* Scope.extend(
createControllerAgentSession(
ws,
handleSessionEvent({
agentId: ws.data.agentId,
agentName: ws.data.agentName,
sessionId: ws.data.id,
}),
),
scope,
);
const runFiber = yield* Effect.forkDaemon(Scope.extend(session.run, scope));
yield* Scope.addFinalizer(scope, Fiber.interrupt(runFiber));
return { session, runFiber, scope };
} catch (error) {
Effect.runSync(Scope.close(scope, Exit.fail(error)));
throw error;
}
};
const setSession = (agentId: string, sessionHandle: ControllerAgentSessionHandle) => {
const existingSession = getSessionHandle(agentId);
if (existingSession) {
void Effect.runPromise(closeSession(existingSession)).catch((error) => {
logger.error(`Failed to close existing agent session for ${agentId}: ${toMessage(error)}`);
});
}
sessions.set(agentId, sessionHandle);
};
const removeSession = (agentId: string, connectionId: string) => {
const sessionHandle = getSessionHandle(agentId);
if (!sessionHandle || sessionHandle.session.connectionId !== connectionId) {
return;
}
sessions.delete(agentId);
void Effect.runPromise(closeSession(sessionHandle)).catch((error) => {
logger.error(`Failed to close agent session for ${agentId}: ${toMessage(error)}`);
return { agentId: ws.data.agentId, session, scope };
});
};
const setSession = (sessionHandle: ControllerAgentSessionHandle) =>
Effect.gen(function* () {
const existingSession = sessions.get(sessionHandle.agentId);
sessions.set(sessionHandle.agentId, sessionHandle);
if (existingSession) {
yield* closeSession(existingSession);
}
});
const removeSession = (agentId: string, connectionId: string) =>
Effect.gen(function* () {
const handle = sessions.get(agentId);
if (!handle || handle.session.connectionId !== connectionId) {
return false;
}
yield* closeSession(handle);
yield* Effect.promise(() => agentsService.markAgentOffline(agentId));
return true;
});
const handleMessage = (ws: Bun.ServerWebSocket<AgentConnectionData>, data: unknown) =>
Effect.gen(function* () {
if (typeof data !== "string") {
yield* logger.effect.warn(`Ignoring non-text message from agent ${ws.data.agentId}`);
return;
}
const session = getSession(ws.data.agentId);
if (!session || session.connectionId !== ws.data.id) {
yield* logger.effect.warn(`No active session for agent ${ws.data.agentId} on ${ws.data.id}`);
return;
}
yield* session.handleMessage(data);
});
const handleOpen = (ws: Bun.ServerWebSocket<AgentConnectionData>) =>
Effect.gen(function* () {
yield* Effect.promise(() =>
agentsService.markAgentConnecting({
agentId: ws.data.agentId,
organizationId: ws.data.organizationId,
agentName: ws.data.agentName,
agentKind: ws.data.agentKind,
}),
);
const sessionHandle = yield* createSession(ws);
yield* setSession(sessionHandle);
yield* logger.effect.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) connected on ${ws.data.id}`);
});
const handleClose = (ws: Bun.ServerWebSocket<AgentConnectionData>) =>
Effect.gen(function* () {
yield* removeSession(ws.data.agentId, ws.data.id);
yield* logger.effect.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) disconnected`);
});
const runWebSocketHandler = (
ws: Bun.ServerWebSocket<AgentConnectionData>,
event: string,
effect: Effect.Effect<void>,
) =>
Effect.runPromise(
effect.pipe(
Effect.catchAllCause((cause) =>
logger.effect.error(
`Agent websocket ${event} failed for ${ws.data.agentId} on ${ws.data.id}: ${toMessage(cause)}`,
),
),
),
);
const acquireServer = Effect.acquireRelease(
Effect.sync(() =>
@@ -149,37 +218,24 @@ export function createAgentManagerRuntime() {
agentId: result.agentId,
organizationId: result.organizationId,
agentName: result.agentName,
agentKind: result.agentKind,
},
});
if (upgraded) return undefined;
return new Response("WebSocket upgrade failed", { status: 400 });
},
websocket: {
open: (ws) => {
setSession(ws.data.agentId, createSession(ws));
logger.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) connected on ${ws.data.id}`);
},
message: (ws, data) => {
if (typeof data !== "string") {
logger.warn(`Ignoring non-text message from agent ${ws.data.agentId}`);
return;
open: async (ws) => {
await runWebSocketHandler(ws, "open", handleOpen(ws));
if (getSession(ws.data.agentId)?.connectionId !== ws.data.id) {
ws.close();
}
const session = getSession(ws.data.agentId);
if (!session || session.connectionId !== ws.data.id) {
logger.warn(`No active session for agent ${ws.data.agentId} on ${ws.data.id}`);
return;
}
void Effect.runPromise(session.handleMessage(data)).catch((error) => {
logger.error(
`Failed to handle message from agent ${ws.data.agentId} on ${ws.data.id}: ${toMessage(error)}`,
);
});
},
close: (ws) => {
removeSession(ws.data.agentId, ws.data.id);
logger.info(`Agent "${ws.data.agentName}" (${ws.data.agentId}) disconnected`);
message: async (ws, data) => {
await runWebSocketHandler(ws, "message", handleMessage(ws, data));
},
close: async (ws) => {
await runWebSocketHandler(ws, "close", handleClose(ws));
},
},
}),
@@ -192,15 +248,13 @@ export function createAgentManagerRuntime() {
catch: (error) => new StopAgentManagerServerError({ cause: error }),
}),
),
Effect.catchAll((error) =>
Effect.sync(() => {
logger.error(`Failed to stop Agent Manager server: ${toMessage(error.cause)}`);
}),
),
Effect.catchAll((error) => {
return logger.effect.error(`Failed to stop Agent Manager server: ${toMessage(error.cause)}`);
}),
),
);
const stop = async () => {
const stop = Effect.gen(function* () {
if (!runtimeScope) {
return;
}
@@ -208,70 +262,67 @@ export function createAgentManagerRuntime() {
logger.info("Stopping Agent Manager...");
const scope = runtimeScope;
runtimeScope = null;
await Effect.runPromise(Scope.close(scope, Exit.succeed(undefined)));
};
yield* Scope.close(scope, Exit.succeed(undefined));
});
const start = async () => {
const start = Effect.gen(function* () {
if (runtimeScope) {
await stop();
yield* stop;
}
logger.info("Starting Agent Manager...");
const scope = Effect.runSync(Scope.make());
const scope = yield* Scope.make();
try {
const server = Effect.runSync(Scope.extend(acquireServer, scope));
runtimeScope = scope;
logger.info(`Agent Manager listening on port ${server.port}`);
} catch (error) {
await Effect.runPromise(Scope.close(scope, Exit.fail(error)));
throw error;
}
};
const server = yield* Scope.extend(acquireServer, scope).pipe(
Effect.catchAllCause((cause) =>
Scope.close(scope, Exit.failCause(cause)).pipe(Effect.andThen(Effect.failCause(cause))),
),
);
runtimeScope = scope;
logger.info(`Agent Manager listening on port ${server.port}`);
});
return {
start,
sendBackup: async (agentId: string, payload: BackupRunPayload) => {
const session = getSession(agentId);
sendBackup: (agentId: string, payload: BackupRunPayload) =>
Effect.gen(function* () {
const session = getSession(agentId);
if (!session) {
logger.warn(`Cannot send backup command. Agent ${agentId} is not connected.`);
return false;
}
if (!session) {
logger.warn(`Cannot send backup command. Agent ${agentId} is not connected.`);
return false;
}
if (!Effect.runSync(session.isReady())) {
logger.warn(`Cannot send backup command. Agent ${agentId} is not ready.`);
return false;
}
if (!(yield* session.isReady())) {
logger.warn(`Cannot send backup command. Agent ${agentId} is not ready.`);
return false;
}
if (!(await Effect.runPromise(session.sendBackup(payload)))) {
logger.warn(`Cannot send backup command. Agent ${agentId} is no longer accepting commands.`);
return false;
}
if (!(yield* session.sendBackup(payload))) {
logger.warn(`Cannot send backup command. Agent ${agentId} is no longer accepting commands.`);
return false;
}
logger.info(`Sent backup command ${payload.jobId} to agent ${agentId} for schedule ${payload.scheduleId}`);
return true;
},
cancelBackup: async (agentId: string, payload: BackupCancelPayload) => {
const session = getSession(agentId);
logger.info(`Sent backup command ${payload.jobId} to agent ${agentId} for schedule ${payload.scheduleId}`);
return true;
}),
cancelBackup: (agentId: string, payload: BackupCancelPayload) =>
Effect.gen(function* () {
const session = getSession(agentId);
if (!session) {
logger.warn(`Cannot cancel backup command. Agent ${agentId} is not connected.`);
return false;
}
if (!session) {
logger.warn(`Cannot cancel backup command. Agent ${agentId} is not connected.`);
return false;
}
if (!(await Effect.runPromise(session.sendBackupCancel(payload)))) {
logger.warn(`Cannot cancel backup command. Agent ${agentId} is no longer accepting commands.`);
return false;
}
if (!(yield* session.sendBackupCancel(payload))) {
logger.warn(`Cannot cancel backup command. Agent ${agentId} is no longer accepting commands.`);
return false;
}
logger.info(`Sent backup cancel for command ${payload.jobId} to agent ${agentId}`);
return true;
},
setBackupEventHandlers: (handlers: AgentBackupEventHandlers) => {
backupHandlers = handlers;
},
getBackupEventHandlers: () => backupHandlers,
logger.info(`Sent backup cancel for command ${payload.jobId} to agent ${agentId}`);
return true;
}),
stop,
};
}

View File

@@ -1,15 +1,11 @@
import { Effect, Queue, Ref, type Scope } from "effect";
import type { AgentKind } from "../../../db/schema";
import {
createControllerMessage,
parseAgentMessage,
type AgentMessage,
type BackupCancelPayload,
type BackupCancelledPayload,
type BackupCompletedPayload,
type BackupFailedPayload,
type BackupProgressPayload,
type BackupRunPayload,
type BackupStartedPayload,
type ControllerWireMessage,
} from "@zerobyte/contracts/agent-protocol";
import { logger } from "@zerobyte/core/node";
@@ -20,6 +16,7 @@ export type AgentConnectionData = {
agentId: string;
organizationId: string | null;
agentName: string;
agentKind: AgentKind;
};
type AgentSocket = Bun.ServerWebSocket<AgentConnectionData>;
@@ -30,18 +27,7 @@ type SessionState = {
lastPongAt: number | null;
};
type TrackedBackupJob = {
scheduleId: string;
state: "pending" | "active";
};
type ControllerAgentSessionHandlers = {
onBackupStarted?: (payload: BackupStartedPayload) => void;
onBackupProgress?: (payload: BackupProgressPayload) => void;
onBackupCompleted?: (payload: BackupCompletedPayload) => void;
onBackupFailed?: (payload: BackupFailedPayload) => void;
onBackupCancelled?: (payload: BackupCancelledPayload) => void;
};
export type ControllerAgentSessionEvent = AgentMessage | { type: "agent.disconnected" };
export type ControllerAgentSession = {
readonly connectionId: string;
@@ -54,12 +40,11 @@ export type ControllerAgentSession = {
export const createControllerAgentSession = (
socket: AgentSocket,
handlers: ControllerAgentSessionHandlers = {},
onEvent: (event: ControllerAgentSessionEvent) => Effect.Effect<void>,
): Effect.Effect<ControllerAgentSession, never, Scope.Scope> =>
Effect.gen(function* () {
let isClosed = false;
const outboundQueue = yield* Queue.bounded<ControllerWireMessage>(64);
const trackedBackupJobs = yield* Ref.make<Map<string, TrackedBackupJob>>(new Map());
const state = yield* Ref.make<SessionState>({
isReady: false,
lastSeenAt: null,
@@ -78,37 +63,10 @@ export const createControllerAgentSession = (
const updateState = (update: (current: SessionState) => SessionState) => Ref.update(state, update);
const setTrackedBackupJob = (jobId: string, trackedBackupJob: TrackedBackupJob) => {
return Ref.update(trackedBackupJobs, (current) => {
const next = new Map(current);
next.set(jobId, trackedBackupJob);
return next;
});
};
const deleteTrackedBackupJob = (jobId: string) => {
return Ref.update(trackedBackupJobs, (current) => {
const next = new Map(current);
next.delete(jobId);
return next;
});
};
const takeTrackedBackupJobs = Ref.modify(
trackedBackupJobs,
(current) => [current, new Map<string, TrackedBackupJob>()] as const,
);
const releaseSession = Effect.gen(function* () {
yield* updateState((current) => ({ ...current, isReady: false }));
const trackedJobs = yield* takeTrackedBackupJobs;
for (const [jobId, trackedJob] of trackedJobs) {
const message = "The connection to the backup agent was lost. Restart the backup to ensure it completes.";
yield* Effect.sync(() => {
handlers.onBackupCancelled?.({ jobId, scheduleId: trackedJob.scheduleId, message });
});
}
const disconnectedAt = Date.now();
yield* updateState((current) => ({ ...current, isReady: false, lastSeenAt: disconnectedAt }));
yield* onEvent({ type: "agent.disconnected" });
yield* Queue.shutdown(outboundQueue);
});
@@ -126,16 +84,13 @@ export const createControllerAgentSession = (
yield* Effect.addFinalizer(() => closeSession());
const handleSendFailure = (reason: string) => {
logger.error(
`Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`,
);
socket.close();
void Effect.runPromise(closeSession()).catch((error) => {
return Effect.gen(function* () {
logger.error(
`Failed to close session for agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`,
`Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`,
);
yield* Effect.sync(() => socket.close());
yield* closeSession();
});
};
@@ -144,17 +99,16 @@ export const createControllerAgentSession = (
Effect.forever(
Effect.gen(function* () {
const message = yield* Queue.take(outboundQueue);
yield* Effect.sync(() => {
try {
const sendResult = socket.send(message);
if (sendResult === 0) {
handleSendFailure("connection issue");
}
} catch (error) {
handleSendFailure(toMessage(error));
}
const sendResult = yield* Effect.try({
try: () => socket.send(message),
catch: (error) => toMessage(error),
});
}),
if (sendResult === 0) {
yield* handleSendFailure("connection issue");
}
}).pipe(Effect.catchAll((reason) => handleSendFailure(reason))),
),
);
@@ -177,61 +131,18 @@ export const createControllerAgentSession = (
const handleAgentMessage = (message: AgentMessage) =>
Effect.gen(function* () {
yield* updateState((current) => ({ ...current, lastSeenAt: Date.now() }));
switch (message.type) {
case "agent.ready": {
yield* updateState((current) => ({ ...current, isReady: true }));
yield* Effect.sync(() => {
logger.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`);
});
break;
}
case "backup.started": {
yield* setTrackedBackupJob(message.payload.jobId, {
scheduleId: message.payload.scheduleId,
state: "active",
});
yield* Effect.sync(() => {
logger.info(
`Backup ${message.payload.jobId} started on agent ${socket.data.agentId} for schedule ${message.payload.scheduleId}`,
);
handlers.onBackupStarted?.(message.payload);
});
break;
}
case "backup.progress": {
yield* Effect.sync(() => {
handlers.onBackupProgress?.(message.payload);
});
break;
}
case "backup.completed": {
yield* deleteTrackedBackupJob(message.payload.jobId);
yield* Effect.sync(() => {
handlers.onBackupCompleted?.(message.payload);
});
break;
}
case "backup.failed": {
yield* deleteTrackedBackupJob(message.payload.jobId);
yield* Effect.sync(() => {
handlers.onBackupFailed?.(message.payload);
});
break;
}
case "backup.cancelled": {
yield* deleteTrackedBackupJob(message.payload.jobId);
yield* Effect.sync(() => {
handlers.onBackupCancelled?.(message.payload);
});
break;
}
case "heartbeat.pong": {
yield* updateState((current) => ({ ...current, lastPongAt: message.payload.sentAt }));
break;
}
if (message.type === "agent.ready") {
const readyAt = Date.now();
yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt }));
yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`);
}
if (message.type === "heartbeat.pong") {
const seenAt = Date.now();
yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt }));
}
yield* onEvent(message);
});
return {
@@ -241,33 +152,19 @@ export const createControllerAgentSession = (
const parsed = parseAgentMessage(data);
if (parsed === null) {
yield* Effect.sync(() => {
logger.warn(`Invalid JSON from agent ${socket.data.agentId}`);
});
yield* logger.effect.warn(`Invalid JSON from agent ${socket.data.agentId}`);
return;
}
if (!parsed.success) {
yield* Effect.sync(() => {
logger.warn(`Invalid agent message from ${socket.data.agentId}: ${parsed.error.message}`);
});
yield* logger.effect.warn(`Invalid agent message from ${socket.data.agentId}: ${parsed.error.message}`);
return;
}
yield* handleAgentMessage(parsed.data);
});
},
sendBackup: (payload) => {
return Effect.gen(function* () {
const queued = yield* offerOutbound(createControllerMessage("backup.run", payload));
if (queued) {
yield* setTrackedBackupJob(payload.jobId, { scheduleId: payload.scheduleId, state: "pending" });
}
return queued;
});
},
sendBackup: (payload) => offerOutbound(createControllerMessage("backup.run", payload)),
sendBackupCancel: (payload) => offerOutbound(createControllerMessage("backup.cancel", payload)),
isReady: () => Ref.get(state).pipe(Effect.map((current) => current.isReady)),
run,

View File

@@ -11,6 +11,7 @@ export type BackupExecutionResult =
| { status: "cancelled"; message?: string };
type ActiveBackupRun = {
agentId: string;
scheduleId: number;
jobId: string;
scheduleShortId: string;

View File

@@ -1,4 +1,5 @@
import { cryptoUtils } from "~/server/utils/crypto";
import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants";
export const deriveLocalAgentToken = async () => {
return cryptoUtils.deriveSecret("zerobyte:local-agent-token");
@@ -7,6 +8,6 @@ export const deriveLocalAgentToken = async () => {
export const validateAgentToken = async (token: string) => {
const localToken = await deriveLocalAgentToken();
if (token === localToken) {
return { agentId: "local", organizationId: null, agentName: "local" };
return { agentId: LOCAL_AGENT_ID, organizationId: null, agentName: LOCAL_AGENT_NAME, agentKind: LOCAL_AGENT_KIND };
}
};

View File

@@ -5,15 +5,14 @@ import { config } from "../../core/config";
import { restic, resticDeps } from "../../core/restic";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import { agentManager, type BackupExecutionProgress } from "../agents/agents-manager";
import { LOCAL_AGENT_ID } from "../agents/constants";
import { getVolumePath } from "../volumes/helpers";
import { decryptRepositoryConfig } from "../repositories/repository-config-secrets";
import { createBackupOptions } from "./backup.helpers";
import { toErrorDetails } from "../../utils/errors";
const LOCAL_AGENT_ID = "local";
const FUSE_VOLUME_BACKENDS = new Set<Volume["type"]>(["rclone", "sftp", "webdav"]);
const IGNORE_INODE_FLAG = "--ignore-inode";
type BackupExecutionRequest = {
scheduleId: number;
schedule: BackupSchedule;

View File

@@ -1,6 +1,7 @@
import { runDbMigrations } from "../../db/db";
import { config } from "../../core/config";
import { startAgentController, startLocalAgent, stopAgentController, stopLocalAgent } from "../agents/agents-manager";
import { agentsService } from "../agents/agents.service";
import { runMigrations } from "./migrations";
import { startup } from "./startup";
@@ -9,6 +10,7 @@ let bootstrapPromise: Promise<void> | undefined;
const runBootstrap = async () => {
await runDbMigrations();
await runMigrations();
await agentsService.ensureLocalAgent();
try {
await startAgentController();

View File

@@ -1,4 +1,5 @@
import { vi } from "vitest";
import { Effect } from "effect";
process.env.BASE_URL = "http://localhost:3000";
process.env.TRUSTED_ORIGINS = "http://localhost:3000";
@@ -13,6 +14,12 @@ vi.mock(import("@zerobyte/core/node"), async () => {
info: () => {},
warn: () => {},
error: () => {},
effect: {
debug: () => Effect.void,
info: () => Effect.void,
warn: () => Effect.void,
error: () => Effect.void,
},
},
};
});

View File

@@ -21,7 +21,7 @@ export const handleBackupRunCommand = (context: ControllerCommandContext, payloa
return;
}
logger.info(`Starting backup ${payload.jobId} for schedule ${payload.scheduleId}`);
yield* logger.effect.info(`Starting backup ${payload.jobId} for schedule ${payload.scheduleId}`);
const abortController = new AbortController();
yield* context.setRunningJob(payload.jobId, { scheduleId: payload.scheduleId, abortController });

View File

@@ -2,6 +2,7 @@ import { format } from "date-fns";
import { createConsola, type ConsolaReporter } from "consola";
import { formatWithOptions } from "node:util";
import { sanitizeSensitiveData } from "../utils/sanitize";
import { Effect } from "effect";
type LogLevel = "debug" | "info" | "warn" | "error";
@@ -102,4 +103,10 @@ export const logger = {
info: (...messages: unknown[]) => consola.info(formatMessages(messages).join(" ")),
warn: (...messages: unknown[]) => consola.warn(formatMessages(messages).join(" ")),
error: (...messages: unknown[]) => consola.error(formatMessages(messages).join(" ")),
effect: {
debug: (...messages: unknown[]) => Effect.sync(() => consola.debug(formatMessages(messages).join(" "))),
info: (...messages: unknown[]) => Effect.sync(() => consola.info(formatMessages(messages).join(" "))),
warn: (...messages: unknown[]) => Effect.sync(() => consola.warn(formatMessages(messages).join(" "))),
error: (...messages: unknown[]) => Effect.sync(() => consola.error(formatMessages(messages).join(" "))),
},
};

View File

@@ -1,4 +1,5 @@
import { vi } from "vitest";
import { Effect } from "effect";
vi.mock(import("../src/node/logger.ts"), () => ({
logger: {
@@ -6,5 +7,11 @@ vi.mock(import("../src/node/logger.ts"), () => ({
info: () => {},
warn: () => {},
error: () => {},
effect: {
debug: () => Effect.void,
info: () => Effect.void,
warn: () => Effect.void,
error: () => Effect.void,
},
},
}));