import { type QueryClient, useQueryClient } from "@tanstack/react-query";
import { useCallback, useEffect, useRef } from "react";
import { logger } from "~/client/lib/logger";
import { serverEventNames, type ServerEventPayloadMap } from "~/schemas/server-events";

/** Payloads of the connection-lifecycle events registered alongside the domain events. */
type LifecycleEventPayloadMap = {
  connected: { type: "connected"; timestamp: number };
  heartbeat: { timestamp: number };
};

/** Every event name this module can dispatch, mapped to its payload type. */
type ServerEventsPayloadMap = LifecycleEventPayloadMap & ServerEventPayloadMap;
type ServerEventType = keyof ServerEventsPayloadMap;

/** Callback invoked with the parsed payload of event `T`. */
type EventHandler<T extends ServerEventType> = (data: ServerEventsPayloadMap[T]) => void;
type EventHandlerSet<T extends ServerEventType> = Set<EventHandler<T>>;

/** Per-event handler registry; an event's key is absent while it has no handlers. */
type EventHandlerMap = {
  [K in ServerEventType]?: EventHandlerSet<K>;
};

/** Module-level state shared by every `useServerEvents` caller. */
type SharedServerEventsState = {
  eventSource: EventSource | null;
  handlers: EventHandlerMap;
  queryClient: QueryClient | null;
  subscribers: number;
};

/** Events after which all React Query caches are invalidated. */
const invalidatingEvents = new Set<ServerEventType>([
  "backup:completed",
  "volume:updated",
  "volume:status_changed",
  "notification:updated",
  "mirror:completed",
  "doctor:started",
  "doctor:completed",
  "doctor:cancelled",
]);

export type RestoreProgressEvent = ServerEventsPayloadMap["restore:progress"];
export type RestoreCompletedEvent = ServerEventsPayloadMap["restore:completed"];

const sharedState: SharedServerEventsState = {
  eventSource: null,
  handlers: {},
  queryClient: null,
  subscribers: 0,
};

/**
 * Parses the JSON body of an SSE message.
 *
 * The result is asserted to be the payload type of `T`; it is not validated at runtime.
 *
 * @throws SyntaxError if the message data is not valid JSON.
 */
const parseEventData = <T extends ServerEventType>(event: Event): ServerEventsPayloadMap[T] =>
  JSON.parse((event as MessageEvent<string>).data) as ServerEventsPayloadMap[T];

/** Type guard for errors whose `name` is `"AbortError"`. */
const isAbortError = (error: unknown): error is Error =>
  error instanceof Error && error.name === "AbortError";

/** Calls every handler currently registered for `eventName` with `data`. */
const emit = <T extends ServerEventType>(eventName: T, data: ServerEventsPayloadMap[T]) => {
  const handlers = sharedState.handlers[eventName] as EventHandlerSet<T> | undefined;

  handlers?.forEach((handler) => {
    handler(data);
  });
};

/**
 * Invalidates all queries on the shared query client when `eventName` is one of
 * `invalidatingEvents`. Does nothing when no query client is attached.
 */
const refreshQueriesForEvent = (eventName: ServerEventType) => {
  if (!invalidatingEvents.has(eventName) || !sharedState.queryClient) {
    return;
  }

  // Fire-and-forget: abort errors are ignored, anything else is logged.
  void sharedState.queryClient.invalidateQueries().catch((error) => {
    if (!isAbortError(error)) {
      logger.error(`[SSE] Failed to refresh queries after ${eventName}:`, error);
    }
  });
};

/**
 * Stores `queryClient` in the shared state and opens the shared EventSource if none is open.
 *
 * The query client is replaced on every call, even when a connection already exists.
 */
const connectEventSource = (queryClient: QueryClient) => {
  sharedState.queryClient = queryClient;

  if (sharedState.eventSource) {
    return;
  }

  const eventSource = new EventSource("/api/v1/events");
  sharedState.eventSource = eventSource;

  eventSource.addEventListener("connected", (event) => {
    const data = parseEventData<"connected">(event);
    logger.info("[SSE] Connected to server events");
    emit("connected", data);
  });

  eventSource.addEventListener("heartbeat", (event) => {
    emit("heartbeat", parseEventData<"heartbeat">(event));
  });

  for (const eventName of serverEventNames) {
    eventSource.addEventListener(eventName, (event) => {
      const data = parseEventData<typeof eventName>(event);
      logger.info(`[SSE] ${eventName}:`, data);
      refreshQueriesForEvent(eventName);

      // A status change is also delivered to "volume:updated" handlers.
      if (eventName === "volume:status_changed") {
        const statusData = data as ServerEventsPayloadMap["volume:status_changed"];
        emit("volume:status_changed", statusData);
        emit("volume:updated", statusData);
        return;
      }

      emit(eventName, data);
    });
  }

  eventSource.onerror = (error) => {
    logger.error("[SSE] Connection error:", error);
  };
};

/**
 * Closes the shared EventSource and resets the query client and handler registry.
 * No-op when there is no open connection.
 */
const disconnectEventSource = () => {
  if (!sharedState.eventSource) {
    return;
  }

  logger.info("[SSE] Disconnecting from server events");
  sharedState.eventSource.close();
  sharedState.eventSource = null;
  sharedState.queryClient = null;
  // NOTE(review): this drops every registered handler; a later reconnect does not
  // restore them, so callers must register again — confirm this is intended.
  sharedState.handlers = {};
};

/**
 * Registers `handler` for `eventName` in the shared registry.
 *
 * @param eventName Event to listen for.
 * @param handler Callback receiving the parsed payload.
 * @param options.signal Optional AbortSignal; aborting it removes the handler. If it is
 *   already aborted, the handler is never registered.
 * @returns A function that removes the handler (and the abort listener, if any).
 */
const addSharedEventListener = <T extends ServerEventType>(
  eventName: T,
  handler: EventHandler<T>,
  options?: { signal?: AbortSignal },
) => {
  if (options?.signal?.aborted) {
    return () => {};
  }

  const existingHandlers = sharedState.handlers[eventName] as EventHandlerSet<T> | undefined;
  const eventHandlers = existingHandlers ?? new Set<EventHandler<T>>();
  eventHandlers.add(handler);
  sharedState.handlers[eventName] = eventHandlers as EventHandlerMap[T];

  const unsubscribe = () => {
    // Re-read the registry: it may have been reset by a disconnect in the meantime.
    const handlers = sharedState.handlers[eventName] as EventHandlerSet<T> | undefined;
    handlers?.delete(handler);

    // Drop the key once its last handler is gone.
    if (handlers && handlers.size === 0) {
      delete sharedState.handlers[eventName];
    }

    if (options?.signal) {
      options.signal.removeEventListener("abort", unsubscribe);
    }
  };

  if (options?.signal) {
    options.signal.addEventListener("abort", unsubscribe, { once: true });
  }

  return unsubscribe;
};

/**
 * Hook to listen to Server-Sent Events (SSE) from the backend.
 *
 * All callers share a single EventSource on `/api/v1/events`. Each enabled caller counts
 * as one subscriber; when the last one cleans up, the connection is closed and the handler
 * registry is cleared. Events listed in `invalidatingEvents` (backup, volume, notification,
 * mirror and doctor events) automatically invalidate the React Query cache.
 *
 * @param enabled When false, the hook neither connects nor counts as a subscriber.
 * @returns `addEventListener`, which registers a typed handler and returns its unsubscribe function.
 */
export function useServerEvents({ enabled = true }: { enabled?: boolean } = {}) {
  const queryClient = useQueryClient();
  const addEventListener = useCallback(addSharedEventListener, []);
  // Tracks whether this hook instance is currently counted in `sharedState.subscribers`.
  const hasMountedRef = useRef(false);

  useEffect(() => {
    if (!enabled) {
      return;
    }

    connectEventSource(queryClient);

    if (!hasMountedRef.current) {
      sharedState.subscribers += 1;
      hasMountedRef.current = true;
    }

    return () => {
      if (!hasMountedRef.current) {
        return;
      }

      hasMountedRef.current = false;
      sharedState.subscribers = Math.max(0, sharedState.subscribers - 1);

      if (sharedState.subscribers === 0) {
        disconnectEventSource();
      }
    };
  }, [enabled, queryClient]);

  return { addEventListener };
}