feat(runtime): start and ship the local agent (#767)

* feat(runtime): start and ship the local agent

* refactor: gate local agent behind feature flag

* chore: skip agent manager if flag is false

* fix: hot reload agents

* test: fix config tests
This commit is contained in:
Nico
2026-04-10 00:00:30 +02:00
committed by GitHub
parent 863fbfc5cc
commit 28ba8657f9
19 changed files with 174 additions and 37 deletions

View File

@@ -11,6 +11,7 @@
!**/components.json
!app/**
!apps/agent/**
!packages/**
!public/**

View File

@@ -64,6 +64,8 @@ COPY --from=deps /deps/shoutrrr /usr/local/bin/shoutrrr
COPY ./package.json ./bun.lock ./
COPY ./packages/core/package.json ./packages/core/package.json
COPY ./packages/contracts/package.json ./packages/contracts/package.json
COPY ./apps/agent/package.json ./apps/agent/package.json
RUN bun install --frozen-lockfile --ignore-scripts
@@ -86,11 +88,14 @@ WORKDIR /app
COPY ./package.json ./bun.lock ./
COPY ./packages/core/package.json ./packages/core/package.json
COPY ./packages/contracts/package.json ./packages/contracts/package.json
COPY ./apps/agent/package.json ./apps/agent/package.json
RUN bun install --frozen-lockfile
COPY . .
RUN bun run build
RUN bun build apps/agent/src/index.ts --outfile .output/agent/index.mjs --target bun
FROM base AS production

View File

@@ -58,7 +58,7 @@ export const createApp = () => {
limit: 1000,
keyGenerator: (c) => c.req.header("x-forwarded-for") ?? "",
skip: () => {
return config.disableRateLimiting;
return config.flags.disableRateLimiting;
},
}),
);

View File

@@ -63,11 +63,14 @@ describe("parseConfig", () => {
appVersion: "1.2.3",
trustedOrigins: ["https://admin.example.com", "http://localhost:3000", "https://example.com"],
trustProxy: true,
disableRateLimiting: true,
appSecret: validAppSecret,
baseUrl: "https://example.com",
isSecure: true,
enableDevPanel: true,
flags: {
disableRateLimiting: true,
enableDevPanel: true,
enableLocalAgent: false,
},
provisioningPath: "/tmp/provisioning",
allowedHosts: ["example.com", "admin.example.com", "localhost:3000"],
});

View File

@@ -43,6 +43,7 @@ const envSchema = z
APP_SECRET_FILE: z.string().optional(),
BASE_URL: z.string(),
ENABLE_DEV_PANEL: z.string().default("false"),
ENABLE_LOCAL_AGENT: z.string().default("false"),
PROVISIONING_PATH: z.string().optional(),
})
.transform((s, ctx) => {
@@ -125,11 +126,14 @@ const envSchema = z
appVersion: s.APP_VERSION,
trustedOrigins: trustedOrigins,
trustProxy: s.TRUST_PROXY === "true",
disableRateLimiting: s.DISABLE_RATE_LIMITING === "true" || s.NODE_ENV === "test",
appSecret: appSecret ?? "",
baseUrl,
isSecure: baseUrl.startsWith("https://"),
enableDevPanel: s.ENABLE_DEV_PANEL === "true",
flags: {
disableRateLimiting: s.DISABLE_RATE_LIMITING === "true" || s.NODE_ENV === "test",
enableDevPanel: s.ENABLE_DEV_PANEL === "true",
enableLocalAgent: s.ENABLE_LOCAL_AGENT === "true",
},
provisioningPath: s.PROVISIONING_PATH,
allowedHosts,
};

View File

@@ -30,13 +30,13 @@ export const auth = betterAuth({
},
trustedOrigins: config.trustedOrigins,
rateLimit: {
enabled: !config.disableRateLimiting,
enabled: !config.flags.disableRateLimiting,
},
advanced: {
cookiePrefix: "zerobyte",
useSecureCookies: config.isSecure,
ipAddress: {
disableIpTracking: config.disableRateLimiting,
disableIpTracking: config.flags.disableRateLimiting,
},
},
onAPIError: {

View File

@@ -1,11 +1,11 @@
import type { ChildProcess } from "node:child_process";
import { createAgentManagerRuntime, type AgentManagerRuntime } from "./controller/server";
import type { AgentManagerRuntime } from "./controller/server";
import { spawnLocalAgentProcess, stopLocalAgentProcess } from "./local/process";
export type { AgentBackupEventHandlers } from "./controller/server";
type AgentRuntimeState = {
agentManager: AgentManagerRuntime;
agentManager: AgentManagerRuntime | null;
localAgent: ChildProcess | null;
};
@@ -22,7 +22,7 @@ const getAgentRuntimeState = () => {
}
const runtime = {
agentManager: createAgentManagerRuntime(),
agentManager: null,
localAgent: null,
};
@@ -32,6 +32,20 @@ const getAgentRuntimeState = () => {
const getAgentManagerRuntime = () => getAgentRuntimeState().agentManager;
export const startAgentRuntime = async () => {
const runtime = getAgentRuntimeState();
if (runtime.agentManager) {
await runtime.agentManager.stop();
}
const { createAgentManagerRuntime } = await import("./controller/server");
const agentManager = createAgentManagerRuntime();
await agentManager.start();
runtime.agentManager = agentManager;
};
export const spawnLocalAgent = async () => {
await spawnLocalAgentProcess(getAgentRuntimeState());
};
@@ -40,9 +54,7 @@ export const stopLocalAgent = async () => {
await stopLocalAgentProcess(getAgentRuntimeState());
};
export const agentManager = getAgentManagerRuntime();
export const stopAgentRuntime = async () => {
await getAgentManagerRuntime().stop();
await getAgentManagerRuntime()?.stop();
await stopLocalAgent();
};

View File

@@ -47,18 +47,19 @@ export function createAgentManagerRuntime() {
let backupHandlers: AgentBackupEventHandlers = {};
let runtimeScope: Scope.CloseableScope | null = null;
const closeSession = (sessionHandle: ControllerAgentSessionHandle) => {
Effect.runSync(Fiber.interrupt(sessionHandle.runFiber));
Effect.runSync(Scope.close(sessionHandle.scope, Exit.succeed(undefined)));
};
const closeSession = (sessionHandle: ControllerAgentSessionHandle) =>
Effect.gen(function* () {
yield* Fiber.interrupt(sessionHandle.runFiber);
yield* Scope.close(sessionHandle.scope, Exit.succeed(undefined));
});
const closeAllSessions = () => {
const closeAllSessions = Effect.gen(function* () {
const currentSessions = sessions;
sessions = new Map();
for (const sessionHandle of currentSessions.values()) {
closeSession(sessionHandle);
yield* closeSession(sessionHandle);
}
};
});
const getSessionHandle = (agentId: string) => sessions.get(agentId);
@@ -105,7 +106,9 @@ export function createAgentManagerRuntime() {
const setSession = (agentId: string, sessionHandle: ControllerAgentSessionHandle) => {
const existingSession = getSessionHandle(agentId);
if (existingSession) {
closeSession(existingSession);
void Effect.runPromise(closeSession(existingSession)).catch((error) => {
logger.error(`Failed to close existing agent session for ${agentId}: ${toMessage(error)}`);
});
}
sessions.set(agentId, sessionHandle);
@@ -118,7 +121,9 @@ export function createAgentManagerRuntime() {
}
sessions.delete(agentId);
closeSession(sessionHandle);
void Effect.runPromise(closeSession(sessionHandle)).catch((error) => {
logger.error(`Failed to close agent session for ${agentId}: ${toMessage(error)}`);
});
};
const acquireServer = Effect.acquireRelease(
@@ -180,7 +185,7 @@ export function createAgentManagerRuntime() {
}),
),
(server) =>
Effect.sync(closeAllSessions).pipe(
closeAllSessions.pipe(
Effect.andThen(
Effect.tryPromise({
try: () => server.stop(true),

View File

@@ -12,6 +12,10 @@ type LocalAgentState = {
export async function spawnLocalAgentProcess(runtime: LocalAgentState) {
await stopLocalAgentProcess(runtime);
if (!config.flags.enableLocalAgent) {
return;
}
const sourceEntryPoint = path.join(process.cwd(), "apps", "agent", "src", "index.ts");
const productionEntryPoint = path.join(process.cwd(), ".output", "agent", "index.mjs");

View File

@@ -0,0 +1,58 @@
import { afterEach, describe, expect, test, vi } from "vitest";
import { Scheduler } from "../../../core/scheduler";
import * as backendModule from "../../backends/backend";
import type { VolumeBackend } from "../../backends/backend";
import * as bootstrapModule from "../bootstrap";
import { createTestVolume } from "~/test/helpers/volume";
const loadShutdownModule = async () => {
const moduleUrl = new URL("../shutdown.ts", import.meta.url);
moduleUrl.searchParams.set("test", crypto.randomUUID());
return import(moduleUrl.href);
};
afterEach(() => {
vi.restoreAllMocks();
});
describe("shutdown", () => {
test("stops the agent runtime before unmounting mounted volumes", async () => {
const events: string[] = [];
const stopScheduler = vi.fn(async () => {
events.push("scheduler.stop");
});
const stopApplicationRuntime = vi.fn(async () => {
events.push("agents.stop");
});
const unmountVolume = vi.fn(async () => {
events.push("backend.unmount");
return { status: "unmounted" as const };
});
await createTestVolume({
name: "Shutdown test volume",
config: {
backend: "directory",
path: "/Applications",
},
status: "mounted",
});
vi.spyOn(Scheduler, "stop").mockImplementation(stopScheduler);
vi.spyOn(bootstrapModule, "stopApplicationRuntime").mockImplementation(stopApplicationRuntime);
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(
() =>
({
mount: async () => ({ status: "mounted" as const }),
unmount: unmountVolume,
checkHealth: async () => ({ status: "mounted" as const }),
}) satisfies VolumeBackend,
);
const { shutdown } = await loadShutdownModule();
await shutdown();
expect(events).toEqual(["scheduler.stop", "agents.stop", "backend.unmount"]);
});
});

View File

@@ -1,4 +1,5 @@
import { runDbMigrations } from "../../db/db";
import { spawnLocalAgent, startAgentRuntime, stopAgentRuntime } from "../agents/agents-manager";
import { runMigrations } from "./migrations";
import { startup } from "./startup";
@@ -7,6 +8,10 @@ let bootstrapPromise: Promise<void> | undefined;
const runBootstrap = async () => {
await runDbMigrations();
await runMigrations();
if (process.env.ENABLE_LOCAL_AGENT === "true") {
await startAgentRuntime();
await spawnLocalAgent();
}
await startup();
};
@@ -22,3 +27,11 @@ export const bootstrapApplication = async () => {
throw err;
}
};
export const stopApplicationRuntime = async () => {
try {
await stopAgentRuntime();
} finally {
bootstrapPromise = undefined;
}
};

View File

@@ -2,9 +2,11 @@ import { Scheduler } from "../../core/scheduler";
import { db } from "../../db/db";
import { logger } from "@zerobyte/core/node";
import { createVolumeBackend } from "../backends/backend";
import { stopApplicationRuntime } from "./bootstrap";
export const shutdown = async () => {
await Scheduler.stop();
await stopApplicationRuntime();
const volumes = await db.query.volumesTable.findMany({
where: { status: "mounted" },

View File

@@ -107,7 +107,7 @@ const setRegistrationEnabled = async (enabled: boolean) => {
logger.info(`Registration enabled set to: ${enabled}`);
};
const isDevPanelEnabled = () => config.enableDevPanel;
const isDevPanelEnabled = () => config.flags.enableDevPanel;
export const systemService = {
getSystemInfo,

View File

@@ -1,9 +1,11 @@
import { definePlugin } from "nitro";
import { bootstrapApplication } from "../modules/lifecycle/bootstrap";
import { bootstrapApplication, stopApplicationRuntime } from "../modules/lifecycle/bootstrap";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "../utils/errors";
export default definePlugin(async () => {
export default definePlugin(async (nitroApp) => {
nitroApp.hooks.hook("close", stopApplicationRuntime);
await bootstrapApplication().catch((err) => {
logger.error(`Bootstrap failed: ${toMessage(err)}`);
process.exit(1);

View File

@@ -4,7 +4,8 @@
"type": "module",
"module": "src/index.ts",
"scripts": {
"tsc": "tsc --noEmit"
"tsc": "tsc --noEmit",
"test": "bunx --bun vitest run --config ./vitest.config.ts"
},
"dependencies": {
"@zerobyte/contracts": "workspace:*",

View File

@@ -1,4 +1,4 @@
import { afterEach, expect, mock, spyOn, test } from "bun:test";
import { afterEach, expect, test, vi } from "vitest";
import { Effect } from "effect";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
@@ -7,11 +7,11 @@ import * as resticServer from "@zerobyte/core/restic/server";
import { createControllerSession } from "../controller-session";
afterEach(() => {
mock.restore();
vi.restoreAllMocks();
});
test("emits backup.failed when a backup command hits a restic error", async () => {
spyOn(resticServer, "createRestic").mockReturnValue(
vi.spyOn(resticServer, "createRestic").mockReturnValue(
fromPartial({
backup: () => Effect.fail("source path missing"),
}),

View File

@@ -1,4 +1,4 @@
import { afterEach, expect, mock, spyOn, test } from "bun:test";
import { afterEach, expect, test, vi } from "vitest";
import { Effect } from "effect";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
@@ -18,17 +18,19 @@ const createDeferred = <T>() => {
};
afterEach(() => {
mock.restore();
vi.restoreAllMocks();
});
test("waits for running-job registration before returning to the processor loop", async () => {
const outboundMessages: string[] = [];
const runningJobs = new Map<string, RunningJob>();
const setRunningJobGate = createDeferred<void>();
const processorLoopGate = createDeferred<void>();
const commandCompleted = createDeferred<void>();
const backupGate = createDeferred<{ exitCode: number; result: null; warningDetails: null }>();
let registeredAbortController: AbortController | undefined;
spyOn(resticServer, "createRestic").mockReturnValue(
vi.spyOn(resticServer, "createRestic").mockReturnValue(
fromPartial({
backup: () =>
Effect.async<{ exitCode: number; result: null; warningDetails: null }, never>((resume) => {
@@ -82,11 +84,21 @@ test("waits for running-job registration before returning to the processor loop"
scheduleId: "schedule-1",
});
const runPromise = Effect.runPromise(handleBackupRunCommand(context, runPayload));
const processorLoopPromise = Effect.runPromise(
Effect.gen(function* () {
yield* handleBackupRunCommand(context, runPayload);
commandCompleted.resolve(undefined);
yield* Effect.async<void, never>((resume) => {
void processorLoopGate.promise.then(() => {
resume(Effect.void);
});
});
}),
);
try {
const returnedBeforeRegistration = await Promise.race([
runPromise.then(() => true),
commandCompleted.promise.then(() => true),
new Promise<false>((resolve) => {
setTimeout(() => resolve(false), 0);
}),
@@ -95,7 +107,7 @@ test("waits for running-job registration before returning to the processor loop"
expect(returnedBeforeRegistration).toBe(false);
setRunningJobGate.resolve(undefined);
await runPromise;
await commandCompleted.promise;
await Effect.runPromise(handleBackupCancelCommand(context, cancelPayload));
expect(registeredAbortController?.signal.aborted).toBe(true);
@@ -111,7 +123,9 @@ test("waits for running-job registration before returning to the processor loop"
expect(runningJobs.has("job-1")).toBe(false);
});
} finally {
processorLoopGate.resolve(undefined);
setRunningJobGate.resolve(undefined);
backupGate.resolve({ exitCode: 0, result: null, warningDetails: null });
await processorLoopPromise;
}
});

View File

@@ -0,0 +1,13 @@
import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
environment: "node",
server: {
deps: {
inline: ["zod"],
},
},
include: ["src/**/*.test.ts", "src/**/*.spec.ts"],
},
});

View File

@@ -27,7 +27,7 @@
"studio": "drizzle-kit studio",
"test:server": "dotenv -e .env.test -- bunx --bun vitest run --project server",
"test:client": "dotenv -e .env.test -- bunx --bun vitest run --project client",
"test": "dotenv -e .env.test -- bunx --bun vitest run && vp run -r test",
"test": "dotenv -e .env.test -- bunx --bun vitest run && vp run -F './apps/*' -F './packages/*' test",
"test:e2e": "NODE_ENV=test dotenv -e .env.local -- playwright test",
"test:e2e:ui": "NODE_ENV=test dotenv -e .env.local -- playwright test --ui",
"test:codegen": "playwright codegen localhost:4096"