Files
zerobyte/app/server/modules/events/events.controller.ts
2026-01-24 22:45:56 +01:00

191 lines
5.5 KiB
TypeScript

import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { logger } from "../../utils/logger";
import { serverEvents } from "../../core/events";
import { requireAuth } from "../auth/auth.middleware";
import type { DoctorResult } from "~/schemas/restic";
export const eventsController = new Hono().use(requireAuth).get("/", (c) => {
logger.info("Client connected to SSE endpoint");
const organizationId = c.get("organizationId");
return streamSSE(c, async (stream) => {
await stream.writeSSE({
data: JSON.stringify({ type: "connected", timestamp: Date.now() }),
event: "connected",
});
const onBackupStarted = async (data: {
organizationId: string;
scheduleId: number;
volumeName: string;
repositoryName: string;
}) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "backup:started",
});
};
const onBackupProgress = async (data: {
organizationId: string;
scheduleId: number;
volumeName: string;
repositoryName: string;
seconds_elapsed: number;
percent_done: number;
total_files: number;
files_done: number;
total_bytes: number;
bytes_done: number;
current_files: string[];
}) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "backup:progress",
});
};
const onBackupCompleted = async (data: {
organizationId: string;
scheduleId: number;
volumeName: string;
repositoryName: string;
status: "success" | "error" | "stopped" | "warning";
}) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "backup:completed",
});
};
const onVolumeMounted = async (data: { organizationId: string; volumeName: string }) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "volume:mounted",
});
};
const onVolumeUnmounted = async (data: { organizationId: string; volumeName: string }) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "volume:unmounted",
});
};
const onVolumeUpdated = async (data: { organizationId: string; volumeName: string }) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "volume:updated",
});
};
const onMirrorStarted = async (data: {
organizationId: string;
scheduleId: number;
repositoryId: string;
repositoryName: string;
}) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "mirror:started",
});
};
const onMirrorCompleted = async (data: {
organizationId: string;
scheduleId: number;
repositoryId: string;
repositoryName: string;
status: "success" | "error";
error?: string;
}) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "mirror:completed",
});
};
const onDoctorStarted = async (data: { organizationId: string; repositoryId: string; repositoryName: string }) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "doctor:started",
});
};
const onDoctorCompleted = async (
data: {
organizationId: string;
repositoryId: string;
repositoryName: string;
} & DoctorResult,
) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "doctor:completed",
});
};
const onDoctorCancelled = async (data: {
organizationId: string;
repositoryId: string;
repositoryName: string;
error?: string;
}) => {
if (data.organizationId !== organizationId) return;
await stream.writeSSE({
data: JSON.stringify(data),
event: "doctor:cancelled",
});
};
serverEvents.on("backup:started", onBackupStarted);
serverEvents.on("backup:progress", onBackupProgress);
serverEvents.on("backup:completed", onBackupCompleted);
serverEvents.on("volume:mounted", onVolumeMounted);
serverEvents.on("volume:unmounted", onVolumeUnmounted);
serverEvents.on("volume:updated", onVolumeUpdated);
serverEvents.on("mirror:started", onMirrorStarted);
serverEvents.on("mirror:completed", onMirrorCompleted);
serverEvents.on("doctor:started", onDoctorStarted);
serverEvents.on("doctor:completed", onDoctorCompleted);
serverEvents.on("doctor:cancelled", onDoctorCancelled);
let keepAlive = true;
stream.onAbort(() => {
logger.info("Client disconnected from SSE endpoint");
keepAlive = false;
serverEvents.off("backup:started", onBackupStarted);
serverEvents.off("backup:progress", onBackupProgress);
serverEvents.off("backup:completed", onBackupCompleted);
serverEvents.off("volume:mounted", onVolumeMounted);
serverEvents.off("volume:unmounted", onVolumeUnmounted);
serverEvents.off("volume:updated", onVolumeUpdated);
serverEvents.off("mirror:started", onMirrorStarted);
serverEvents.off("mirror:completed", onMirrorCompleted);
serverEvents.off("doctor:started", onDoctorStarted);
serverEvents.off("doctor:completed", onDoctorCompleted);
serverEvents.off("doctor:cancelled", onDoctorCancelled);
});
while (keepAlive) {
await stream.writeSSE({
data: JSON.stringify({ timestamp: Date.now() }),
event: "heartbeat",
});
await stream.sleep(5000);
}
});
});