Files
zerobyte/app/server/modules/agents/controller/session.ts
Nico 33601dde24 feat(agent): add standalone agent runtime (#776)
* feat(agent): add standalone agent runtime

* fix(agent): add Bun and DOM types to agent tsconfig

* refactor: wrap backup error in a tagged effect error

* feat(controller): add agent manager and session handling

* feat(backups): execute backups through the agent

* fix(agent): harden disconnect and send-failure handling

* fix: rebase conflicts

* test: simplify mocks

* refactor: split agent runtime state

* fix(backup): keep old path when agent is disabled

* fix: pr feedbacks
2026-04-13 23:29:10 +02:00

276 lines
7.8 KiB
TypeScript

import { Effect, Queue, Ref, type Scope } from "effect";
import {
createControllerMessage,
parseAgentMessage,
type AgentMessage,
type BackupCancelPayload,
type BackupCancelledPayload,
type BackupCompletedPayload,
type BackupFailedPayload,
type BackupProgressPayload,
type BackupRunPayload,
type BackupStartedPayload,
type ControllerWireMessage,
} from "@zerobyte/contracts/agent-protocol";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
export type AgentConnectionData = {
id: string;
agentId: string;
organizationId: string | null;
agentName: string;
};
type AgentSocket = Bun.ServerWebSocket<AgentConnectionData>;
type SessionState = {
isReady: boolean;
lastSeenAt: number | null;
lastPongAt: number | null;
};
type TrackedBackupJob = {
scheduleId: string;
state: "pending" | "active";
};
type ControllerAgentSessionHandlers = {
onBackupStarted?: (payload: BackupStartedPayload) => void;
onBackupProgress?: (payload: BackupProgressPayload) => void;
onBackupCompleted?: (payload: BackupCompletedPayload) => void;
onBackupFailed?: (payload: BackupFailedPayload) => void;
onBackupCancelled?: (payload: BackupCancelledPayload) => void;
};
export type ControllerAgentSession = {
readonly connectionId: string;
handleMessage: (data: string) => Effect.Effect<void>;
sendBackup: (payload: BackupRunPayload) => Effect.Effect<boolean>;
sendBackupCancel: (payload: BackupCancelPayload) => Effect.Effect<boolean>;
isReady: () => Effect.Effect<boolean>;
run: Effect.Effect<void, never, Scope.Scope>;
};
export const createControllerAgentSession = (
socket: AgentSocket,
handlers: ControllerAgentSessionHandlers = {},
): Effect.Effect<ControllerAgentSession, never, Scope.Scope> =>
Effect.gen(function* () {
let isClosed = false;
const outboundQueue = yield* Queue.bounded<ControllerWireMessage>(64);
const trackedBackupJobs = yield* Ref.make<Map<string, TrackedBackupJob>>(new Map());
const state = yield* Ref.make<SessionState>({
isReady: false,
lastSeenAt: null,
lastPongAt: null,
});
const offerOutbound = (message: ControllerWireMessage) =>
Queue.offer(outboundQueue, message).pipe(
Effect.catchAllCause((cause) =>
Effect.sync(() => {
logger.error(`Failed to queue outbound message for agent ${socket.data.agentId}: ${toMessage(cause)}`);
return false;
}),
),
);
const updateState = (update: (current: SessionState) => SessionState) => Ref.update(state, update);
const setTrackedBackupJob = (jobId: string, trackedBackupJob: TrackedBackupJob) => {
return Ref.update(trackedBackupJobs, (current) => {
const next = new Map(current);
next.set(jobId, trackedBackupJob);
return next;
});
};
const deleteTrackedBackupJob = (jobId: string) => {
return Ref.update(trackedBackupJobs, (current) => {
const next = new Map(current);
next.delete(jobId);
return next;
});
};
const takeTrackedBackupJobs = Ref.modify(
trackedBackupJobs,
(current) => [current, new Map<string, TrackedBackupJob>()] as const,
);
const releaseSession = Effect.gen(function* () {
yield* updateState((current) => ({ ...current, isReady: false }));
const trackedJobs = yield* takeTrackedBackupJobs;
for (const [jobId, trackedJob] of trackedJobs) {
const message = "The connection to the backup agent was lost. Restart the backup to ensure it completes.";
yield* Effect.sync(() => {
handlers.onBackupCancelled?.({ jobId, scheduleId: trackedJob.scheduleId, message });
});
}
yield* Queue.shutdown(outboundQueue);
});
const closeSession = () =>
Effect.suspend(() => {
if (isClosed) {
return Effect.sync(() => undefined);
}
isClosed = true;
return releaseSession;
});
yield* Effect.addFinalizer(() => closeSession());
const handleSendFailure = (reason: string) => {
logger.error(
`Closing session for agent ${socket.data.agentId} on ${socket.data.id} after an outbound websocket send failed: ${reason}`,
);
socket.close();
void Effect.runPromise(closeSession()).catch((error) => {
logger.error(
`Failed to close session for agent ${socket.data.agentId} on ${socket.data.id}: ${toMessage(error)}`,
);
});
};
const run = Effect.gen(function* () {
yield* Effect.forkScoped(
Effect.forever(
Effect.gen(function* () {
const message = yield* Queue.take(outboundQueue);
yield* Effect.sync(() => {
try {
const sendResult = socket.send(message);
if (sendResult === 0) {
handleSendFailure("connection issue");
}
} catch (error) {
handleSendFailure(toMessage(error));
}
});
}),
),
);
yield* Effect.forkScoped(
Effect.forever(
Effect.gen(function* () {
yield* Effect.sleep("15 seconds");
yield* Queue.offer(
outboundQueue,
createControllerMessage("heartbeat.ping", {
sentAt: Date.now(),
}),
);
}),
),
);
return yield* Effect.never;
});
const handleAgentMessage = (message: AgentMessage) =>
Effect.gen(function* () {
yield* updateState((current) => ({ ...current, lastSeenAt: Date.now() }));
switch (message.type) {
case "agent.ready": {
yield* updateState((current) => ({ ...current, isReady: true }));
yield* Effect.sync(() => {
logger.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`);
});
break;
}
case "backup.started": {
yield* setTrackedBackupJob(message.payload.jobId, {
scheduleId: message.payload.scheduleId,
state: "active",
});
yield* Effect.sync(() => {
logger.info(
`Backup ${message.payload.jobId} started on agent ${socket.data.agentId} for schedule ${message.payload.scheduleId}`,
);
handlers.onBackupStarted?.(message.payload);
});
break;
}
case "backup.progress": {
yield* Effect.sync(() => {
handlers.onBackupProgress?.(message.payload);
});
break;
}
case "backup.completed": {
yield* deleteTrackedBackupJob(message.payload.jobId);
yield* Effect.sync(() => {
handlers.onBackupCompleted?.(message.payload);
});
break;
}
case "backup.failed": {
yield* deleteTrackedBackupJob(message.payload.jobId);
yield* Effect.sync(() => {
handlers.onBackupFailed?.(message.payload);
});
break;
}
case "backup.cancelled": {
yield* deleteTrackedBackupJob(message.payload.jobId);
yield* Effect.sync(() => {
handlers.onBackupCancelled?.(message.payload);
});
break;
}
case "heartbeat.pong": {
yield* updateState((current) => ({ ...current, lastPongAt: message.payload.sentAt }));
break;
}
}
});
return {
connectionId: socket.data.id,
handleMessage: (data: string) => {
return Effect.gen(function* () {
const parsed = parseAgentMessage(data);
if (parsed === null) {
yield* Effect.sync(() => {
logger.warn(`Invalid JSON from agent ${socket.data.agentId}`);
});
return;
}
if (!parsed.success) {
yield* Effect.sync(() => {
logger.warn(`Invalid agent message from ${socket.data.agentId}: ${parsed.error.message}`);
});
return;
}
yield* handleAgentMessage(parsed.data);
});
},
sendBackup: (payload) => {
return Effect.gen(function* () {
const queued = yield* offerOutbound(createControllerMessage("backup.run", payload));
if (queued) {
yield* setTrackedBackupJob(payload.jobId, { scheduleId: payload.scheduleId, state: "pending" });
}
return queued;
});
},
sendBackupCancel: (payload) => offerOutbound(createControllerMessage("backup.cancel", payload)),
isReady: () => Ref.get(state).pipe(Effect.map((current) => current.isReady)),
run,
};
});