diff --git a/app/server/modules/agents/__tests__/controller-agent-session.test.ts b/app/server/modules/agents/__tests__/controller-agent-session.test.ts index c2a35dd5..a497267c 100644 --- a/app/server/modules/agents/__tests__/controller-agent-session.test.ts +++ b/app/server/modules/agents/__tests__/controller-agent-session.test.ts @@ -1,19 +1,26 @@ import { expect, mock, test } from "bun:test"; +import waitForExpect from "wait-for-expect"; import { createAgentMessage } from "@zerobyte/contracts/agent-protocol"; import { createControllerAgentSession } from "../controller-agent-session"; -const createSocket = () => { +const createSocket = (send = mock(() => 1)) => { + const close = mock(() => undefined); + return { data: { id: "connection-1", agentId: "local", organizationId: null, agentName: "Local Agent" }, - send: mock(() => undefined), - } as unknown as Parameters[0]; + send, + close, + }; }; test("close emits a synthetic backup.cancelled for a started backup", () => { const onBackupCancelled = mock(() => undefined); - const session = createControllerAgentSession(createSocket(), { - onBackupCancelled, - }); + const session = createControllerAgentSession( + createSocket() as unknown as Parameters[0], + { + onBackupCancelled, + }, + ); session.handleMessage( createAgentMessage("backup.started", { @@ -68,9 +75,12 @@ test("close does not emit a synthetic backup.cancelled after a terminal event", }, ]) { const onBackupCancelled = mock(() => undefined); - const session = createControllerAgentSession(createSocket(), { - onBackupCancelled, - }); + const session = createControllerAgentSession( + createSocket() as unknown as Parameters[0], + { + onBackupCancelled, + }, + ); session.handleMessage( createAgentMessage("backup.started", { @@ -87,9 +97,12 @@ test("close does not emit a synthetic backup.cancelled after a terminal event", test("close emits a synthetic backup.cancelled for a queued backup", () => { const onBackupCancelled = mock(() => undefined); - const session = createControllerAgentSession(createSocket(), { - onBackupCancelled, - }); + const session = createControllerAgentSession( + createSocket() as unknown as Parameters[0], + { + onBackupCancelled, + }, + ); session.sendBackup({ jobId: "job-queued", @@ -119,3 +132,38 @@ test("close emits a synthetic backup.cancelled for a queued backup", () => { "The connection to the backup agent was lost before this backup started. Restart the backup to ensure it completes.", }); }); + +test("a dropped backup.cancel closes the session and emits a synthetic backup.cancelled", async () => { + const send = mock(() => 0); + const socket = createSocket(send); + const onBackupCancelled = mock(() => undefined); + const session = createControllerAgentSession( + socket as unknown as Parameters[0], + { + onBackupCancelled, + }, + ); + + session.handleMessage( + createAgentMessage("backup.started", { + jobId: "job-1", + scheduleId: "schedule-1", + }), + ); + session.sendBackupCancel({ + jobId: "job-1", + scheduleId: "schedule-1", + }); + + await waitForExpect(() => { + expect(send).toHaveBeenCalledTimes(1); + expect(socket.close).toHaveBeenCalledTimes(1); + expect(onBackupCancelled).toHaveBeenCalledTimes(1); + expect(onBackupCancelled).toHaveBeenCalledWith({ + jobId: "job-1", + scheduleId: "schedule-1", + message: + "The connection to the backup agent was lost while this backup was running. Restart the backup to ensure it completes.", + }); + }); +}); diff --git a/app/server/modules/agents/controller-agent-session.ts b/app/server/modules/agents/controller-agent-session.ts index 900aee72..50ead28b 100644 --- a/app/server/modules/agents/controller-agent-session.ts +++ b/app/server/modules/agents/controller-agent-session.ts @@ -51,6 +51,7 @@ export const createControllerAgentSession = ( socket: AgentSocket, handlers: ControllerAgentSessionHandlers = {}, ): ControllerAgentSession => { + let isClosed = false; const outboundQueue = Effect.runSync(Queue.bounded(64)); const activeBackupJobs = Effect.runSync(Ref.make>(new Map())); const pendingBackupJobs = Effect.runSync(Ref.make>(new Map())); @@ -112,17 +113,62 @@ export const createControllerAgentSession = ( ); }; + const closeSession = () => { + if (isClosed) { + return; + } + + isClosed = true; + updateState((current) => ({ ...current, isReady: false })); + const pendingJobs = Effect.runSync(Ref.get(pendingBackupJobs)); + Effect.runSync(Ref.set(pendingBackupJobs, new Map())); + + for (const [jobId, scheduleId] of pendingJobs) { + handlers.onBackupCancelled?.({ + jobId, + scheduleId, + message: + "The connection to the backup agent was lost before this backup started. Restart the backup to ensure it completes.", + }); + } + + const activeJobs = Effect.runSync(Ref.get(activeBackupJobs)); + Effect.runSync(Ref.set(activeBackupJobs, new Map())); + for (const [jobId, scheduleId] of activeJobs) { + handlers.onBackupCancelled?.({ + jobId, + scheduleId, + message: + "The connection to the backup agent was lost while this backup was running. Restart the backup to ensure it completes.", + }); + } + void Effect.runPromise(Fiber.interrupt(writerFiber)).catch(() => {}); + void Effect.runPromise(Fiber.interrupt(heartbeatFiber)).catch(() => {}); + void Effect.runPromise(Queue.shutdown(outboundQueue)).catch(() => {}); + }; + + const handleSendFailure = (reason: string) => { + logger.error( + `Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`, + ); + + try { + socket.close(); + } catch (error) { + logger.error(`Failed to close socket for agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`); + } + + closeSession(); + }; + const writerFiber = Effect.runFork( Effect.forever( Effect.gen(function* () { const message = yield* Queue.take(outboundQueue); yield* Effect.sync(() => { - try { - socket.send(message); - } catch (error) { - logger.error( - `Failed to send message to agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`, - ); + const sendResult = socket.send(message); + if (sendResult <= 0) { + handleSendFailure(sendResult === 0 ? "connection issue" : "backpressure"); } }); }), @@ -216,32 +262,6 @@ export const createControllerAgentSession = ( offerOutbound(createControllerMessage("backup.cancel", payload)); }, isReady: () => Effect.runSync(Ref.get(state)).isReady, - close: () => { - updateState((current) => ({ ...current, isReady: false })); - const pendingJobs = Effect.runSync(Ref.get(pendingBackupJobs)); - Effect.runSync(Ref.set(pendingBackupJobs, new Map())); - for (const [jobId, scheduleId] of pendingJobs) { - handlers.onBackupCancelled?.({ - jobId, - scheduleId, - message: - "The connection to the backup agent was lost before this backup started. Restart the backup to ensure it completes.", - }); - } - - const activeJobs = Effect.runSync(Ref.get(activeBackupJobs)); - Effect.runSync(Ref.set(activeBackupJobs, new Map())); - for (const [jobId, scheduleId] of activeJobs) { - handlers.onBackupCancelled?.({ - jobId, - scheduleId, - message: - "The connection to the backup agent was lost while this backup was running. Restart the backup to ensure it completes.", - }); - } - void Effect.runPromise(Fiber.interrupt(writerFiber)).catch(() => {}); - void Effect.runPromise(Fiber.interrupt(heartbeatFiber)).catch(() => {}); - void Effect.runPromise(Queue.shutdown(outboundQueue)).catch(() => {}); - }, + close: closeSession, }; };