mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-06-03 21:59:36 -04:00
429 lines
13 KiB
TypeScript
429 lines
13 KiB
TypeScript
import { and, eq } from "drizzle-orm";
|
|
import { BadRequestError, InternalServerError, NotFoundError } from "http-errors-enhanced";
|
|
import { db } from "../../db/db";
|
|
import { volumesTable } from "../../db/schema";
|
|
import { toMessage } from "../../utils/errors";
|
|
import { generateShortId } from "../../utils/id";
|
|
import type { StatFs } from "../../utils/mountinfo";
|
|
import { withTimeout } from "../../utils/timeout";
|
|
import { config } from "../../core/config";
|
|
import { LOCAL_AGENT_ID } from "../agents/constants";
|
|
import { agentManager } from "../agents/agents-manager";
|
|
import type { UpdateVolumeBody } from "./volume.dto";
|
|
import { logger } from "@zerobyte/core/node";
|
|
import { serverEvents } from "../../core/events";
|
|
import type { Volume } from "../../db/schema";
|
|
import { volumeConfigSchema, type BackendConfig, type Volume as AgentVolume } from "@zerobyte/contracts/volumes";
|
|
import { Effect } from "effect";
|
|
import { getOrganizationId } from "~/server/core/request-context";
|
|
import { type ShortId } from "~/server/utils/branded";
|
|
import { decryptVolumeConfig, encryptVolumeConfig } from "./volume-config-secrets";
|
|
import type { VolumeCommand, VolumeCommandResult } from "@zerobyte/contracts/agent-protocol";
|
|
import { createVolumeBackend, getStatFs, getVolumePath } from "../../../../apps/agent/src/volume-host";
|
|
import {
|
|
browseFilesystem as browseHostFilesystem,
|
|
listVolumeFiles,
|
|
testVolumeConnection,
|
|
} from "../../../../apps/agent/src/volume-host/operations";
|
|
|
|
type EnsureHealthyVolumeResult =
|
|
| { ready: true; volume: Volume; remounted: boolean }
|
|
| { ready: false; volume: Volume; reason: string };
|
|
|
|
const listVolumes = async () => {
|
|
const organizationId = getOrganizationId();
|
|
const volumes = await db.query.volumesTable.findMany({
|
|
where: { organizationId: organizationId },
|
|
orderBy: { id: "asc" },
|
|
});
|
|
|
|
return volumes;
|
|
};
|
|
|
|
const findVolume = async (shortId: ShortId) => {
|
|
const organizationId = getOrganizationId();
|
|
return await db.query.volumesTable.findFirst({
|
|
where: {
|
|
AND: [{ shortId: { eq: shortId } }, { organizationId: organizationId }],
|
|
},
|
|
});
|
|
};
|
|
|
|
const runVolumeCommand = async <TCommand extends VolumeCommand>(agentId: string, command: TCommand) => {
|
|
const result = await agentManager.runVolumeCommand(agentId, command);
|
|
if (result.name !== command.name) {
|
|
throw new InternalServerError(`Unexpected agent response for ${command.name}`);
|
|
}
|
|
|
|
return result as Extract<VolumeCommandResult, { name: TCommand["name"] }>;
|
|
};
|
|
|
|
const volumeForAgent = async (volume: Volume): Promise<Volume> => ({
|
|
...volume,
|
|
config: await decryptVolumeConfig(volume.config),
|
|
});
|
|
|
|
const volumeForHost = async (volume: Volume): Promise<AgentVolume> => ({
|
|
...volume,
|
|
shortId: volume.shortId,
|
|
config: await decryptVolumeConfig(volume.config),
|
|
provisioningId: volume.provisioningId ?? null,
|
|
});
|
|
|
|
// Transitional fallback: older controller-only installs do not run the supervised local agent.
|
|
// Keep all controller-local host execution behind this predicate so the fallback is easy to delete
|
|
// once volume operations always go through the local agent.
|
|
const shouldRunViaAgent = (volume: Volume) => volume.agentId !== LOCAL_AGENT_ID || config.flags.enableLocalAgent;
|
|
|
|
const shouldUseControllerLocalVolumeFallback = (volume: Volume) => !shouldRunViaAgent(volume);
|
|
|
|
const runVolumeBackendCommand = async (
|
|
volume: Volume,
|
|
name: "volume.mount" | "volume.unmount" | "volume.checkHealth",
|
|
) => {
|
|
if (shouldUseControllerLocalVolumeFallback(volume)) {
|
|
const backend = createVolumeBackend(await volumeForHost(volume));
|
|
switch (name) {
|
|
case "volume.mount":
|
|
return backend.mount();
|
|
case "volume.unmount":
|
|
return backend.unmount();
|
|
case "volume.checkHealth":
|
|
return backend.checkHealth();
|
|
}
|
|
}
|
|
|
|
const command = await runVolumeCommand(volume.agentId, {
|
|
name,
|
|
volume: await volumeForAgent(volume),
|
|
});
|
|
return command.result;
|
|
};
|
|
|
|
const createVolume = async (name: string, backendConfig: BackendConfig) => {
|
|
const organizationId = getOrganizationId();
|
|
const trimmedName = name.trim();
|
|
|
|
if (trimmedName.length === 0) {
|
|
throw new BadRequestError("Volume name cannot be empty");
|
|
}
|
|
|
|
const shortId = generateShortId();
|
|
const encryptedConfig = await encryptVolumeConfig(backendConfig);
|
|
|
|
const [created] = await db
|
|
.insert(volumesTable)
|
|
.values({
|
|
shortId,
|
|
name: trimmedName,
|
|
config: encryptedConfig,
|
|
type: backendConfig.backend,
|
|
agentId: LOCAL_AGENT_ID,
|
|
organizationId,
|
|
})
|
|
.returning();
|
|
|
|
if (!created) {
|
|
throw new InternalServerError("Failed to create volume");
|
|
}
|
|
|
|
const { error, status } = await runVolumeBackendCommand(created, "volume.mount");
|
|
|
|
await db
|
|
.update(volumesTable)
|
|
.set({ status, lastError: error ?? null, lastHealthCheck: Date.now() })
|
|
.where(and(eq(volumesTable.id, created.id), eq(volumesTable.organizationId, organizationId)));
|
|
|
|
return { volume: created, status: 201 };
|
|
};
|
|
|
|
const deleteVolume = async (shortId: ShortId) => {
|
|
const organizationId = getOrganizationId();
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
await runVolumeBackendCommand(volume, "volume.unmount");
|
|
await db
|
|
.delete(volumesTable)
|
|
.where(and(eq(volumesTable.id, volume.id), eq(volumesTable.organizationId, organizationId)));
|
|
};
|
|
|
|
const mountVolume = async (shortId: ShortId) => {
|
|
const organizationId = getOrganizationId();
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
await runVolumeBackendCommand(volume, "volume.unmount");
|
|
const { error, status } = await runVolumeBackendCommand(volume, "volume.mount");
|
|
|
|
await db
|
|
.update(volumesTable)
|
|
.set({ status, lastError: error ?? null, lastHealthCheck: Date.now() })
|
|
.where(and(eq(volumesTable.id, volume.id), eq(volumesTable.organizationId, organizationId)));
|
|
|
|
if (status === "mounted") {
|
|
serverEvents.emit("volume:mounted", { organizationId, volumeName: volume.name });
|
|
}
|
|
|
|
return { error, status };
|
|
};
|
|
|
|
const unmountVolume = async (shortId: ShortId, options?: { persistStatus?: boolean }) => {
|
|
const organizationId = getOrganizationId();
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
const { status, error } = await runVolumeBackendCommand(volume, "volume.unmount");
|
|
|
|
if (options?.persistStatus !== false) {
|
|
await db
|
|
.update(volumesTable)
|
|
.set({ status })
|
|
.where(and(eq(volumesTable.id, volume.id), eq(volumesTable.organizationId, organizationId)));
|
|
|
|
if (status === "unmounted") {
|
|
serverEvents.emit("volume:unmounted", { organizationId, volumeName: volume.name });
|
|
}
|
|
}
|
|
|
|
return { error, status };
|
|
};
|
|
|
|
const getVolume = async (shortId: ShortId) => {
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
let statfs: Partial<StatFs> = {};
|
|
if (volume.status === "mounted") {
|
|
statfs = await withTimeout(
|
|
shouldRunViaAgent(volume)
|
|
? runVolumeCommand(volume.agentId, {
|
|
name: "volume.statfs",
|
|
volume: await volumeForAgent(volume),
|
|
}).then((command) => command.result)
|
|
: volumeForHost(volume).then((hostVolume) => getStatFs(getVolumePath(hostVolume))),
|
|
1000,
|
|
"volume.statfs",
|
|
).catch((error) => {
|
|
logger.warn(`Failed to get statfs for volume ${volume.name}: ${toMessage(error)}`);
|
|
return {};
|
|
});
|
|
}
|
|
|
|
return { volume, statfs };
|
|
};
|
|
|
|
const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => {
|
|
const organizationId = getOrganizationId();
|
|
const existing = await findVolume(shortId);
|
|
|
|
if (!existing) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
const newName = volumeData.name !== undefined ? volumeData.name.trim() : existing.name;
|
|
|
|
if (newName.length === 0) {
|
|
throw new BadRequestError("Volume name cannot be empty");
|
|
}
|
|
|
|
const configChanged =
|
|
JSON.stringify(existing.config) !== JSON.stringify(volumeData.config) && volumeData.config !== undefined;
|
|
|
|
if (configChanged) {
|
|
logger.debug("Unmounting existing volume before applying new config");
|
|
await runVolumeBackendCommand(existing, "volume.unmount");
|
|
}
|
|
|
|
const newConfigResult = volumeConfigSchema.safeParse(volumeData.config || existing.config);
|
|
if (!newConfigResult.success) {
|
|
throw new BadRequestError("Invalid volume configuration");
|
|
}
|
|
const newConfig = newConfigResult.data;
|
|
|
|
const encryptedConfig = await encryptVolumeConfig(newConfig);
|
|
|
|
const [updated] = await db
|
|
.update(volumesTable)
|
|
.set({
|
|
name: newName,
|
|
config: encryptedConfig,
|
|
type: volumeData.config?.backend,
|
|
autoRemount: volumeData.autoRemount,
|
|
updatedAt: Date.now(),
|
|
})
|
|
.where(and(eq(volumesTable.id, existing.id), eq(volumesTable.organizationId, organizationId)))
|
|
.returning();
|
|
|
|
if (!updated) {
|
|
throw new InternalServerError("Failed to update volume");
|
|
}
|
|
|
|
if (configChanged) {
|
|
const { error, status } = await runVolumeBackendCommand(updated, "volume.mount");
|
|
await db
|
|
.update(volumesTable)
|
|
.set({ status, lastError: error ?? null, lastHealthCheck: Date.now() })
|
|
.where(and(eq(volumesTable.id, existing.id), eq(volumesTable.organizationId, organizationId)));
|
|
|
|
serverEvents.emit("volume:updated", { organizationId, volumeName: updated.name });
|
|
}
|
|
|
|
return { volume: updated };
|
|
};
|
|
|
|
const testConnection = async (backendConfig: BackendConfig) => {
|
|
if (!config.flags.enableLocalAgent) {
|
|
return Effect.runPromise(testVolumeConnection(backendConfig));
|
|
}
|
|
|
|
const command = await runVolumeCommand(LOCAL_AGENT_ID, { name: "volume.testConnection", backendConfig });
|
|
return command.result;
|
|
};
|
|
|
|
const checkHealth = async (shortId: ShortId) => {
|
|
const organizationId = getOrganizationId();
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
const { error, status } = await runVolumeBackendCommand(volume, "volume.checkHealth");
|
|
|
|
if (status !== volume.status) {
|
|
serverEvents.emit("volume:status_changed", { organizationId, volumeName: volume.name, status });
|
|
}
|
|
|
|
await db
|
|
.update(volumesTable)
|
|
.set({ lastHealthCheck: Date.now(), status, lastError: error ?? null })
|
|
.where(and(eq(volumesTable.id, volume.id), eq(volumesTable.organizationId, organizationId)));
|
|
|
|
return { status, error };
|
|
};
|
|
|
|
const ensureHealthyVolume = async (shortId: ShortId): Promise<EnsureHealthyVolumeResult> => {
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
if (volume.status === "unmounted") {
|
|
return { ready: false, volume, reason: volume.lastError ?? "Volume is not mounted" };
|
|
}
|
|
|
|
let failureReason = volume.lastError ?? "Volume health check failed";
|
|
let failedVolume = volume;
|
|
|
|
if (volume.status !== "error") {
|
|
const health = await checkHealth(shortId);
|
|
|
|
if (health.status === "mounted") {
|
|
return {
|
|
ready: true,
|
|
volume: { ...volume, status: "mounted", lastError: null },
|
|
remounted: false,
|
|
};
|
|
}
|
|
|
|
failureReason = health.error ?? failureReason;
|
|
failedVolume = { ...volume, status: "error", lastError: health.error ?? null };
|
|
}
|
|
|
|
if (!volume.autoRemount) {
|
|
return { ready: false, volume: failedVolume, reason: failureReason };
|
|
}
|
|
|
|
logger.warn(
|
|
`${volume.name} is not healthy. Auto-remount is enabled, attempting to remount. Reason: ${failureReason}`,
|
|
);
|
|
const remount = await mountVolume(shortId);
|
|
|
|
if (remount.status !== "mounted") {
|
|
return {
|
|
ready: false,
|
|
volume: { ...volume, status: remount.status, lastError: remount.error ?? null },
|
|
reason: remount.error ?? failureReason,
|
|
};
|
|
}
|
|
|
|
return {
|
|
ready: true,
|
|
volume: { ...volume, status: "mounted", lastError: null },
|
|
remounted: true,
|
|
};
|
|
};
|
|
|
|
const DEFAULT_PAGE_SIZE = 500;
|
|
|
|
const listFiles = async (shortId: ShortId, subPath?: string, offset: number = 0, limit: number = DEFAULT_PAGE_SIZE) => {
|
|
const volume = await findVolume(shortId);
|
|
|
|
if (!volume) {
|
|
throw new NotFoundError("Volume not found");
|
|
}
|
|
|
|
if (volume.status !== "mounted") {
|
|
throw new InternalServerError("Volume is not mounted");
|
|
}
|
|
|
|
try {
|
|
if (shouldUseControllerLocalVolumeFallback(volume)) {
|
|
return await listVolumeFiles(await volumeForHost(volume), subPath, offset, limit);
|
|
}
|
|
|
|
const command = await runVolumeCommand(volume.agentId, {
|
|
name: "volume.listFiles",
|
|
volume: await volumeForAgent(volume),
|
|
subPath,
|
|
offset,
|
|
limit,
|
|
});
|
|
return command.result;
|
|
} catch (error) {
|
|
throw new InternalServerError(`Failed to list files: ${toMessage(error)}`);
|
|
}
|
|
};
|
|
|
|
const browseFilesystem = async (browsePath: string) => {
|
|
try {
|
|
if (!config.flags.enableLocalAgent) {
|
|
return await browseHostFilesystem(browsePath);
|
|
}
|
|
|
|
const command = await runVolumeCommand(LOCAL_AGENT_ID, { name: "filesystem.browse", path: browsePath });
|
|
return command.result;
|
|
} catch (error) {
|
|
throw new InternalServerError(`Failed to browse filesystem: ${toMessage(error)}`);
|
|
}
|
|
};
|
|
|
|
export const volumeService = {
|
|
listVolumes,
|
|
createVolume,
|
|
mountVolume,
|
|
deleteVolume,
|
|
getVolume,
|
|
updateVolume,
|
|
testConnection,
|
|
unmountVolume,
|
|
checkHealth,
|
|
ensureHealthyVolume,
|
|
listFiles,
|
|
browseFilesystem,
|
|
};
|