fix: handle socket message send failures

This commit is contained in:
Nicolas Meienberger
2026-03-30 20:58:11 +02:00
parent 3162cba8b2
commit e459606436
2 changed files with 113 additions and 45 deletions

View File

@@ -1,19 +1,26 @@
import { expect, mock, test } from "bun:test";
import waitForExpect from "wait-for-expect";
import { createAgentMessage } from "@zerobyte/contracts/agent-protocol";
import { createControllerAgentSession } from "../controller-agent-session";
const createSocket = () => {
const createSocket = (send = mock(() => 1)) => {
const close = mock(() => undefined);
return {
data: { id: "connection-1", agentId: "local", organizationId: null, agentName: "Local Agent" },
send: mock(() => undefined),
} as unknown as Parameters<typeof createControllerAgentSession>[0];
send,
close,
};
};
test("close emits a synthetic backup.cancelled for a started backup", () => {
const onBackupCancelled = mock(() => undefined);
const session = createControllerAgentSession(createSocket(), {
onBackupCancelled,
});
const session = createControllerAgentSession(
createSocket() as unknown as Parameters<typeof createControllerAgentSession>[0],
{
onBackupCancelled,
},
);
session.handleMessage(
createAgentMessage("backup.started", {
@@ -68,9 +75,12 @@ test("close does not emit a synthetic backup.cancelled after a terminal event",
},
]) {
const onBackupCancelled = mock(() => undefined);
const session = createControllerAgentSession(createSocket(), {
onBackupCancelled,
});
const session = createControllerAgentSession(
createSocket() as unknown as Parameters<typeof createControllerAgentSession>[0],
{
onBackupCancelled,
},
);
session.handleMessage(
createAgentMessage("backup.started", {
@@ -87,9 +97,12 @@ test("close does not emit a synthetic backup.cancelled after a terminal event",
test("close emits a synthetic backup.cancelled for a queued backup", () => {
const onBackupCancelled = mock(() => undefined);
const session = createControllerAgentSession(createSocket(), {
onBackupCancelled,
});
const session = createControllerAgentSession(
createSocket() as unknown as Parameters<typeof createControllerAgentSession>[0],
{
onBackupCancelled,
},
);
session.sendBackup({
jobId: "job-queued",
@@ -119,3 +132,38 @@ test("close emits a synthetic backup.cancelled for a queued backup", () => {
"The connection to the backup agent was lost before this backup started. Restart the backup to ensure it completes.",
});
});
test("a dropped backup.cancel closes the session and emits a synthetic backup.cancelled", async () => {
const send = mock(() => 0);
const socket = createSocket(send);
const onBackupCancelled = mock(() => undefined);
const session = createControllerAgentSession(
socket as unknown as Parameters<typeof createControllerAgentSession>[0],
{
onBackupCancelled,
},
);
session.handleMessage(
createAgentMessage("backup.started", {
jobId: "job-1",
scheduleId: "schedule-1",
}),
);
session.sendBackupCancel({
jobId: "job-1",
scheduleId: "schedule-1",
});
await waitForExpect(() => {
expect(send).toHaveBeenCalledTimes(1);
expect(socket.close).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledTimes(1);
expect(onBackupCancelled).toHaveBeenCalledWith({
jobId: "job-1",
scheduleId: "schedule-1",
message:
"The connection to the backup agent was lost while this backup was running. Restart the backup to ensure it completes.",
});
});
});

View File

@@ -51,6 +51,7 @@ export const createControllerAgentSession = (
socket: AgentSocket,
handlers: ControllerAgentSessionHandlers = {},
): ControllerAgentSession => {
let isClosed = false;
const outboundQueue = Effect.runSync(Queue.bounded<ControllerWireMessage>(64));
const activeBackupJobs = Effect.runSync(Ref.make<Map<string, string>>(new Map()));
const pendingBackupJobs = Effect.runSync(Ref.make<Map<string, string>>(new Map()));
@@ -112,17 +113,62 @@ export const createControllerAgentSession = (
);
};
const closeSession = () => {
if (isClosed) {
return;
}
isClosed = true;
updateState((current) => ({ ...current, isReady: false }));
const pendingJobs = Effect.runSync(Ref.get(pendingBackupJobs));
Effect.runSync(Ref.set(pendingBackupJobs, new Map()));
for (const [jobId, scheduleId] of pendingJobs) {
handlers.onBackupCancelled?.({
jobId,
scheduleId,
message:
"The connection to the backup agent was lost before this backup started. Restart the backup to ensure it completes.",
});
}
const activeJobs = Effect.runSync(Ref.get(activeBackupJobs));
Effect.runSync(Ref.set(activeBackupJobs, new Map()));
for (const [jobId, scheduleId] of activeJobs) {
handlers.onBackupCancelled?.({
jobId,
scheduleId,
message:
"The connection to the backup agent was lost while this backup was running. Restart the backup to ensure it completes.",
});
}
void Effect.runPromise(Fiber.interrupt(writerFiber)).catch(() => {});
void Effect.runPromise(Fiber.interrupt(heartbeatFiber)).catch(() => {});
void Effect.runPromise(Queue.shutdown(outboundQueue)).catch(() => {});
};
const handleSendFailure = (reason: string) => {
logger.error(
`Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`,
);
try {
socket.close();
} catch (error) {
logger.error(`Failed to close socket for agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`);
}
closeSession();
};
const writerFiber = Effect.runFork(
Effect.forever(
Effect.gen(function* () {
const message = yield* Queue.take(outboundQueue);
yield* Effect.sync(() => {
try {
socket.send(message);
} catch (error) {
logger.error(
`Failed to send message to agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`,
);
const sendResult = socket.send(message);
if (sendResult <= 0) {
handleSendFailure(sendResult === 0 ? "connection issue" : "backpressure");
}
});
}),
@@ -216,32 +262,6 @@ export const createControllerAgentSession = (
offerOutbound(createControllerMessage("backup.cancel", payload));
},
isReady: () => Effect.runSync(Ref.get(state)).isReady,
close: () => {
updateState((current) => ({ ...current, isReady: false }));
const pendingJobs = Effect.runSync(Ref.get(pendingBackupJobs));
Effect.runSync(Ref.set(pendingBackupJobs, new Map()));
for (const [jobId, scheduleId] of pendingJobs) {
handlers.onBackupCancelled?.({
jobId,
scheduleId,
message:
"The connection to the backup agent was lost before this backup started. Restart the backup to ensure it completes.",
});
}
const activeJobs = Effect.runSync(Ref.get(activeBackupJobs));
Effect.runSync(Ref.set(activeBackupJobs, new Map()));
for (const [jobId, scheduleId] of activeJobs) {
handlers.onBackupCancelled?.({
jobId,
scheduleId,
message:
"The connection to the backup agent was lost while this backup was running. Restart the backup to ensure it completes.",
});
}
void Effect.runPromise(Fiber.interrupt(writerFiber)).catch(() => {});
void Effect.runPromise(Fiber.interrupt(heartbeatFiber)).catch(() => {});
void Effect.runPromise(Queue.shutdown(outboundQueue)).catch(() => {});
},
close: closeSession,
};
};