import { Deferred, Effect, Queue, Ref, type Scope } from "effect"; import type { AgentKind } from "../../../db/schema"; import { createControllerMessage, parseAgentMessage, type AgentMessage, type BackupCancelPayload, type BackupRunPayload, type ControllerWireMessage, type RestoreCancelPayload, type RestoreRunPayload, type VolumeCommand, type VolumeCommandResponsePayload, } from "@zerobyte/contracts/agent-protocol"; import { logger } from "@zerobyte/core/node"; import { toMessage } from "@zerobyte/core/utils"; export type AgentConnectionData = { id: string; agentId: string; organizationId: string | null; agentName: string; agentKind: AgentKind; }; type AgentSocket = Bun.ServerWebSocket; type SessionState = { isReady: boolean; lastSeenAt: number | null; lastPongAt: number | null; }; type PendingCommand = { deferred: Deferred.Deferred; description: string }; export type ControllerAgentSessionEvent = | Exclude | { type: "agent.disconnected" }; export type ControllerAgentSession = { readonly connectionId: string; handleMessage: (data: string) => Effect.Effect; sendBackup: (payload: BackupRunPayload) => Effect.Effect; sendBackupCancel: (payload: BackupCancelPayload) => Effect.Effect; sendRestore: (payload: RestoreRunPayload) => Effect.Effect; sendRestoreCancel: (payload: RestoreCancelPayload) => Effect.Effect; runVolumeCommand: (command: VolumeCommand) => Effect.Effect; isReady: () => Effect.Effect; run: Effect.Effect; }; export const createControllerAgentSession = ( socket: AgentSocket, onEvent: (event: ControllerAgentSessionEvent) => Effect.Effect, ): Effect.Effect => Effect.gen(function* () { let isClosed = false; const outboundQueue = yield* Queue.bounded(64); const pendingCommands = yield* Ref.make(new Map()); const state = yield* Ref.make({ isReady: false, lastSeenAt: null, lastPongAt: null, }); const offerOutbound = (message: ControllerWireMessage) => Queue.offer(outboundQueue, message).pipe( Effect.catchAllCause((cause) => Effect.sync(() => { logger.error( `Failed to queue outbound message for agent ${socket.data.agentId}: ${toMessage(cause)}`, ); return false; }), ), ); const updateState = (update: (current: SessionState) => SessionState) => Ref.update(state, update); const setPendingCommand = (commandId: string, pending: PendingCommand) => Ref.update(pendingCommands, (current) => new Map(current).set(commandId, pending)); const removePendingCommand = (commandId: string) => Ref.modify(pendingCommands, (current) => { const pending = current.get(commandId) ?? null; const next = new Map(current); next.delete(commandId); return [pending, next]; }); const rejectPendingCommands = Effect.gen(function* () { const pendingCommandEntries = yield* Ref.get(pendingCommands); yield* Ref.set(pendingCommands, new Map()); for (const pending of pendingCommandEntries.values()) { yield* Deferred.fail( pending.deferred, new Error(`Agent session closed before ${pending.description} completed`), ); } }); const releaseSession = Effect.gen(function* () { const disconnectedAt = Date.now(); yield* updateState((current) => ({ ...current, isReady: false, lastSeenAt: disconnectedAt })); yield* rejectPendingCommands; yield* onEvent({ type: "agent.disconnected" }); yield* Queue.shutdown(outboundQueue); }); const closeSession = () => Effect.suspend(() => { if (isClosed) { return Effect.sync(() => undefined); } isClosed = true; return releaseSession; }); yield* Effect.addFinalizer(() => closeSession()); const handleSendFailure = (reason: string) => { return Effect.gen(function* () { logger.error( `Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`, ); yield* Effect.sync(() => socket.close()); yield* closeSession(); }); }; const run = Effect.gen(function* () { yield* Effect.forkScoped( Effect.forever( Effect.gen(function* () { const message = yield* Queue.take(outboundQueue); const sendResult = yield* Effect.try({ try: () => socket.send(message), catch: (error) => toMessage(error), }); if (sendResult === 0) { yield* handleSendFailure("connection issue"); } }).pipe(Effect.catchAll((reason) => handleSendFailure(reason))), ), ); yield* Effect.forkScoped( Effect.forever( Effect.gen(function* () { yield* Effect.sleep("15 seconds"); yield* Queue.offer( outboundQueue, createControllerMessage("heartbeat.ping", { sentAt: Date.now(), }), ); }), ), ); return yield* Effect.never; }); const handleVolumeCommandResult = (payload: VolumeCommandResponsePayload) => Effect.gen(function* () { const pending = yield* removePendingCommand(payload.commandId); if (!pending) { yield* logger.effect.warn(`Received response for unknown volume command ${payload.commandId}`); return; } yield* Deferred.succeed(pending.deferred, payload); }); const handleAgentMessage = (message: AgentMessage) => Effect.gen(function* () { switch (message.type) { case "agent.ready": { const readyAt = Date.now(); yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt })); yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`); yield* onEvent(message); break; } case "heartbeat.pong": { const seenAt = Date.now(); yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt, })); yield* onEvent(message); break; } case "volume.commandResult": { yield* handleVolumeCommandResult(message.payload); break; } default: { yield* onEvent(message); break; } } }); return { connectionId: socket.data.id, handleMessage: (data: string) => { return Effect.gen(function* () { const parsed = parseAgentMessage(data); if (parsed === null) { yield* logger.effect.warn(`Invalid JSON from agent ${socket.data.agentId}`); return; } if (!parsed.success) { yield* logger.effect.warn( `Invalid agent message from ${socket.data.agentId}: ${parsed.error.message}`, ); return; } yield* handleAgentMessage(parsed.data); }); }, sendBackup: (payload) => offerOutbound(createControllerMessage("backup.run", payload)), sendBackupCancel: (payload) => offerOutbound(createControllerMessage("backup.cancel", payload)), sendRestore: (payload) => offerOutbound(createControllerMessage("restore.run", payload)), sendRestoreCancel: (payload) => offerOutbound(createControllerMessage("restore.cancel", payload)), runVolumeCommand: (command) => Effect.gen(function* () { const commandId = Bun.randomUUIDv7(); const description = `volume command ${command.name}`; const deferred = yield* Deferred.make(); yield* setPendingCommand(commandId, { deferred, description }); const queued = yield* offerOutbound( createControllerMessage("volume.command", { commandId, command }), ); if (!queued) { yield* removePendingCommand(commandId); return yield* Effect.fail(new Error(`Failed to queue volume command ${command.name}`)); } return yield* Deferred.await(deferred).pipe( Effect.timeoutFail({ duration: "60 seconds", onTimeout: () => new Error(`Volume command ${command.name} timed out`), }), Effect.ensuring(removePendingCommand(commandId)), ); }), isReady: () => Ref.get(state).pipe(Effect.map((current) => current.isReady)), run, }; });