From 75a63f87d8a5f270258eadee3c7edfec6e28200b Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Sun, 19 Apr 2026 17:55:53 +0200
Subject: [PATCH] feat(distributed): sync state with frontends, better backend management reporting (#9426)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix(distributed): detect backend upgrades across worker nodes

Before this change `DistributedBackendManager.CheckUpgrades` delegated to
the local manager, which read backends from the frontend filesystem. In
distributed deployments the frontend has no backends installed locally —
they live on workers — so the upgrade-detection loop never ran and the UI
silently never surfaced upgrades even when the gallery advertised newer
versions or digests.

Worker-side: the NATS backend.list reply now carries Version, URI and
Digest for each installed backend (read from metadata.json).

Frontend-side: DistributedBackendManager.ListBackends aggregates per-node
refs (name, status, version, digest) instead of deduping, and
CheckUpgrades feeds that aggregation into gallery.CheckUpgradesAgainst —
a new entrypoint factored out of CheckBackendUpgrades so both paths share
the same core logic.

Cluster drift policy: when per-node version/digest tuples disagree, the
backend is flagged upgradeable regardless of whether any single node
matches the gallery, and UpgradeInfo.NodeDrift enumerates the outliers so
operators can see *why* it is out of sync. The next upgrade-all realigns
the cluster.

Tests cover: drift detection, unanimous-match (no upgrade), and the
empty-installed-version path that the old distributed code silently
missed.

* feat(ui): surface backend upgrades in the System page

The System page (Manage.jsx) only showed updates as a tiny inline arrow,
so operators routinely missed them. Port the Backend Gallery's upgrade UX
so System speaks the same visual language:

- Yellow banner at the top of the Backends tab when upgrades are pending,
  with an "Upgrade all" button (serial fan-out, matches the gallery) and
  an "Updates only" filter toggle.
- Warning pill (↑ N) next to the tab label so the count is glanceable
  even when the banner is scrolled out of view.
- Per-row labeled "Upgrade to vX.Y" button (replaces the icon-only button
  that silently flipped semantics between Reinstall and Upgrade), plus an
  "Update available" badge in the new Version column.
- New columns: Version (with upgrade + drift chips), Nodes (per-node
  attribution badges for distributed mode, degrading to a compact
  "on N nodes · M offline" chip above three nodes), Installed (relative
  time).
- System backends render a "Protected" chip instead of a bare "—" so rows
  still align and the reason is obvious.
- Delete uses the softer btn-danger-ghost so rows don't scream red; the
  ConfirmDialog still owns the "are you sure".

The upgrade checker also needed the same per-worker fix as the previous
commit: NewUpgradeChecker now takes a BackendManager getter so its
periodic runs call the distributed CheckUpgrades (which asks workers)
instead of the empty frontend filesystem. Without this the
/api/backends/ upgrades endpoint stayed empty in distributed mode even
with the protocol change in place.

New CSS primitives — .upgrade-banner, .tab-pill, .badge-row, .cell-stack,
.cell-mono, .cell-muted, .row-actions, .btn-danger-ghost — all live in
App.css so other pages can adopt them without duplicating styles.

* feat(ui): polish the Nodes page so it reads like a product

The Nodes page was the biggest visual liability in distributed mode.
Rework the main dashboard surfaces in place without changing behavior:

StatCards: uniform height (96px min), left accent bar colored by the
metric's semantic (success/warning/error/primary), icon lives in a 36x36
soft-tinted chip top-right, value is left-aligned and large. The grid
auto-fills so the row doesn't collapse on narrow viewports. This replaces
the previous thin-bordered boxes with inconsistent heights.

Table rows: expandable rows now show a chevron cue on the left (rotates
on expand) so users know rows open. The status cell became a dedicated
chip with an LED-style halo dot instead of a bare bullet. Action buttons
gained labels — "Approve", "Resume", "Drain" — so the icons aren't doing
all the semantic work; the destructive remove action uses the softer
btn-danger-ghost variant so rows don't scream red, with the ConfirmDialog
still owning the real "are you sure". Applied cell-mono/cell-muted
utility classes so label chips and addresses share one spacing/font
grammar instead of re-declaring inline styles everywhere.

Expanded drawer: empty states for Loaded Models and Installed Backends
now render as a proper drawer-empty card (dashed border, icon, one-line
hint) instead of a plain muted string that read like broken formatting.

Tabs: three inline-styled buttons became the shared .tab class so they
inherit focus ring, hover state, and the rest of the design system —
matches the System page.

The "Add more workers" toggle turned into a .nodes-add-worker
dashed-border button labelled "Register a new worker" (action voice)
instead of a chevron + muted link that operators kept mistaking for
broken text.

New shared CSS primitives carry over to other pages: .stat-grid +
.stat-card, .row-chevron, .node-status, .drawer-empty, .nodes-add-worker.

* feat(distributed): durable backend fan-out + state reconciliation

Two connected problems handled together:

1) Backend delete/install/upgrade used to silently skip non-healthy
   nodes, so a delete during an outage left a zombie on the offline node
   once it returned. The fan-out now records intent in a new
   pending_backend_ops table before attempting the NATS round-trip.
   Currently-healthy nodes get an immediate attempt; everyone else is
   queued. A unique index on (node_id, backend, op) means reissuing the
   same operation refreshes next_retry_at instead of stacking duplicates.

2) Loaded-model state could drift from reality: a worker that OOM'd, was
   killed, or restarted its backend process would leave a node_models row
   claiming the model was still loaded, feeding ghost entries into the
   /api/nodes/models listing and the router's scheduling decisions.

The existing ReplicaReconciler gains two new passes that run under a
fresh KeyStateReconciler advisory lock (non-blocking, so one wedged
frontend doesn't freeze the cluster):

- drainPendingBackendOps: retries queued ops whose next_retry_at has
  passed on currently-healthy nodes. Success deletes the row; failure
  bumps attempts and pushes next_retry_at out with exponential backoff
  (30s → 15m cap), as sketched below. ErrNoResponders also marks the node
  unhealthy.
- probeLoadedModels: runs a gRPC HealthCheck against addresses the DB
  thinks are loaded but hasn't seen touched within the last
  probeStaleAfter (2m). Unreachable addresses are removed from the
  registry. A pluggable ModelProber lets tests substitute a fake without
  standing up gRPC.
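
For reference, the backoff shape described above looks roughly like the
sketch below. Illustrative only — the helper name and the exact doubling
scheme are assumptions made for this example; the 30s base and 15m cap
are the values this change uses.

    // Illustrative sketch, not the committed implementation.
    package nodes

    import "time"

    const (
        pendingOpRetryBase = 30 * time.Second
        pendingOpRetryCap  = 15 * time.Minute
    )

    // nextRetryDelay doubles the delay for every failed attempt and clamps
    // it at the cap, so ops queued for an offline node back off instead of
    // hammering NATS on every reconcile tick.
    func nextRetryDelay(attempts int) time.Duration {
        delay := pendingOpRetryBase
        for i := 0; i < attempts && delay < pendingOpRetryCap; i++ {
            delay *= 2
        }
        if delay > pendingOpRetryCap {
            delay = pendingOpRetryCap
        }
        return delay
    }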
DistributedBackendManager exposes DeleteBackendDetailed so the HTTP
handler can surface per-node outcomes ("2 succeeded, 1 queued") to the UI
in a follow-up commit; the existing DeleteBackend still returns
error-only for callers that don't care about node breakdown.

Multi-frontend safety: the state pass uses advisorylock.TryWithLockCtx on
a new key so N frontends coordinate — the same pattern the health monitor
and replica reconciler already rely on. Single-node mode runs both passes
inline (adapter is nil, state drain is a no-op).

Tests cover the upsert semantics, backoff math, the probe removing an
unreachable model but keeping a reachable one, and filtering by
probeStaleAfter.

* feat(ui): show cluster distribution of models in the System page

When a frontend restarted in distributed mode, models that workers had
already loaded weren't visible until the operator clicked into each node
manually — the /api/models/capabilities endpoint only knew about configs
on the frontend's filesystem, not the registry-backed truth.

/api/models/capabilities now joins in ListAllLoadedModels() when the
registry is active, returning loaded_on[] with node id/name/state/status
for each model. Models that live in the registry but lack a local config
(the actual ghosts, not recovered from the frontend's file cache) still
surface with source="registry-only" so operators can see and persist
them; without that emission they'd be invisible to this frontend.

Manage → Models replaces the old Running/Idle pill with a distribution
cell that lists the first three nodes the model is loaded on as chips
colored by state (green loaded, blue loading, amber anything else). On
wider clusters the remaining count collapses into a +N chip with a
title-attribute breakdown. Disabled / single-node behavior is unchanged.
Adopted models get an extra "Adopted" ghost-icon chip with hover copy
explaining what it means and how to make it permanent.

Distributed mode also enables a 10s auto-refresh and a "Last synced Xs
ago" indicator next to the Update button so ghost rows drop off within
one reconcile tick after their owning process dies. Non-distributed mode
is untouched — no polling, no cell-stack, same old Running/Idle.

* feat(ui): NodeDistributionChip — shared per-node attribution component

Large clusters were going to break the Manage → Backends Nodes column:
the old inline logic rendered every node as a badge and would shred the
layout at >10 workers, plus the Manage → Models distribution cell had
copy-pasted its own slightly-different version.

NodeDistributionChip handles any cluster size with two render modes:

- small (≤3 nodes): inline chips of node names, colored by health.
- large: a single "on N nodes · M offline · K drift" summary chip;
  clicking opens a Popover with a per-node table (name, status, version,
  digest for backends; name, status, state for models).

Drift counting mirrors the backend's summarizeNodeDrift so the UI number
matches UpgradeInfo.NodeDrift. Digests are truncated to the docker-style
12-char form with the full value preserved in the title.

Popover is a new general-purpose primitive: fixed positioning anchored to
the trigger, flips above when there's no room below, closes on
outside-click or Escape, returns focus to the trigger. Uses .card as its
surface so theming is inherited. Also useful for a future labels-editor
popup and the user menu.

Manage.jsx drops its duplicated inline Nodes-column + loaded_on cell and
uses the shared chip with context="backends" / "models" respectively.
Deleting the duplicated code removes ~40 lines of ad-hoc logic.

* feat(ui): shared FilterBar across the System page tabs

The Backends gallery had a nice search + chip + toggle strip; the System
page had nothing, so the two surfaces felt like different apps. Lift the
pattern into a reusable FilterBar and wire both System tabs through it.

New component core/http/react-ui/src/components/FilterBar.jsx renders a
search input, a role="tablist" chip row (aria-selected for a11y), and
optional toggles / right slot. Chips support an optional `count` which
the System page uses to show "User 3", "Updates 1", etc.

System Models tab: search by id or backend; chips for
All/Running/Idle/Disabled/Pinned plus a conditional Distributed chip in
distributed mode. "Last synced" + the Update button live in the right
slot.

System Backends tab: search by name/alias/meta-backend-for; chips for
All/User/System/Meta plus conditional Updates / Offline-nodes chips when
relevant. The old ad-hoc "Updates only" toggle from the upgrade banner
folded into the Updates chip — one source of truth for that filter. The
Offline chip only appears in distributed mode when at least one backend
has an unhealthy node, so the chip row stays quiet on healthy clusters.

Filter state persists in URL query params (mq/mf/bq/bf) so deep links and
tab switches keep the operator's filter context instead of resetting
every time.

Also adds an "Adopted" distribution path: when a model in
/api/models/capabilities carries source="registry-only" (discovered on a
worker but not configured locally), the Models tab shows a ghost chip
labelled "Adopted" with hover copy explaining how to persist it — this is
what closes the loop on the ghost-model story end-to-end.
---
 core/application/distributed.go               |  20 +-
 core/application/startup.go                   |   7 +-
 core/application/upgrade_checker.go           |  53 +-
 core/cli/worker.go                            |   3 +
 core/gallery/backends.go                      |  17 +
 core/gallery/upgrade.go                       | 134 ++++-
 core/gallery/upgrade_test.go                  |  91 ++++
 core/http/react-ui/src/App.css                | 395 +++++++++++++++
 .../react-ui/src/components/FilterBar.jsx     |  87 ++++
 .../src/components/NodeDistributionChip.jsx   | 168 +++++++
 core/http/react-ui/src/components/Popover.jsx |  86 ++++
 core/http/react-ui/src/pages/Manage.jsx       | 468 ++++++++++++++----
 core/http/react-ui/src/pages/Nodes.jsx        | 138 +++---
 core/http/routes/ui_api.go                    |  71 ++-
 core/services/advisorylock/keys.go            |   1 +
 core/services/galleryop/service.go            |  10 +
 core/services/messaging/subjects.go           |   6 +
 core/services/nodes/managers_distributed.go   | 276 ++++++++---
 core/services/nodes/reconciler.go             | 188 ++++++-
 core/services/nodes/reconciler_test.go        | 135 +++++
 core/services/nodes/registry.go               | 143 +++++-
 21 files changed, 2185 insertions(+), 312 deletions(-)
 create mode 100644 core/http/react-ui/src/components/FilterBar.jsx
 create mode 100644 core/http/react-ui/src/components/NodeDistributionChip.jsx
 create mode 100644 core/http/react-ui/src/components/Popover.jsx

diff --git a/core/application/distributed.go b/core/application/distributed.go
index 31e87fdab..26d56d121 100644
--- a/core/application/distributed.go
+++ b/core/application/distributed.go
@@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut
 		DB: authDB,
 	})
-	// Create ReplicaReconciler for auto-scaling model replicas
+	// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
+	// RegistrationToken feed the state-reconciliation passes: pending op
+	// drain uses the adapter, and model health probes use the token to auth
+	// against workers' gRPC HealthCheck.
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{ - Registry: registry, - Scheduler: router, - Unloader: remoteUnloader, - DB: authDB, - Interval: 30 * time.Second, - ScaleDownDelay: 5 * time.Minute, + Registry: registry, + Scheduler: router, + Unloader: remoteUnloader, + Adapter: remoteUnloader, + RegistrationToken: cfg.Distributed.RegistrationToken, + DB: authDB, + Interval: 30 * time.Second, + ScaleDownDelay: 5 * time.Minute, + ProbeStaleAfter: 2 * time.Minute, }) // Create ModelRouterAdapter to wire into ModelLoader diff --git a/core/application/startup.go b/core/application/startup.go index a03f17bd2..241ea8b22 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -235,7 +235,12 @@ func New(opts ...config.AppOption) (*Application, error) { // In distributed mode, uses PostgreSQL advisory lock so only one frontend // instance runs periodic checks (avoids duplicate upgrades across replicas). if len(options.BackendGalleries) > 0 { - uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB()) + // Pass a lazy getter for the backend manager so the checker always + // uses the active one — DistributedBackendManager is swapped in above + // and asks workers for their installed backends, which is what + // upgrade detection needs in distributed mode. + bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() } + uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn) application.upgradeChecker = uc go uc.Run(options.Context) } diff --git a/core/application/upgrade_checker.go b/core/application/upgrade_checker.go index 94fb3f6c7..3b2d94544 100644 --- a/core/application/upgrade_checker.go +++ b/core/application/upgrade_checker.go @@ -8,6 +8,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" "github.com/mudler/LocalAI/core/services/advisorylock" + "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/xlog" @@ -26,6 +27,12 @@ type UpgradeChecker struct { galleries []config.Gallery systemState *system.SystemState db *gorm.DB // non-nil in distributed mode + // backendManagerFn lazily returns the current backend manager (may be + // swapped from Local to Distributed after startup). Pulled through each + // check so the UpgradeChecker uses whichever is active. In distributed + // mode this ensures CheckUpgrades asks workers instead of the (empty) + // frontend filesystem — fixing the bug where upgrades never surfaced. + backendManagerFn func() galleryop.BackendManager checkInterval time.Duration stop chan struct{} @@ -40,18 +47,22 @@ type UpgradeChecker struct { // NewUpgradeChecker creates a new UpgradeChecker service. // Pass db=nil for standalone mode, or a *gorm.DB for distributed mode // (uses advisory locks so only one instance runs periodic checks). -func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker { +// backendManagerFn is optional; when set, CheckUpgrades is routed through +// the active backend manager — required in distributed mode so the check +// aggregates from workers rather than the empty frontend filesystem. 
+func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker { return &UpgradeChecker{ - appConfig: appConfig, - modelLoader: ml, - galleries: appConfig.BackendGalleries, - systemState: appConfig.SystemState, - db: db, - checkInterval: 6 * time.Hour, - stop: make(chan struct{}), - done: make(chan struct{}), - triggerCh: make(chan struct{}, 1), - lastUpgrades: make(map[string]gallery.UpgradeInfo), + appConfig: appConfig, + modelLoader: ml, + galleries: appConfig.BackendGalleries, + systemState: appConfig.SystemState, + db: db, + backendManagerFn: backendManagerFn, + checkInterval: 6 * time.Hour, + stop: make(chan struct{}), + done: make(chan struct{}), + triggerCh: make(chan struct{}, 1), + lastUpgrades: make(map[string]gallery.UpgradeInfo), } } @@ -64,13 +75,16 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade func (uc *UpgradeChecker) Run(ctx context.Context) { defer close(uc.done) - // Initial delay: don't slow down startup + // Initial delay: don't slow down startup. Short enough that operators + // don't stare at an empty upgrade banner for long; long enough that + // workers have registered and reported their installed backends. + initialDelay := 10 * time.Second select { case <-ctx.Done(): return case <-uc.stop: return - case <-time.After(30 * time.Second): + case <-time.After(initialDelay): } // First check always runs locally (to warm the cache on this instance) @@ -144,7 +158,18 @@ func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo } func (uc *UpgradeChecker) runCheck(ctx context.Context) { - upgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState) + var ( + upgrades map[string]gallery.UpgradeInfo + err error + ) + if uc.backendManagerFn != nil { + if bm := uc.backendManagerFn(); bm != nil { + upgrades, err = bm.CheckUpgrades(ctx) + } + } + if upgrades == nil && err == nil { + upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState) + } uc.mu.Lock() uc.lastCheckTime = time.Now() diff --git a/core/cli/worker.go b/core/cli/worker.go index affde4b08..186fe298e 100644 --- a/core/cli/worker.go +++ b/core/cli/worker.go @@ -738,6 +738,9 @@ func (s *backendSupervisor) subscribeLifecycleEvents() { if b.Metadata != nil { info.InstalledAt = b.Metadata.InstalledAt info.GalleryURL = b.Metadata.GalleryURL + info.Version = b.Metadata.Version + info.URI = b.Metadata.URI + info.Digest = b.Metadata.Digest } infos = append(infos, info) } diff --git a/core/gallery/backends.go b/core/gallery/backends.go index c2622c272..ee3ca906d 100644 --- a/core/gallery/backends.go +++ b/core/gallery/backends.go @@ -394,6 +394,23 @@ type SystemBackend struct { Metadata *BackendMetadata UpgradeAvailable bool `json:"upgrade_available,omitempty"` AvailableVersion string `json:"available_version,omitempty"` + // Nodes holds per-node attribution in distributed mode. Empty in single-node. + // Each entry describes a node that has this backend installed, with the + // version/digest it reports. Lets the UI surface drift and per-node status. + Nodes []NodeBackendRef `json:"nodes,omitempty"` +} + +// NodeBackendRef describes one node's view of an installed backend. Used both +// for per-node attribution in the UI and for drift detection during upgrade +// checks (a cluster with mismatched versions/digests is flagged upgradeable). 
+type NodeBackendRef struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + NodeStatus string `json:"node_status"` // healthy | unhealthy | offline | draining | pending + Version string `json:"version,omitempty"` + Digest string `json:"digest,omitempty"` + URI string `json:"uri,omitempty"` + InstalledAt string `json:"installed_at,omitempty"` } type SystemBackends map[string]SystemBackend diff --git a/core/gallery/upgrade.go b/core/gallery/upgrade.go index dde33300f..d0671617e 100644 --- a/core/gallery/upgrade.go +++ b/core/gallery/upgrade.go @@ -23,22 +23,45 @@ type UpgradeInfo struct { AvailableVersion string `json:"available_version"` InstalledDigest string `json:"installed_digest,omitempty"` AvailableDigest string `json:"available_digest,omitempty"` + // NodeDrift lists nodes whose installed version or digest differs from + // the cluster majority. Non-empty means the cluster has diverged and an + // upgrade will realign it. Empty in single-node mode. + NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"` } -// CheckBackendUpgrades compares installed backends against gallery entries -// and returns a map of backend names to UpgradeInfo for those that have -// newer versions or different OCI digests available. +// NodeDriftInfo describes one node that disagrees with the cluster majority +// on which version/digest of a backend is installed. +type NodeDriftInfo struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + Version string `json:"version,omitempty"` + Digest string `json:"digest,omitempty"` +} + +// CheckBackendUpgrades is the single-node entrypoint. Distributed callers use +// CheckUpgradesAgainst directly with their aggregated SystemBackends. func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState) (map[string]UpgradeInfo, error) { + installed, err := ListSystemBackends(systemState) + if err != nil { + return nil, fmt.Errorf("failed to list installed backends: %w", err) + } + return CheckUpgradesAgainst(ctx, galleries, systemState, installed) +} + +// CheckUpgradesAgainst compares a caller-supplied SystemBackends set against +// the gallery. Fixes the distributed-mode bug where the old code passed the +// frontend's (empty) local filesystem through ListSystemBackends and so never +// surfaced any upgrades. +// +// Cluster drift policy: if a backend's per-node versions/digests disagree, the +// row is flagged upgradeable regardless of whether any node matches the gallery +// — next Upgrade All realigns the cluster. NodeDrift lists the outliers. +func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) { galleryBackends, err := AvailableBackends(galleries, systemState) if err != nil { return nil, fmt.Errorf("failed to list available backends: %w", err) } - installedBackends, err := ListSystemBackends(systemState) - if err != nil { - return nil, fmt.Errorf("failed to list installed backends: %w", err) - } - result := make(map[string]UpgradeInfo) for _, installed := range installedBackends { @@ -57,34 +80,48 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste } installedVersion := installed.Metadata.Version + installedDigest := installed.Metadata.Digest galleryVersion := galleryEntry.Version - // If both sides have versions, compare them + // Detect cluster drift: does every node report the same version+digest? 
+ // In single-node mode this stays empty (Nodes is nil). + majority, drift := summarizeNodeDrift(installed.Nodes) + if majority.version != "" { + installedVersion = majority.version + } + if majority.digest != "" { + installedDigest = majority.digest + } + + makeInfo := func(info UpgradeInfo) UpgradeInfo { + info.NodeDrift = drift + return info + } + + // If versions are available on both sides, they're the source of truth. if galleryVersion != "" && installedVersion != "" { - if galleryVersion != installedVersion { - result[installed.Metadata.Name] = UpgradeInfo{ + if galleryVersion != installedVersion || len(drift) > 0 { + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ BackendName: installed.Metadata.Name, InstalledVersion: installedVersion, AvailableVersion: galleryVersion, - } + }) } - // Versions match — no upgrade needed continue } - // Gallery has a version but installed doesn't — this happens for backends - // installed before version tracking was added. Flag as upgradeable so - // users can re-install to pick up version metadata. + // Gallery has a version but installed doesn't — backends installed before + // version tracking was added. Flag as upgradeable to pick up metadata. if galleryVersion != "" && installedVersion == "" { - result[installed.Metadata.Name] = UpgradeInfo{ + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ BackendName: installed.Metadata.Name, InstalledVersion: "", AvailableVersion: galleryVersion, - } + }) continue } - // Fall back to OCI digest comparison when versions are unavailable + // Fall back to OCI digest comparison when versions are unavailable. if downloader.URI(galleryEntry.URI).LooksLikeOCI() { remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil) if err != nil { @@ -92,21 +129,68 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste continue } // If we have a stored digest, compare; otherwise any remote digest - // means we can't confirm we're up to date — flag as upgradeable - if installed.Metadata.Digest == "" || remoteDigest != installed.Metadata.Digest { - result[installed.Metadata.Name] = UpgradeInfo{ + // means we can't confirm we're up to date — flag as upgradeable. + if installedDigest == "" || remoteDigest != installedDigest || len(drift) > 0 { + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ BackendName: installed.Metadata.Name, - InstalledDigest: installed.Metadata.Digest, + InstalledDigest: installedDigest, AvailableDigest: remoteDigest, - } + }) } + } else if len(drift) > 0 { + // No version/digest path but nodes disagree — still worth flagging. + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ + BackendName: installed.Metadata.Name, + InstalledVersion: installedVersion, + InstalledDigest: installedDigest, + }) } - // No version info and non-OCI URI — cannot determine, skip } return result, nil } +// summarizeNodeDrift collapses per-node version/digest tuples to a majority +// pair and returns the outliers. In single-node mode (empty nodes slice) this +// returns zero values and a nil drift list. 
+func summarizeNodeDrift(nodes []NodeBackendRef) (majority struct{ version, digest string }, drift []NodeDriftInfo) { + if len(nodes) == 0 { + return majority, nil + } + + type key struct{ version, digest string } + counts := map[key]int{} + var topKey key + var topCount int + for _, n := range nodes { + k := key{n.Version, n.Digest} + counts[k]++ + if counts[k] > topCount { + topCount = counts[k] + topKey = k + } + } + + majority.version = topKey.version + majority.digest = topKey.digest + + if len(counts) == 1 { + return majority, nil // unanimous — no drift + } + for _, n := range nodes { + if n.Version == majority.version && n.Digest == majority.digest { + continue + } + drift = append(drift, NodeDriftInfo{ + NodeID: n.NodeID, + NodeName: n.NodeName, + Version: n.Version, + Digest: n.Digest, + }) + } + return majority, drift +} + // UpgradeBackend upgrades a single backend to the latest gallery version using // an atomic swap with backup-based rollback on failure. func UpgradeBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, galleries []config.Gallery, backendName string, downloadStatus func(string, string, string, float64)) error { diff --git a/core/gallery/upgrade_test.go b/core/gallery/upgrade_test.go index f65b4276b..6fd386b2e 100644 --- a/core/gallery/upgrade_test.go +++ b/core/gallery/upgrade_test.go @@ -144,6 +144,97 @@ var _ = Describe("Upgrade Detection and Execution", func() { }) }) + // CheckUpgradesAgainst is the entry point used by DistributedBackendManager. + // It takes installed backends directly — typically aggregated from workers — + // instead of reading the frontend filesystem. These tests exercise drift + // detection, which is the feature the distributed path relies on. + Describe("CheckUpgradesAgainst (distributed)", func() { + It("flags upgrade when cluster nodes disagree on version, even if gallery matches majority", func() { + writeGalleryYAML([]GalleryBackend{ + { + Metadata: Metadata{Name: "my-backend"}, + URI: filepath.Join(tempDir, "some-source"), + Version: "2.0.0", + }, + }) + + installed := SystemBackends{ + "my-backend": SystemBackend{ + Name: "my-backend", + Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"}, + Nodes: []NodeBackendRef{ + {NodeID: "a", NodeName: "worker-1", Version: "2.0.0"}, + {NodeID: "b", NodeName: "worker-2", Version: "2.0.0"}, + {NodeID: "c", NodeName: "worker-3", Version: "1.0.0"}, // drift + }, + }, + } + + upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed) + Expect(err).NotTo(HaveOccurred()) + Expect(upgrades).To(HaveKey("my-backend")) + info := upgrades["my-backend"] + Expect(info.AvailableVersion).To(Equal("2.0.0")) + Expect(info.NodeDrift).To(HaveLen(1)) + Expect(info.NodeDrift[0].NodeName).To(Equal("worker-3")) + Expect(info.NodeDrift[0].Version).To(Equal("1.0.0")) + }) + + It("does not flag upgrade when all nodes agree and match gallery", func() { + writeGalleryYAML([]GalleryBackend{ + { + Metadata: Metadata{Name: "my-backend"}, + URI: filepath.Join(tempDir, "some-source"), + Version: "2.0.0", + }, + }) + + installed := SystemBackends{ + "my-backend": SystemBackend{ + Name: "my-backend", + Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"}, + Nodes: []NodeBackendRef{ + {NodeID: "a", NodeName: "worker-1", Version: "2.0.0"}, + {NodeID: "b", NodeName: "worker-2", Version: "2.0.0"}, + }, + }, + } + + upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed) + 
Expect(err).NotTo(HaveOccurred()) + Expect(upgrades).To(BeEmpty()) + }) + + It("surfaces empty-installed-version path the old distributed code silently missed", func() { + // Simulates the real-world bug: worker has a backend, its version + // is empty (pre-tracking or OCI-pinned-to-latest), gallery has a + // version. Pre-fix CheckUpgrades returned nothing; now it surfaces. + writeGalleryYAML([]GalleryBackend{ + { + Metadata: Metadata{Name: "my-backend"}, + URI: filepath.Join(tempDir, "some-source"), + Version: "2.0.0", + }, + }) + + installed := SystemBackends{ + "my-backend": SystemBackend{ + Name: "my-backend", + Metadata: &BackendMetadata{Name: "my-backend"}, + Nodes: []NodeBackendRef{ + {NodeID: "a", NodeName: "worker-1"}, + }, + }, + } + + upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed) + Expect(err).NotTo(HaveOccurred()) + Expect(upgrades).To(HaveKey("my-backend")) + Expect(upgrades["my-backend"].InstalledVersion).To(BeEmpty()) + Expect(upgrades["my-backend"].AvailableVersion).To(Equal("2.0.0")) + }) + }) + Describe("UpgradeBackend", func() { It("should replace backend directory and update metadata", func() { // Install v1 diff --git a/core/http/react-ui/src/App.css b/core/http/react-ui/src/App.css index c9e945d16..03c448243 100644 --- a/core/http/react-ui/src/App.css +++ b/core/http/react-ui/src/App.css @@ -1529,6 +1529,401 @@ select.input { background: var(--color-warning-light); color: var(--color-warning); } +.badge-accent { + background: var(--color-accent-light); + color: var(--color-accent); +} + +/* Horizontal row of badges used inside table cells — consistent spacing so + cells line up regardless of how many badges are present. */ +.badge-row { + display: inline-flex; + flex-wrap: wrap; + gap: 4px; + align-items: center; +} + +/* Vertically stacked cell content (e.g. version + update chip + drift chip). + Keeps rows readable at scale without inline style={{...}} everywhere. */ +.cell-stack { + display: flex; + flex-direction: column; + gap: 4px; + align-items: flex-start; +} + +.cell-mono { + font-family: 'JetBrains Mono', ui-monospace, monospace; + font-size: var(--text-xs); + color: var(--color-text-primary); +} + +.cell-muted { + color: var(--color-text-muted); + font-size: var(--text-xs); +} + +.cell-subtle { + color: var(--color-text-muted); + font-size: var(--text-xs); + font-weight: 400; + margin-left: 8px; +} + +.cell-name { + display: inline-flex; + align-items: center; + gap: var(--spacing-xs); + font-weight: 500; +} +.cell-name > i { + color: var(--color-accent); + font-size: var(--text-xs); +} + +.row-actions { + display: flex; + gap: var(--spacing-xs); + justify-content: flex-end; + align-items: center; +} + +/* Softer delete button for dense tables — the destructive confirm dialog + already owns the "are you sure" affordance, so the button itself doesn't + need to scream. Keeps the delete red readable without dominating rows. */ +.btn.btn-danger-ghost { + background: transparent; + color: var(--color-error); + border-color: transparent; +} +.btn.btn-danger-ghost:hover:not(:disabled) { + background: var(--color-error-light); + color: var(--color-error); + border-color: var(--color-error-light); +} + +/* Small count pill used inside tabs ("(3) ↑ 2") so update counts are + glanceable without extra rows of UI. 
*/ +.tab-pill { + display: inline-flex; + align-items: center; + gap: 3px; + margin-left: 6px; + padding: 1px 6px; + border-radius: var(--radius-full); + font-size: var(--text-xs); + font-weight: 600; + line-height: 1.4; +} +.tab-pill--warning { + background: var(--color-warning-light); + color: var(--color-warning); +} + +/* Stat cards — uniform-height cluster metrics for the Nodes dashboard. + Left accent bar ties the color to the metric's semantic (success/warning/ + error/primary), icon chip sits top-right, value is left-aligned and + prominent so you can scan a row of cards without reading labels. */ +.stat-grid { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(180px, 1fr)); + gap: var(--spacing-md); + margin-bottom: var(--spacing-xl); +} + +.stat-card { + position: relative; + display: flex; + align-items: center; + justify-content: space-between; + gap: var(--spacing-sm); + padding: var(--spacing-md); + min-height: 96px; + background: var(--color-bg-raised, var(--color-bg-secondary)); + border: 1px solid var(--color-border-subtle); + border-radius: var(--radius-lg); + transition: transform var(--duration-fast) var(--ease-default), + box-shadow var(--duration-fast) var(--ease-default), + border-color var(--duration-fast) var(--ease-default); + overflow: hidden; +} +.stat-card::before { + content: ''; + position: absolute; + left: 0; top: 0; bottom: 0; + width: 3px; + background: var(--stat-accent, var(--color-border-subtle)); + transition: background var(--duration-fast) var(--ease-default); +} +.stat-card:hover { + transform: translateY(-1px); + box-shadow: var(--shadow-sm); + border-color: var(--color-border); +} + +.stat-card__body { + display: flex; + flex-direction: column; + gap: 6px; + min-width: 0; +} +.stat-card__label { + font-size: var(--text-xs); + font-weight: 600; + letter-spacing: 0.08em; + text-transform: uppercase; + color: var(--color-text-muted); + white-space: normal; + line-height: 1.2; +} +.stat-card__value { + font-size: var(--text-2xl); + font-weight: 600; + font-family: 'JetBrains Mono', ui-monospace, monospace; + line-height: 1; + color: var(--color-text-primary); + word-break: break-word; +} +.stat-card__icon { + display: inline-flex; + align-items: center; + justify-content: center; + width: 36px; + height: 36px; + border-radius: var(--radius-md); + background: color-mix(in srgb, var(--stat-accent, var(--color-text-muted)) 12%, transparent); + color: var(--stat-accent, var(--color-text-muted)); + font-size: var(--text-lg); + flex-shrink: 0; +} + +/* Subtle "Register a new worker" trigger replacing the broken-text chevron + link. Still opens the same hint card — just reads like a button now. */ +.nodes-add-worker { + display: inline-flex; + align-items: center; + gap: var(--spacing-xs); + padding: var(--spacing-xs) var(--spacing-sm); + background: transparent; + border: 1px dashed var(--color-border); + border-radius: var(--radius-md); + color: var(--color-text-secondary); + font-size: var(--text-sm); + font-family: inherit; + font-weight: 500; + cursor: pointer; + margin-bottom: var(--spacing-md); + transition: background var(--duration-fast) var(--ease-default), + border-color var(--duration-fast) var(--ease-default), + color var(--duration-fast) var(--ease-default); +} +.nodes-add-worker:hover { + background: var(--color-bg-raised, var(--color-bg-secondary)); + border-color: var(--color-border-strong); + color: var(--color-text-primary); +} + +/* Shared FilterBar layout — search strip + chip row + toggle strip. 
Lives + outside the .filter-bar chip row so the padding and wrapping behavior is + consistent between the Backends gallery and the System tabs. */ +.filter-bar-group { + display: flex; + flex-direction: column; + gap: var(--spacing-sm); + margin-bottom: var(--spacing-md); +} +.filter-bar-group__search { + min-width: 200px; + flex: 1; +} +.filter-bar-group__row { + display: flex; + gap: var(--spacing-md); + align-items: center; + flex-wrap: wrap; +} +.filter-bar-group__right { + display: flex; + gap: var(--spacing-md); + align-items: center; + flex-wrap: wrap; + padding-left: var(--spacing-md); + border-left: 1px solid var(--color-border-subtle); +} +.filter-bar-group__toggle { + display: flex; + align-items: center; + gap: var(--spacing-xs); + font-size: var(--text-xs); + color: var(--color-text-secondary); + cursor: pointer; + user-select: none; + white-space: nowrap; +} +.filter-btn__count { + display: inline-flex; + align-items: center; + justify-content: center; + margin-left: 6px; + min-width: 18px; + padding: 0 5px; + background: color-mix(in srgb, currentColor 18%, transparent); + border-radius: var(--radius-full); + font-size: 0.625rem; + font-weight: 600; +} + +/* Popover — floating surface anchored to a trigger element. Uses the .card + base so theming is free, adds z-index + fixed-position + scroll cap so it + behaves on tables with many rows. Kept deliberately unstyled beyond that + — content is expected to provide its own header/body structure. */ +.popover { + position: fixed; + z-index: 200; + min-width: 260px; + max-width: min(420px, 95vw); + max-height: min(420px, 70vh); + display: flex; + flex-direction: column; + padding: 0; /* sections provide their own padding */ + overflow: hidden; + box-shadow: var(--shadow-lg); + animation: popoverIn var(--duration-fast) var(--ease-default); +} + +@keyframes popoverIn { + from { opacity: 0; transform: translateY(-4px) scale(0.98); } + to { opacity: 1; transform: translateY(0) scale(1); } +} + +.popover__header { + display: flex; + align-items: center; + gap: var(--spacing-sm); + padding: var(--spacing-sm) var(--spacing-md); + border-bottom: 1px solid var(--color-border-subtle); + font-size: var(--text-sm); +} + +.popover__scroll { + overflow: auto; + padding: 0; +} + +.popover__table { + margin: 0; + width: 100%; +} +.popover__table th { + position: sticky; + top: 0; + background: var(--color-bg-raised, var(--color-bg-secondary)); + z-index: 1; +} + +/* Inline-table chip trigger — looks like a badge but is a button (cursor, + focus ring inherited from global :focus-visible). */ +.chip-trigger { + border: none; + cursor: pointer; + font-family: inherit; +} +.chip-trigger:hover { + filter: brightness(1.08); +} + +/* Truncate + ellipsize a long cell (e.g. OCI digest) without breaking the + table layout. Tooltip preserves the full value. */ +.cell-truncate { + max-width: 160px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +/* Compact empty-state used inside expanded drawer sections (e.g. "No + models loaded on this node"). Dimmer than the page-level .empty-state + because it lives inside another container and shouldn't compete with + the row's primary content. 
*/ +.drawer-empty { + display: flex; + align-items: center; + gap: var(--spacing-sm); + padding: var(--spacing-sm) var(--spacing-md); + background: var(--color-bg-tertiary); + border: 1px dashed var(--color-border-subtle); + border-radius: var(--radius-md); + color: var(--color-text-muted); + font-size: var(--text-sm); +} +.drawer-empty > i { + font-size: var(--text-sm); + color: var(--color-text-muted); + opacity: 0.8; +} + +/* Node-status indicator — replaces the tiny bullet with a proper LED-style + dot next to a bold status label. Colors are applied inline from statusConfig + so one primitive handles healthy/unhealthy/draining/pending in one shape. */ +.node-status { + display: inline-flex; + align-items: center; + gap: 8px; + font-size: var(--text-sm); + font-weight: 600; +} +.node-status__dot { + width: 8px; + height: 8px; + border-radius: 50%; + box-shadow: 0 0 0 3px color-mix(in srgb, currentColor 15%, transparent); + flex-shrink: 0; +} + +/* Row-chevron cell — small 20px toggle used in table rows that expand. + The row itself is still clickable; the chevron provides the visible + affordance users were missing. */ +.row-chevron { + display: inline-flex; + align-items: center; + justify-content: center; + width: 20px; + height: 20px; + font-size: var(--text-xs); + color: var(--color-text-muted); + transition: transform var(--duration-fast) var(--ease-default); +} +.row-chevron.is-expanded { + transform: rotate(90deg); + color: var(--color-text-primary); +} + +/* Upgrade banner — the yellow strip operators see when updates are available. + Mirrors the gallery so both pages speak the same visual language. */ +.upgrade-banner { + display: flex; + align-items: center; + justify-content: space-between; + gap: var(--spacing-md); + padding: var(--spacing-sm) var(--spacing-md); + margin-bottom: var(--spacing-md); + background: var(--color-warning-light); + border: 1px solid var(--color-warning); + border-radius: var(--radius-md); + color: var(--color-warning); +} +.upgrade-banner__text { + display: inline-flex; + align-items: center; + gap: var(--spacing-sm); + font-weight: 500; + font-size: var(--text-sm); +} +.upgrade-banner__actions { + display: inline-flex; + gap: var(--spacing-xs); + align-items: center; +} /* Tabs */ .tabs { diff --git a/core/http/react-ui/src/components/FilterBar.jsx b/core/http/react-ui/src/components/FilterBar.jsx new file mode 100644 index 000000000..95f7b2135 --- /dev/null +++ b/core/http/react-ui/src/components/FilterBar.jsx @@ -0,0 +1,87 @@ +import Toggle from './Toggle' + +// FilterBar is the shared search + chip filter + toggles control strip that +// the Backends gallery pioneered. Pulled into its own component so the System +// page's two tabs stop looking like a different app — matching visual +// grammar + matching keyboard behavior. +// +// Props: +// search: controlled value for the search input. +// onSearchChange: (value) => void; null disables the search input entirely. +// searchPlaceholder: placeholder for the search input. +// filters: [{ key, label, icon }]; activeFilter is compared by key. +// Omit to hide the chip row. +// activeFilter: currently-selected filter key (use '' for "all" if +// that's the first entry in `filters`). +// onFilterChange: (key) => void. +// toggles: [{ key, label, icon?, checked, onChange }]; optional +// right-side toggle group (e.g. "Show all", "Development"). +// rightSlot: arbitrary element rendered after the toggles — use for +// sort controls or extra buttons. 
+export default function FilterBar({ + search, + onSearchChange, + searchPlaceholder = 'Search...', + filters, + activeFilter, + onFilterChange, + toggles, + rightSlot, +}) { + const hasFilters = Array.isArray(filters) && filters.length > 0 + const hasToggles = Array.isArray(toggles) && toggles.length > 0 + + return ( +
+ {onSearchChange && ( +
+ + onSearchChange(e.target.value)} + aria-label={searchPlaceholder} + /> +
+ )} + + {(hasFilters || hasToggles || rightSlot) && ( +
+ {hasFilters && ( +
+ {filters.map(f => ( + + ))} +
+ )} + + {(hasToggles || rightSlot) && ( +
+ {hasToggles && toggles.map(t => ( + + ))} + {rightSlot} +
+ )} +
+ )} +
+ ) +} diff --git a/core/http/react-ui/src/components/NodeDistributionChip.jsx b/core/http/react-ui/src/components/NodeDistributionChip.jsx new file mode 100644 index 000000000..952d8be0e --- /dev/null +++ b/core/http/react-ui/src/components/NodeDistributionChip.jsx @@ -0,0 +1,168 @@ +import { useRef, useState } from 'react' +import Popover from './Popover' + +// NodeDistributionChip shows where something is installed/loaded across a +// cluster. Used by both Manage → Backends (per-row Nodes column, data = +// gallery NodeBackendRef with version/digest) and by the Models tab (data = +// LoadedOn with state/status). Supports arbitrary cluster size — small +// clusters render node-name chips inline, larger clusters collapse to a +// summary chip and reveal the full per-node table in a popover on click. +// +// Field names are intentionally forgiving: both {node_name, node_status} and +// {NodeName, NodeStatus} are supported so the component works whether it's +// reading directly off the JSON or off a hydrated class. +// +// Props: +// nodes: array of node refs (see shape below). +// compactThreshold: max nodes to render inline before collapsing (default 3). +// context: 'backends' (default) shows version/digest; 'models' +// shows state. +// emptyLabel: what to render when nodes is empty (default "—"). +export default function NodeDistributionChip({ + nodes, + compactThreshold = 3, + context = 'backends', + emptyLabel = '—', +}) { + const triggerRef = useRef(null) + const [open, setOpen] = useState(false) + + const list = Array.isArray(nodes) ? nodes : [] + if (list.length === 0) { + return {emptyLabel} + } + + const getName = n => n.node_name ?? n.NodeName ?? '' + const getStatus = n => n.node_status ?? n.NodeStatus ?? '' + const getState = n => n.state ?? n.State ?? '' + const getVersion = n => n.version ?? n.Version ?? '' + const getDigest = n => n.digest ?? n.Digest ?? '' + + // Inline mode: render every node as its own chip. Good for small clusters + // where seeing the names directly is more useful than a summary. + if (list.length <= compactThreshold) { + return ( +
+ {list.map(n => { + const status = getStatus(n) + const variant = status === 'healthy' ? 'badge-success' + : status === 'draining' ? 'badge-info' + : 'badge-warning' + const title = context === 'models' + ? `${getName(n)} — ${getState(n)} (${status})` + : `${getName(n)} — ${status}${getVersion(n) ? ` · v${getVersion(n)}` : ''}` + return ( + + {getName(n)} + + ) + })} +
+ ) + } + + // Summary mode for anything bigger. Count unhealthy/offline explicitly so + // the chip tells an operator at-a-glance whether to click in. "Drift" for + // backends = more than one (version, digest) tuple across healthy nodes. + const total = list.length + const offline = list.filter(n => { + const s = getStatus(n) + return s !== 'healthy' && s !== 'draining' + }).length + const drift = context === 'backends' ? countDrift(list) : 0 + const severity = offline > 0 || drift > 0 ? 'badge-warning' : 'badge-info' + + return ( + <> + + setOpen(false)} + ariaLabel={context === 'models' ? 'Model distribution' : 'Backend distribution'} + > +
+ Installed on {total} node{total === 1 ? '' : 's'} + {offline > 0 && {offline} offline} + {drift > 0 && {drift} drift} +
+
+ + + + + + {context === 'models' ? : <> + + + } + + + + {list.map(n => ( + + + + {context === 'models' ? ( + + ) : ( + <> + + + + )} + + ))} + +
NodeStatusStateVersionDigest
{getName(n)} + + {getStatus(n)} + + {getState(n) || '—'}{getVersion(n) ? `v${getVersion(n)}` : '—'} + {getDigest(n) ? shortenDigest(getDigest(n)) : '—'} +
+
+
+ + ) +} + +// countDrift counts nodes whose (version, digest) disagrees with the cluster +// majority. Mirrors the backend summarizeNodeDrift logic so the UI number +// matches what CheckUpgradesAgainst emits in UpgradeInfo.NodeDrift. +function countDrift(nodes) { + if (nodes.length <= 1) return 0 + const counts = new Map() + for (const n of nodes) { + const key = `${n.version ?? n.Version ?? ''}|${n.digest ?? n.Digest ?? ''}` + counts.set(key, (counts.get(key) || 0) + 1) + } + if (counts.size === 1) return 0 // unanimous + let topKey = '' + let topCount = 0 + for (const [k, v] of counts.entries()) { + if (v > topCount) { topKey = k; topCount = v } + } + return nodes.length - topCount +} + +// shortenDigest trims a full OCI digest to the common 12-char form used in +// docker/oci tooling. Falls back to the raw value if it doesn't match. +function shortenDigest(digest) { + const m = /^(sha\d+:)?([a-f0-9]+)$/i.exec(digest) + if (!m) return digest + const hex = m[2] + return (m[1] ?? '') + hex.slice(0, 12) +} diff --git a/core/http/react-ui/src/components/Popover.jsx b/core/http/react-ui/src/components/Popover.jsx new file mode 100644 index 000000000..96a9e217e --- /dev/null +++ b/core/http/react-ui/src/components/Popover.jsx @@ -0,0 +1,86 @@ +import { useEffect, useRef, useState, useCallback } from 'react' + +// Minimal popover: positions itself below-right of the trigger's bounding box, +// flips above when there isn't room below, closes on outside click or Escape, +// returns focus to the trigger. Uses the existing .card surface so it picks +// up theme/border/shadow automatically — no new theming work. +// +// Props: +// anchor: ref to the trigger DOMElement (required) +// open: boolean +// onClose: () => void +// children: popover body +// ariaLabel: accessible label for the dialog +export default function Popover({ anchor, open, onClose, children, ariaLabel }) { + const popoverRef = useRef(null) + const [pos, setPos] = useState({ top: 0, left: 0, flipped: false }) + + // Compute position from the anchor's bounding box whenever we open or the + // viewport changes. 240px is the minimum width we'll reserve; bigger content + // grows naturally. + const reposition = useCallback(() => { + if (!anchor?.current) return + const rect = anchor.current.getBoundingClientRect() + const popoverHeight = popoverRef.current?.offsetHeight ?? 0 + const spaceBelow = window.innerHeight - rect.bottom + const flipped = popoverHeight > spaceBelow - 16 && rect.top > popoverHeight + const top = flipped ? rect.top - popoverHeight - 8 : rect.bottom + 8 + // Prefer left-aligned; clamp so we don't go off-screen right. + const left = Math.min(rect.left, window.innerWidth - 320) + setPos({ top, left: Math.max(8, left), flipped }) + }, [anchor]) + + useEffect(() => { + if (!open) return + reposition() + window.addEventListener('resize', reposition) + window.addEventListener('scroll', reposition, true) + return () => { + window.removeEventListener('resize', reposition) + window.removeEventListener('scroll', reposition, true) + } + }, [open, reposition]) + + // Close on outside click or Escape. Mousedown (not click) so the close + // happens before a parent handler could re-trigger us. 
+ useEffect(() => { + if (!open) return + const onMouseDown = (e) => { + if (popoverRef.current && !popoverRef.current.contains(e.target) && !anchor?.current?.contains(e.target)) { + onClose() + } + } + const onKey = (e) => { if (e.key === 'Escape') onClose() } + document.addEventListener('mousedown', onMouseDown) + document.addEventListener('keydown', onKey) + return () => { + document.removeEventListener('mousedown', onMouseDown) + document.removeEventListener('keydown', onKey) + } + }, [open, onClose, anchor]) + + // Return focus to the trigger when the popover closes — keyboard users + // shouldn't have to tab back through the whole page to find their spot. + useEffect(() => { + if (!open && anchor?.current) { + // requestAnimationFrame so the close is painted before focus jumps; + // otherwise screen readers announce the trigger mid-transition. + const raf = requestAnimationFrame(() => anchor.current?.focus?.()) + return () => cancelAnimationFrame(raf) + } + }, [open, anchor]) + + if (!open) return null + + return ( +
+ {children} +
+ ) +} diff --git a/core/http/react-ui/src/pages/Manage.jsx b/core/http/react-ui/src/pages/Manage.jsx index bd6bda938..3f11a7744 100644 --- a/core/http/react-ui/src/pages/Manage.jsx +++ b/core/http/react-ui/src/pages/Manage.jsx @@ -3,6 +3,8 @@ import { useNavigate, useOutletContext, useSearchParams } from 'react-router-dom import ResourceMonitor from '../components/ResourceMonitor' import ConfirmDialog from '../components/ConfirmDialog' import Toggle from '../components/Toggle' +import NodeDistributionChip from '../components/NodeDistributionChip' +import FilterBar from '../components/FilterBar' import { useModels } from '../hooks/useModels' import { backendControlApi, modelsApi, backendsApi, systemApi, nodesApi } from '../utils/api' @@ -11,6 +13,22 @@ const TABS = [ { key: 'backends', label: 'Backends', icon: 'fa-server' }, ] +// formatInstalledAt renders an installed_at timestamp as a short relative/abs +// string suitable for dense tables. Returns the raw value if parsing fails so +// we never display "Invalid Date". +function formatInstalledAt(value) { + if (!value) return '—' + const d = new Date(value) + if (isNaN(d.getTime())) return value + const now = Date.now() + const diffMin = Math.floor((now - d.getTime()) / 60000) + if (diffMin < 1) return 'just now' + if (diffMin < 60) return `${diffMin}m ago` + if (diffMin < 60 * 24) return `${Math.floor(diffMin / 60)}h ago` + if (diffMin < 60 * 24 * 30) return `${Math.floor(diffMin / (60 * 24))}d ago` + return d.toISOString().slice(0, 10) +} + export default function Manage() { const { addToast } = useOutletContext() const navigate = useNavigate() @@ -28,6 +46,24 @@ export default function Manage() { const [distributedMode, setDistributedMode] = useState(false) const [togglingModels, setTogglingModels] = useState(new Set()) const [pinningModels, setPinningModels] = useState(new Set()) + // Filter state per tab. Persisted in the URL query so switching tabs + // doesn't lose the filter the operator just set. + const [modelsSearch, setModelsSearch] = useState(() => searchParams.get('mq') || '') + const [modelsFilter, setModelsFilter] = useState(() => searchParams.get('mf') || 'all') + const [backendsSearch, setBackendsSearch] = useState(() => searchParams.get('bq') || '') + const [backendsFilter, setBackendsFilter] = useState(() => searchParams.get('bf') || 'all') + + // Sync filter state into the URL so deep-links + tab switches survive. + useEffect(() => { + const p = new URLSearchParams(searchParams) + const setOrDelete = (k, v) => { if (v && v !== 'all') p.set(k, v); else p.delete(k) } + setOrDelete('mq', modelsSearch) + setOrDelete('mf', modelsFilter) + setOrDelete('bq', backendsSearch) + setOrDelete('bf', backendsFilter) + setSearchParams(p, { replace: true }) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [modelsSearch, modelsFilter, backendsSearch, backendsFilter]) const handleTabChange = (tab) => { setActiveTab(tab) @@ -64,6 +100,35 @@ export default function Manage() { nodesApi.list().then(() => setDistributedMode(true)).catch(() => {}) }, [fetchLoadedModels, fetchBackends]) + // Auto-refresh the Models tab every 10s in distributed mode so ghost models + // (loaded on a worker but absent from this frontend's in-memory cache) + // clear on their own without the user clicking Update. 
+ const [lastSyncedAt, setLastSyncedAt] = useState(() => Date.now()) + const [nowTick, setNowTick] = useState(() => Date.now()) + useEffect(() => { + if (!distributedMode || activeTab !== 'models') return + const interval = setInterval(() => { + refetchModels() + fetchLoadedModels() + setLastSyncedAt(Date.now()) + }, 10000) + return () => clearInterval(interval) + }, [distributedMode, activeTab, refetchModels, fetchLoadedModels]) + + // Drive the "last synced Ns ago" label without over-rendering the table. + useEffect(() => { + if (!distributedMode) return + const interval = setInterval(() => setNowTick(Date.now()), 1000) + return () => clearInterval(interval) + }, [distributedMode]) + const lastSyncedAgo = (() => { + const s = Math.max(0, Math.floor((nowTick - lastSyncedAt) / 1000)) + if (s < 5) return 'just now' + if (s < 60) return `${s}s ago` + const m = Math.floor(s / 60) + return `${m}m ago` + })() + // Fetch available backend upgrades useEffect(() => { if (activeTab === 'backends') { @@ -196,6 +261,29 @@ export default function Manage() { } } + const [upgradingAll, setUpgradingAll] = useState(false) + const [showOnlyUpgradable, setShowOnlyUpgradable] = useState(false) + const handleUpgradeAll = async () => { + const names = Object.keys(upgrades) + if (names.length === 0) return + setUpgradingAll(true) + try { + // Serial upgrade — matches the gallery's Upgrade All behavior. + // Each backend upgrade is itself a cluster-wide fan-out, so parallel + // calls would multiply load on every worker. + for (const name of names) { + try { + await backendsApi.upgrade(name) + } catch (err) { + addToast(`Upgrade failed for ${name}: ${err.message}`, 'error') + } + } + addToast(`Upgrade started for ${names.length} backend${names.length === 1 ? '' : 's'}`, 'info') + } finally { + setUpgradingAll(false) + } + } + const handleDeleteBackend = (name) => { setConfirmDialog({ title: 'Delete Backend', @@ -227,29 +315,74 @@ export default function Manage() { {/* Tabs */}
- {TABS.map(t => ( - - ))} + {TABS.map(t => { + const upgradeCount = t.key === 'backends' ? Object.keys(upgrades).length : 0 + return ( + + ) + })}
{/* Models Tab */} - {activeTab === 'models' && ( + {activeTab === 'models' && (() => { + // Computed filters — done here so the result is available both to + // the FilterBar counts and to the table body. + const MODEL_FILTERS = [ + { key: 'all', label: 'All', icon: 'fa-layer-group' }, + { key: 'running', label: 'Running', icon: 'fa-circle-play' }, + { key: 'idle', label: 'Idle', icon: 'fa-pause' }, + { key: 'disabled', label: 'Disabled', icon: 'fa-ban' }, + { key: 'pinned', label: 'Pinned', icon: 'fa-thumbtack' }, + ...(distributedMode ? [{ key: 'distributed', label: 'Distributed', icon: 'fa-server' }] : []), + ] + const passesFilter = (m) => { + if (modelsFilter === 'running') return !m.disabled && (loadedModelIds.has(m.id) || (m.loaded_on && m.loaded_on.length > 0)) + if (modelsFilter === 'idle') return !m.disabled && !loadedModelIds.has(m.id) && !(m.loaded_on && m.loaded_on.length > 0) + if (modelsFilter === 'disabled') return !!m.disabled + if (modelsFilter === 'pinned') return !!m.pinned + if (modelsFilter === 'distributed') return Array.isArray(m.loaded_on) && m.loaded_on.length > 0 + return true + } + const q = modelsSearch.trim().toLowerCase() + const passesSearch = (m) => !q || (m.id || '').toLowerCase().includes(q) || (m.backend || '').toLowerCase().includes(q) + const visibleModels = models.filter(m => passesFilter(m) && passesSearch(m)) + return (
-
- -
+ + {distributedMode && ( + + Last synced {lastSyncedAgo} + + )} + + + )} + /> {modelsLoading ? (
@@ -274,6 +407,12 @@ export default function Manage() {
+ ) : visibleModels.length === 0 ? ( +
+ +

No models match the current filter.

+ +
) : (
@@ -288,7 +427,7 @@ export default function Manage() { - {models.map(model => ( + {visibleModels.map(model => ( {/* Enable/Disable toggle */} - {/* Status */} + {/* Status / Distribution */} {/* Backend */}
@@ -329,21 +468,33 @@ export default function Manage() { - {model.disabled ? ( - - Disabled - - ) : loadedModelIds.has(model.id) ? ( - - Running - - ) : ( - - Idle - - )} +
+ {model.disabled ? ( + + Disabled + + ) : model.loaded_on && model.loaded_on.length > 0 ? ( + // Distributed mode: surface where the model is + // actually loaded. Shared chip scales to any cluster + // size (inline for <=3, popover for larger). + + ) : loadedModelIds.has(model.id) ? ( + + Running + + ) : ( + + Idle + + )} + {model.source === 'registry-only' && ( + + Adopted + + )} +
@@ -394,11 +545,34 @@ export default function Manage() { )} - )} + ) + })()} {/* Backends Tab */} {activeTab === 'backends' && (
+ {/* Upgrade banner — mirrors the gallery so operators can't miss updates */} + {!backendsLoading && Object.keys(upgrades).length > 0 && ( +
+
+ + + {Object.keys(upgrades).length} backend{Object.keys(upgrades).length === 1 ? ' has' : 's have'} updates available + +
+
+ +
+
+ )} + {backendsLoading ? (
Loading backends... @@ -419,109 +593,217 @@ export default function Manage() {
- ) : ( -
+ ) : (() => { + // Count chip badges: show N in the filter buttons so operators can + // see at a glance how their chips bucket the list. + const upgradableCount = backends.filter(b => upgrades[b.Name]).length + const userCount = backends.filter(b => !b.IsSystem).length + const systemCount = backends.filter(b => b.IsSystem).length + const metaCount = backends.filter(b => b.IsMeta).length + const offlineCount = backends.filter(b => { + const n = b.Nodes || b.nodes || [] + return n.some(x => { + const s = x.node_status || x.NodeStatus + return s && s !== 'healthy' && s !== 'draining' + }) + }).length + + const BACKEND_FILTERS = [ + { key: 'all', label: 'All', icon: 'fa-layer-group', count: backends.length }, + { key: 'user', label: 'User', icon: 'fa-download', count: userCount }, + { key: 'system', label: 'System', icon: 'fa-shield-alt', count: systemCount }, + { key: 'meta', label: 'Meta', icon: 'fa-layer-group', count: metaCount }, + ...(upgradableCount > 0 ? [{ key: 'upgradable', label: 'Updates', icon: 'fa-arrow-up', count: upgradableCount }] : []), + ...(distributedMode && offlineCount > 0 ? [{ key: 'offline', label: 'Offline nodes', icon: 'fa-exclamation-circle', count: offlineCount }] : []), + ] + const q = backendsSearch.trim().toLowerCase() + const passesSearch = (b) => !q + || (b.Name || '').toLowerCase().includes(q) + || (b.Metadata?.alias || '').toLowerCase().includes(q) + || (b.Metadata?.meta_backend_for || '').toLowerCase().includes(q) + const passesFilter = (b) => { + switch (backendsFilter) { + case 'user': return !b.IsSystem + case 'system': return !!b.IsSystem + case 'meta': return !!b.IsMeta + case 'upgradable': return !!upgrades[b.Name] + case 'offline': { + const n = b.Nodes || b.nodes || [] + return n.some(x => { + const s = x.node_status || x.NodeStatus + return s && s !== 'healthy' && s !== 'draining' + }) + } + default: return true + } + } + // Legacy "showOnlyUpgradable" toggle is now the 'upgradable' chip — + // keep backward-compat by mapping it onto the new filter. + if (showOnlyUpgradable && backendsFilter !== 'upgradable') { + // One-shot reconciliation — the old state becomes the new chip. + setBackendsFilter('upgradable') + setShowOnlyUpgradable(false) + } + const visibleBackends = backends.filter(b => passesFilter(b) && passesSearch(b)) + if (visibleBackends.length === 0) { + return ( + <> + +
+ +

No backends match the current filter.

+ +
+ + ) + } + return ( + <> + +
- + + {distributedMode && } + - {backends.map((backend, i) => ( + {visibleBackends.map((backend, i) => { + const upgradeInfo = upgrades[backend.Name] + const hasDrift = upgradeInfo?.node_drift?.length > 0 + const nodes = backend.Nodes || backend.nodes || [] + return ( + {distributedMode && ( + + )} + - ))} + ) + })}
Name Type Metadata Version Nodes Installed Actions
-
- - {backend.Name} +
+ + {backend.Name} + {backend.Metadata?.alias && ( + alias: {backend.Metadata.alias} + )} + {backend.Metadata?.meta_backend_for && ( + for: {backend.Metadata.meta_backend_for} + )}
-
+
{backend.IsSystem ? ( - - System + + System ) : ( - - User + + User )} {backend.IsMeta && ( - - Meta + + Meta )}
-
- {backend.Metadata?.alias && ( - - - Alias: {backend.Metadata.alias} +
+ {backend.Metadata?.version ? ( + v{backend.Metadata.version} + ) : ( + + )} + {upgradeInfo && ( + + + {upgradeInfo.available_version ? ` v${upgradeInfo.available_version}` : ' Update available'} )} - {backend.Metadata?.meta_backend_for && ( - - - For: {backend.Metadata.meta_backend_for} + {hasDrift && ( + `${d.node_name}${d.version ? ' v' + d.version : ''}`).join(', ')}`} + > + + {' '}Drift: {upgradeInfo.node_drift.length} node{upgradeInfo.node_drift.length === 1 ? '' : 's'} )} - {backend.Metadata?.version && ( - - - Version: v{backend.Metadata.version} - {upgrades[backend.Name] && ( - - → v{upgrades[backend.Name].available_version} - - )} - - )} - {backend.Metadata?.installed_at && ( - - - {backend.Metadata.installed_at} - - )} - {!backend.Metadata?.alias && !backend.Metadata?.meta_backend_for && !backend.Metadata?.installed_at && '—'}
+ + -
- {!backend.IsSystem ? ( + + {backend.Metadata?.installed_at ? formatInstalledAt(backend.Metadata.installed_at) : '—'} + +
+
+ {backend.IsSystem ? ( + + Protected + + ) : ( <> + {upgradeInfo ? ( + + ) : ( + + )} - - ) : ( - )}
-
- )} +
+ + ) + })()} )} diff --git a/core/http/react-ui/src/pages/Nodes.jsx b/core/http/react-ui/src/pages/Nodes.jsx index 0903dfb34..0e0698241 100644 --- a/core/http/react-ui/src/pages/Nodes.jsx +++ b/core/http/react-ui/src/pages/Nodes.jsx @@ -51,15 +51,22 @@ const modelStateConfig = { idle: { bg: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)', border: 'var(--color-border-subtle)' }, } -function StatCard({ icon, label, value, color }) { +function StatCard({ icon, label, value, color, accentVar }) { + // accentVar: optional CSS variable for the left edge + icon chip, e.g. + // "--color-success". When unset the card reads neutral — used for simple + // counts so they don't compete with the semantic cards for attention. + const accent = color || (accentVar ? `var(${accentVar})` : 'var(--color-text-primary)') return ( -
-
- - {label} +
+
+
{label}
+
{value}
-
- {value} +
+
) @@ -543,45 +550,24 @@ export default function Nodes() {
{/* Tabs */} -
+
{showTips && } @@ -685,23 +671,28 @@ export default function Nodes() { >
- + {canExpand && ( + + )} +
-
{node.name}
-
+
{node.name}
+
{node.address}
{node.labels && Object.keys(node.labels).length > 0 && (
{Object.entries(node.labels).slice(0, 5).map(([k, v]) => ( - {k}={v} ))} {Object.keys(node.labels).length > 5 && ( - + +{Object.keys(node.labels).length - 5} more )} @@ -711,12 +702,10 @@ export default function Nodes() {
-
- - - {status.label} - -
+ + + {status.label} +
{hasGPU && totalVRAMStr ? ( @@ -745,38 +734,37 @@ export default function Nodes() { -
e.stopPropagation()}> +
e.stopPropagation()}> {node.status === 'pending' && ( )} {node.status === 'draining' && ( )} {node.status !== 'draining' && node.status !== 'pending' && ( )} @@ -794,7 +782,10 @@ export default function Nodes() { {!models ? ( ) : models.length === 0 ? ( -

No models loaded on this node

+
+ + No models loaded on this node yet. +
) : ( @@ -870,7 +861,10 @@ export default function Nodes() { {!backends ? ( ) : backends.length === 0 ? ( -

No backends installed on this node

+
+ + No backends installed on this node. Install one from the gallery to schedule models here. +
) : (
diff --git a/core/http/routes/ui_api.go b/core/http/routes/ui_api.go index b6db24e09..8d089f873 100644 --- a/core/http/routes/ui_api.go +++ b/core/http/routes/ui_api.go @@ -510,28 +510,89 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model modelConfigs := cl.GetAllModelsConfigs() modelsWithoutConfig, _ := galleryop.ListModels(cl, ml, config.NoFilterFn, galleryop.LOOSE_ONLY) + type loadedOn struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + State string `json:"state"` + NodeStatus string `json:"node_status"` + } type modelCapability struct { - ID string `json:"id"` - Capabilities []string `json:"capabilities"` - Backend string `json:"backend"` - Disabled bool `json:"disabled"` - Pinned bool `json:"pinned"` + ID string `json:"id"` + Capabilities []string `json:"capabilities"` + Backend string `json:"backend"` + Disabled bool `json:"disabled"` + Pinned bool `json:"pinned"` + // LoadedOn is populated only when the node registry is active + // (distributed mode). Lets the UI show "loaded on worker-1" without + // the operator having to expand every node manually. An empty slice + // with nil reports "no loaded replicas" vs. nil reports "not in + // cluster mode" — the frontend treats both as "no distribution info". + LoadedOn []loadedOn `json:"loaded_on,omitempty"` + // Source="registry-only" marks models adopted from the cluster that + // have no local config yet (ghosts that the reconciler discovered). + Source string `json:"source,omitempty"` + } + + // Join with the node registry when we have one (distributed mode). A + // single registry fetch + map join beats per-model queries for the + // 100-model case. + var loadedByModel map[string][]loadedOn + if ds := applicationInstance.Distributed(); ds != nil && ds.Registry != nil { + nodeModels, err := ds.Registry.ListAllLoadedModels(c.Request().Context()) + if err == nil { + allNodes, _ := ds.Registry.List(c.Request().Context()) + nameByID := make(map[string]string, len(allNodes)) + statusByID := make(map[string]string, len(allNodes)) + for _, n := range allNodes { + nameByID[n.ID] = n.Name + statusByID[n.ID] = n.Status + } + loadedByModel = make(map[string][]loadedOn) + for _, nm := range nodeModels { + loadedByModel[nm.ModelName] = append(loadedByModel[nm.ModelName], loadedOn{ + NodeID: nm.NodeID, + NodeName: nameByID[nm.NodeID], + State: nm.State, + NodeStatus: statusByID[nm.NodeID], + }) + } + } } result := make([]modelCapability, 0, len(modelConfigs)+len(modelsWithoutConfig)) + seen := make(map[string]bool, len(modelConfigs)+len(modelsWithoutConfig)) for _, cfg := range modelConfigs { + seen[cfg.Name] = true result = append(result, modelCapability{ ID: cfg.Name, Capabilities: cfg.KnownUsecaseStrings, Backend: cfg.Backend, Disabled: cfg.IsDisabled(), Pinned: cfg.IsPinned(), + LoadedOn: loadedByModel[cfg.Name], }) } for _, name := range modelsWithoutConfig { + seen[name] = true result = append(result, modelCapability{ ID: name, Capabilities: []string{}, + LoadedOn: loadedByModel[name], + }) + } + // Emit entries for cluster models that have no local config — these + // are the actual ghosts. Without this the operator would have no way + // to see a model the cluster is running if its config file wasn't + // synced to this frontend's filesystem. 
+ for name, loc := range loadedByModel { + if seen[name] { + continue + } + result = append(result, modelCapability{ + ID: name, + Capabilities: []string{}, + LoadedOn: loc, + Source: "registry-only", }) } diff --git a/core/services/advisorylock/keys.go b/core/services/advisorylock/keys.go index d5378a5d1..277817229 100644 --- a/core/services/advisorylock/keys.go +++ b/core/services/advisorylock/keys.go @@ -11,4 +11,5 @@ const ( KeyHealthCheck int64 = 104 KeySchemaMigrate int64 = 105 KeyBackendUpgradeCheck int64 = 106 + KeyStateReconciler int64 = 107 ) diff --git a/core/services/galleryop/service.go b/core/services/galleryop/service.go index fef638425..3d77d11d6 100644 --- a/core/services/galleryop/service.go +++ b/core/services/galleryop/service.go @@ -57,6 +57,16 @@ func (g *GalleryService) SetBackendManager(b BackendManager) { g.backendManager = b } +// BackendManager returns the current backend manager. Callers like the +// periodic upgrade checker need this so they run CheckUpgrades through the +// distributed implementation (which asks workers) instead of the frontend's +// local filesystem — the latter is always empty in distributed deployments. +func (g *GalleryService) BackendManager() BackendManager { + g.Lock() + defer g.Unlock() + return g.backendManager +} + // SetNATSClient sets the NATS client for distributed progress publishing. func (g *GalleryService) SetNATSClient(nc messaging.Publisher) { g.Lock() diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 397f63ff0..3e9af53a9 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -157,6 +157,12 @@ type NodeBackendInfo struct { IsMeta bool `json:"is_meta"` InstalledAt string `json:"installed_at,omitempty"` GalleryURL string `json:"gallery_url,omitempty"` + // Version, URI and Digest enable cluster-wide upgrade detection — + // without them, the frontend cannot tell whether the installed OCI + // image matches the gallery entry, and upgrades silently never surface. + Version string `json:"version,omitempty"` + URI string `json:"uri,omitempty"` + Digest string `json:"digest,omitempty"` } // SubjectNodeBackendStop tells a worker node to stop its gRPC backend process. diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 62cb32552..e524756da 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -10,6 +10,7 @@ import ( "github.com/mudler/LocalAI/core/gallery" "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/xlog" "github.com/nats-io/nats.go" ) @@ -53,6 +54,7 @@ type DistributedBackendManager struct { adapter *RemoteUnloaderAdapter registry *NodeRegistry backendGalleries []config.Gallery + systemState *system.SystemState } // NewDistributedBackendManager creates a DistributedBackendManager. @@ -62,46 +64,161 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model adapter: adapter, registry: registry, backendGalleries: appConfig.BackendGalleries, + systemState: appConfig.SystemState, } } +// NodeOpStatus is the per-node outcome of a backend lifecycle operation. +// Returned as part of BackendOpResult so the frontend can surface exactly +// what happened on each worker instead of a single joined error string. 
+type NodeOpStatus struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + Status string `json:"status"` // "success" | "queued" | "error" + Error string `json:"error,omitempty"` +} + +// BackendOpResult aggregates per-node outcomes. +type BackendOpResult struct { + Nodes []NodeOpStatus `json:"nodes"` +} + +// enqueueAndDrainBackendOp is the shared scaffolding for +// delete/install/upgrade. Every non-pending node gets a pending_backend_ops +// row (intent is durable even if the node is offline). Currently-healthy +// nodes get an immediate attempt; success deletes the row, failure records +// the error and leaves the row for the reconciler to retry. +// +// `apply` is the NATS round-trip for one node. Returning an error keeps the +// row in the queue and marks the per-node status as "error"; returning nil +// deletes the row and reports "success". For non-healthy nodes the status +// is "queued" — no attempt is made right now, reconciler will pick it up +// when the node returns. +func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) { + allNodes, err := d.registry.List(ctx) + if err != nil { + return BackendOpResult{}, err + } + + result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))} + for _, node := range allNodes { + // Pending nodes haven't been approved yet — no intent to apply. + if node.Status == StatusPending { + continue + } + if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { + xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "error", + Error: fmt.Sprintf("enqueue failed: %v", err), + }) + continue + } + + if node.Status != StatusHealthy { + // Intent is recorded; reconciler will retry when the node recovers. + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "queued", + Error: fmt.Sprintf("node %s, will retry when healthy", node.Status), + }) + continue + } + + applyErr := apply(node) + if applyErr == nil { + // Find the row we just upserted and delete it; cheap but requires + // a lookup since UpsertPendingBackendOp doesn't return the ID. + if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil { + xlog.Debug("Failed to clear pending backend op after success", "error", err) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "success", + }) + continue + } + + // Record failure for backoff. If it's an ErrNoResponders, the node's + // gone AWOL — mark unhealthy so the router stops picking it too. + errMsg := applyErr.Error() + if errors.Is(applyErr, nats.ErrNoResponders) { + xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) + d.registry.MarkUnhealthy(ctx, node.ID) + } + if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil { + _ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg, + }) + } + return result, nil +} + +// findPendingRow looks up the ID of a pending_backend_ops row by its +// composite key. 
Used to hand off to RecordPendingBackendOpFailure / +// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same +// composite key. +func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) { + var row PendingBackendOp + if err := d.registry.db.WithContext(ctx). + Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op). + First(&row).Error; err != nil { + return 0, err + } + return row.ID, nil +} + +// deletePendingRow removes the queue row keyed by (nodeID, backend, op). +func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error { + return d.registry.db.WithContext(ctx). + Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op). + Delete(&PendingBackendOp{}).Error +} + +// DeleteBackend fans out backend deletion to every known node. The previous +// implementation silently skipped non-healthy nodes, which meant zombies +// reappeared once those nodes returned. Now the intent is durable — see +// enqueueAndDrainBackendOp — and the reconciler catches up later. func (d *DistributedBackendManager) DeleteBackend(name string) error { - // Try local deletion but ignore "not found" errors — in distributed mode - // the frontend node typically doesn't have backends installed locally; - // they only exist on worker nodes. + // Local delete first (frontend rarely has backends installed in + // distributed mode, but the gallery operation still expects it; ignore + // "not found" which is the common case). if err := d.local.DeleteBackend(name); err != nil { if !errors.Is(err, gallery.ErrBackendNotFound) { return err } xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name) } - // Fan out backend.delete to all healthy nodes - allNodes, listErr := d.registry.List(context.Background()) - if listErr != nil { - xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr) - return listErr - } - var errs []error - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } - if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil { - if errors.Is(delErr, nats.ErrNoResponders) { - // Node's NATS subscription is gone — likely restarted with a new ID. - // Mark it unhealthy so future fan-outs skip it. - xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr) - errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr)) - } - } - return errors.Join(errs...) + + ctx := context.Background() + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + _, err := d.adapter.DeleteBackend(node.ID, name) + return err + }) + return err } -// ListBackends aggregates installed backends from all healthy worker nodes. +// DeleteBackendDetailed is the per-node-result variant called by the HTTP +// handler so the UI can render a per-node status drawer. DeleteBackend still +// returns error-only for callers that don't care about node breakdown. 
+func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) { + if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) { + return BackendOpResult{}, err + } + return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + _, err := d.adapter.DeleteBackend(node.ID, name) + return err + }) +} + +// ListBackends aggregates installed backends from all worker nodes, preserving +// per-node attribution. Each SystemBackend.Nodes entry records which node has +// the backend and the version/digest it reports. The top-level Metadata is +// populated from the first node seen so single-node-minded callers still work. +// +// Pending/offline/draining nodes are skipped because they aren't expected to +// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders +// then marks them unhealthy and the loop continues. func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) { result := make(gallery.SystemBackends) allNodes, err := d.registry.List(context.Background()) @@ -110,7 +227,7 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro } for _, node := range allNodes { - if node.Status != StatusHealthy { + if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining { continue } reply, err := d.adapter.ListBackends(node.ID) @@ -128,89 +245,92 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro continue } for _, b := range reply.Backends { - if _, exists := result[b.Name]; !exists { - result[b.Name] = gallery.SystemBackend{ + ref := gallery.NodeBackendRef{ + NodeID: node.ID, + NodeName: node.Name, + NodeStatus: node.Status, + Version: b.Version, + Digest: b.Digest, + URI: b.URI, + InstalledAt: b.InstalledAt, + } + entry, exists := result[b.Name] + if !exists { + entry = gallery.SystemBackend{ Name: b.Name, IsSystem: b.IsSystem, IsMeta: b.IsMeta, Metadata: &gallery.BackendMetadata{ + Name: b.Name, InstalledAt: b.InstalledAt, GalleryURL: b.GalleryURL, + Version: b.Version, + URI: b.URI, + Digest: b.Digest, }, } } + entry.Nodes = append(entry.Nodes, ref) + result[b.Name] = entry } } return result, nil } -// InstallBackend fans out backend installation to all healthy worker nodes. +// InstallBackend fans out installation through the pending-ops queue so +// non-healthy nodes get retried when they come back instead of being silently +// skipped. Reply success from the NATS round-trip deletes the queue row; +// reply.Success==false is treated as an error so the row stays for retry. 
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error { - allNodes, err := d.registry.List(context.Background()) - if err != nil { - return err - } - galleriesJSON, _ := json.Marshal(op.Galleries) backendName := op.GalleryElementName - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON)) if err != nil { - if errors.Is(err, nats.ErrNoResponders) { - xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err) - continue + return err } if !reply.Success { - xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error) + return fmt.Errorf("install failed: %s", reply.Error) } - } - return nil + return nil + }) + return err } -// UpgradeBackend fans out a backend upgrade to all healthy worker nodes. -// TODO: Add dedicated NATS subject for upgrade (currently reuses install with force flag) +// UpgradeBackend reuses the install NATS subject (the worker re-downloads +// from the gallery). Same queue semantics as Install/Delete. func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { - allNodes, err := d.registry.List(context.Background()) - if err != nil { - return err - } - galleriesJSON, _ := json.Marshal(d.backendGalleries) - var errs []error - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } - // Reuse install endpoint which will re-download the backend (force mode) + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON)) if err != nil { - if errors.Is(err, nats.ErrNoResponders) { - xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err)) - continue + return err } if !reply.Success { - errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error)) + return fmt.Errorf("upgrade failed: %s", reply.Error) } - } - - return errors.Join(errs...) + return nil + }) + return err } -// CheckUpgrades checks for available backend upgrades. -// Gallery comparison is global (not per-node), so we delegate to the local manager. +// CheckUpgrades checks for available backend upgrades across the cluster. +// +// The previous implementation delegated to d.local, which called +// ListSystemBackends on the frontend — but in distributed mode the frontend +// has no backends installed locally, so the upgrade loop never ran and the UI +// never surfaced any upgrades. We now feed the cluster-wide aggregation +// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so +// digest-based detection actually works and cluster drift is visible. 
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) { - return d.local.CheckUpgrades(ctx) + installed, err := d.ListBackends() + if err != nil { + return nil, err + } + // systemState is used by AvailableBackends (gallery paths + meta-backend + // resolution). The `installed` argument is what the old code got wrong — + // it used to come from the empty frontend filesystem. + return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed) } diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index 92ba76edc..6d87165b4 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -3,26 +3,57 @@ package nodes import ( "context" "encoding/json" + "fmt" "time" "github.com/mudler/LocalAI/core/services/advisorylock" + grpcclient "github.com/mudler/LocalAI/pkg/grpc" "github.com/mudler/xlog" "gorm.io/gorm" ) +// ModelProber checks whether a model's backend process is still reachable. +// Defaulted to a gRPC health probe but overridable for tests so we don't +// need to stand up a real server. Returning false without an error means the +// process is reachable but unhealthy (same as a timeout for our purposes). +type ModelProber interface { + IsAlive(ctx context.Context, address string) bool +} + +// grpcModelProber does a 1s HealthCheck on the model's stored gRPC address. +type grpcModelProber struct{ token string } + +func (g grpcModelProber) IsAlive(ctx context.Context, address string) bool { + client := grpcclient.NewClientWithToken(address, false, nil, false, g.token) + probeCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + ok, _ := client.HealthCheck(probeCtx) + return ok +} + // ReplicaReconciler periodically ensures model replica counts match their // scheduling configs. It scales up replicas when below MinReplicas or when // all replicas are busy (up to MaxReplicas), and scales down idle replicas // above MinReplicas. // +// Alongside replica scaling it runs two state-reconciliation passes — draining +// the pending_backend_ops queue and probing loaded models' gRPC addresses to +// orphan ghosts. Both passes are wrapped in the KeyStateReconciler advisory +// lock so N frontends don't stampede. +// // Only processes models with auto-scaling enabled (MinReplicas > 0 or MaxReplicas > 0). type ReplicaReconciler struct { registry *NodeRegistry scheduler ModelScheduler // interface for scheduling new models unloader NodeCommandSender + adapter *RemoteUnloaderAdapter // NATS sender for pending-op drain + prober ModelProber // health probe for model gRPC addrs db *gorm.DB interval time.Duration scaleDownDelay time.Duration + // probeStaleAfter: only probe node_models rows older than this so we + // don't hammer every worker every tick for models we just heard from. + probeStaleAfter time.Duration } // ModelScheduler abstracts the scheduling logic needed by the reconciler. @@ -35,12 +66,21 @@ type ModelScheduler interface { // ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler. type ReplicaReconcilerOptions struct { - Registry *NodeRegistry - Scheduler ModelScheduler - Unloader NodeCommandSender - DB *gorm.DB - Interval time.Duration // default 30s - ScaleDownDelay time.Duration // default 5m + Registry *NodeRegistry + Scheduler ModelScheduler + Unloader NodeCommandSender + // Adapter is the NATS sender used to retry pending backend ops. 
When nil, + // the state-reconciler pending-drain pass is a no-op (single-node mode). + Adapter *RemoteUnloaderAdapter + // RegistrationToken is used by the default gRPC prober when probing model + // addresses. Matches the worker's token so HealthCheck auth succeeds. + RegistrationToken string + // Prober overrides the default gRPC health probe (used by tests). + Prober ModelProber + DB *gorm.DB + Interval time.Duration // default 30s + ScaleDownDelay time.Duration // default 5m + ProbeStaleAfter time.Duration // default 2m } // NewReplicaReconciler creates a new ReplicaReconciler. @@ -53,13 +93,24 @@ func NewReplicaReconciler(opts ReplicaReconcilerOptions) *ReplicaReconciler { if scaleDownDelay == 0 { scaleDownDelay = 5 * time.Minute } + probeStaleAfter := opts.ProbeStaleAfter + if probeStaleAfter == 0 { + probeStaleAfter = 2 * time.Minute + } + prober := opts.Prober + if prober == nil { + prober = grpcModelProber{token: opts.RegistrationToken} + } return &ReplicaReconciler{ - registry: opts.Registry, - scheduler: opts.Scheduler, - unloader: opts.Unloader, - db: opts.DB, - interval: interval, - scaleDownDelay: scaleDownDelay, + registry: opts.Registry, + scheduler: opts.Scheduler, + unloader: opts.Unloader, + adapter: opts.Adapter, + prober: prober, + db: opts.DB, + interval: interval, + scaleDownDelay: scaleDownDelay, + probeStaleAfter: probeStaleAfter, } } @@ -78,17 +129,122 @@ func (rc *ReplicaReconciler) Run(ctx context.Context) { } } -// reconcileOnce performs a single reconciliation pass. -// Uses an advisory lock so only one frontend instance reconciles at a time. +// reconcileOnce performs a single reconciliation pass. Replica work and +// state-reconciliation work run under *different* advisory locks so multiple +// frontends can share load across passes, and one long-running pass doesn't +// block the other forever if a frontend wedges. func (rc *ReplicaReconciler) reconcileOnce(ctx context.Context) { if rc.db != nil { - lockKey := advisorylock.KeyFromString("replica-reconciler") - _ = advisorylock.WithLockCtx(ctx, rc.db, lockKey, func() error { + replicaKey := advisorylock.KeyFromString("replica-reconciler") + _ = advisorylock.WithLockCtx(ctx, rc.db, replicaKey, func() error { rc.reconcile(ctx) return nil }) + // Try, don't block: if another frontend is already running the state + // pass, this tick is a no-op. Matches the health monitor pattern. + _, _ = advisorylock.TryWithLockCtx(ctx, rc.db, advisorylock.KeyStateReconciler, func() error { + rc.reconcileState(ctx) + return nil + }) } else { rc.reconcile(ctx) + rc.reconcileState(ctx) + } +} + +// reconcileState runs the state-reconciliation passes: drain pending backend +// ops for freshly-healthy nodes, then probe model gRPC addresses to orphan +// ghosts. Both passes are best-effort: a failure on one node doesn't stop +// the rest. +func (rc *ReplicaReconciler) reconcileState(ctx context.Context) { + if rc.adapter != nil { + rc.drainPendingBackendOps(ctx) + } + rc.probeLoadedModels(ctx) +} + +// drainPendingBackendOps retries queued backend ops whose next_retry_at has +// passed on nodes that are currently healthy. On success the row is deleted; +// on failure attempts++ and next_retry_at moves out via exponential backoff. 
+func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { + ops, err := rc.registry.ListDuePendingBackendOps(ctx) + if err != nil { + xlog.Warn("Reconciler: failed to list pending backend ops", "error", err) + return + } + if len(ops) == 0 { + return + } + xlog.Debug("Reconciler: draining pending backend ops", "count", len(ops)) + + for _, op := range ops { + if err := ctx.Err(); err != nil { + return + } + var applyErr error + switch op.Op { + case OpBackendDelete: + _, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend) + case OpBackendInstall, OpBackendUpgrade: + reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries)) + if err != nil { + applyErr = err + } else if !reply.Success { + applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error) + } + default: + xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID) + continue + } + + if applyErr == nil { + if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil { + xlog.Warn("Reconciler: failed to delete drained op row", "id", op.ID, "error", err) + } else { + xlog.Info("Reconciler: pending backend op applied", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1) + } + continue + } + _ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error()) + xlog.Warn("Reconciler: pending backend op retry failed", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr) + } +} + +// probeLoadedModels gRPC-health-checks model addresses that the DB says are +// loaded. If a model's backend process is gone (OOM, crash, manual restart) +// we remove the row so ghosts don't linger. Only probes rows older than +// probeStaleAfter so we don't hammer every worker every tick for models we +// just heard from. +func (rc *ReplicaReconciler) probeLoadedModels(ctx context.Context) { + var stale []NodeModel + cutoff := time.Now().Add(-rc.probeStaleAfter) + err := rc.registry.db.WithContext(ctx). + Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id"). + Where("node_models.state = ? AND backend_nodes.status = ? AND node_models.updated_at < ? AND node_models.address != ''", + "loaded", StatusHealthy, cutoff). + Find(&stale).Error + if err != nil { + xlog.Warn("Reconciler: failed to list loaded models for probe", "error", err) + return + } + for _, m := range stale { + if err := ctx.Err(); err != nil { + return + } + if rc.prober.IsAlive(ctx, m.Address) { + // Bump updated_at so we don't probe this row again immediately. + _ = rc.registry.db.WithContext(ctx).Model(&NodeModel{}). + Where("id = ?", m.ID).Update("updated_at", time.Now()).Error + continue + } + if err := rc.registry.RemoveNodeModel(ctx, m.NodeID, m.ModelName); err != nil { + xlog.Warn("Reconciler: failed to remove unreachable model", "node", m.NodeID, "model", m.ModelName, "error", err) + continue + } + xlog.Warn("Reconciler: model unreachable, removed from registry", + "node", m.NodeID, "model", m.ModelName, "address", m.Address) } } diff --git a/core/services/nodes/reconciler_test.go b/core/services/nodes/reconciler_test.go index e95f8bcea..52a488a2a 100644 --- a/core/services/nodes/reconciler_test.go +++ b/core/services/nodes/reconciler_test.go @@ -239,3 +239,138 @@ var _ = Describe("ReplicaReconciler", func() { }) }) }) + +// fakeProber lets tests control whether a model's gRPC address "responds". 
+type fakeProber struct { + alive map[string]bool + calls int +} + +func (f *fakeProber) IsAlive(_ context.Context, address string) bool { + f.calls++ + if f.alive == nil { + return false + } + return f.alive[address] +} + +var _ = Describe("ReplicaReconciler — state reconciliation", func() { + var ( + db *gorm.DB + registry *NodeRegistry + ) + + BeforeEach(func() { + if runtime.GOOS == "darwin" { + Skip("testcontainers requires Docker, not available on macOS CI") + } + db = testutil.SetupTestDB() + var err error + registry, err = NewNodeRegistry(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("probeLoadedModels", func() { + It("removes loaded models whose gRPC address is unreachable", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + // Two loaded models — one stale (will probe), one fresh (skipped). + stale := &NodeModel{ + ID: "stale-1", + NodeID: node.ID, + ModelName: "stale-model", + Address: "10.0.0.1:12345", + State: "loaded", + UpdatedAt: time.Now().Add(-5 * time.Minute), + } + fresh := &NodeModel{ + ID: "fresh-1", + NodeID: node.ID, + ModelName: "fresh-model", + Address: "10.0.0.1:54321", + State: "loaded", + UpdatedAt: time.Now(), // within probeStaleAfter + } + Expect(db.Create(stale).Error).To(Succeed()) + Expect(db.Create(fresh).Error).To(Succeed()) + + prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": false}} + rc := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + DB: db, + Prober: prober, + ProbeStaleAfter: 2 * time.Minute, + }) + + rc.probeLoadedModels(context.Background()) + + // Stale was unreachable — row removed. + var after []NodeModel + Expect(db.Find(&after).Error).To(Succeed()) + Expect(after).To(HaveLen(1)) + Expect(after[0].ModelName).To(Equal("fresh-model")) + // Prober was only called once (the fresh row was filtered out). + Expect(prober.calls).To(Equal(1)) + }) + + It("keeps reachable models and bumps their updated_at", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + stale := &NodeModel{ + ID: "stale-2", + NodeID: node.ID, + ModelName: "alive-model", + Address: "10.0.0.1:12345", + State: "loaded", + UpdatedAt: time.Now().Add(-5 * time.Minute), + } + Expect(db.Create(stale).Error).To(Succeed()) + + prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": true}} + rc := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + DB: db, + Prober: prober, + ProbeStaleAfter: 2 * time.Minute, + }) + + rc.probeLoadedModels(context.Background()) + + var after NodeModel + Expect(db.First(&after, "id = ?", "stale-2").Error).To(Succeed()) + Expect(after.UpdatedAt).To(BeTemporally("~", time.Now(), time.Second)) + }) + }) + + Describe("UpsertPendingBackendOp + RecordPendingBackendOpFailure", func() { + It("upserts on the composite key rather than duplicating rows", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + + Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + // Second call for the same (node, backend, op) should not create a + // new row — that's how re-issuing a delete works. 
+ Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + + var rows []PendingBackendOp + Expect(db.Find(&rows).Error).To(Succeed()) + Expect(rows).To(HaveLen(1)) + }) + + It("increments attempts and moves next_retry_at out on failure", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + + var row PendingBackendOp + Expect(db.First(&row).Error).To(Succeed()) + before := row.NextRetryAt + + Expect(registry.RecordPendingBackendOpFailure(context.Background(), row.ID, "boom")).To(Succeed()) + Expect(db.First(&row, row.ID).Error).To(Succeed()) + Expect(row.Attempts).To(Equal(1)) + Expect(row.LastError).To(Equal("boom")) + Expect(row.NextRetryAt).To(BeTemporally(">", before)) + }) + }) +}) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index f56fcedcc..3894b41c5 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -104,6 +104,36 @@ type NodeWithExtras struct { Labels map[string]string `json:"labels,omitempty"` } +// PendingBackendOp is a durable intent for a backend lifecycle operation +// (delete/install/upgrade) that needs to eventually apply on a specific node. +// +// Without this table, a backend delete against an offline node silently +// dropped: the frontend skipped the node, the node came back later with the +// backend still installed, and the operator saw a zombie. Now the intent is +// recorded regardless of node status; the state reconciler drains the queue +// whenever a node is healthy and removes the row on success. Reissuing the +// same operation while a row exists updates NextRetryAt instead of stacking +// duplicates (see the unique index). +type PendingBackendOp struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"` + Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"` + Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"` + Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries + Attempts int `gorm:"default:0" json:"attempts"` + LastError string `gorm:"type:text" json:"last_error,omitempty"` + CreatedAt time.Time `json:"created_at"` + NextRetryAt time.Time `gorm:"index" json:"next_retry_at"` +} + +// Op constants mirror the operation names used by DistributedBackendManager +// so callers don't repeat stringly-typed values. +const ( + OpBackendDelete = "delete" + OpBackendInstall = "install" + OpBackendUpgrade = "upgrade" +) + // NodeRegistry manages backend node registration and lookup in PostgreSQL. type NodeRegistry struct { db *gorm.DB @@ -114,7 +144,7 @@ type NodeRegistry struct { // when multiple instances (frontend + workers) start at the same time. 
func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) { if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { - return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}) + return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{}) }); err != nil { return nil, fmt.Errorf("migrating node tables: %w", err) } @@ -946,3 +976,114 @@ func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node _ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name) } } + +// UpsertPendingBackendOp records or refreshes a pending backend operation for +// a node. If a row already exists for (nodeID, backend, op) we keep its +// Attempts/LastError but reset NextRetryAt to now, so reissuing the same +// delete/upgrade nudges it to the front of the queue instead of stacking a +// duplicate intent. +func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error { + row := PendingBackendOp{ + NodeID: nodeID, + Backend: backend, + Op: op, + Galleries: galleries, + NextRetryAt: time.Now(), + } + return r.db.WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}}, + DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}), + }).Create(&row).Error +} + +// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed +// AND whose node is currently healthy. The reconciler drains this list; we +// filter by node status in the query so a tick doesn't hammer NATS for +// nodes that obviously can't answer. +func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) { + var ops []PendingBackendOp + err := r.db.WithContext(ctx). + Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id"). + Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy). + Order("pending_backend_ops.next_retry_at ASC"). + Find(&ops).Error + if err != nil { + return nil, fmt.Errorf("listing due pending backend ops: %w", err) + } + return ops, nil +} + +// ListPendingBackendOps returns every queued row (for the UI "pending on N +// nodes" chip and the pre-delete ConfirmDialog). +func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) { + var ops []PendingBackendOp + if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil { + return nil, fmt.Errorf("listing pending backend ops: %w", err) + } + return ops, nil +} + +// DeletePendingBackendOp removes a queue row — called after the op succeeds. +func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error { + if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil { + return fmt.Errorf("deleting pending backend op %d: %w", id, err) + } + return nil +} + +// RecordPendingBackendOpFailure bumps Attempts, captures the error, and +// pushes NextRetryAt out with exponential backoff capped at 15 minutes. 
+func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error { + return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var row PendingBackendOp + if err := tx.First(&row, id).Error; err != nil { + return err + } + row.Attempts++ + row.LastError = errMsg + row.NextRetryAt = time.Now().Add(backoffForAttempt(row.Attempts)) + return tx.Save(&row).Error + }) +} + +// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The +// reconciler tick is 30s so anything shorter would just re-fire immediately. +func backoffForAttempt(attempts int) time.Duration { + const cap = 15 * time.Minute + base := 30 * time.Second + shift := attempts - 1 + if shift < 0 { + shift = 0 + } + if shift > 10 { // 2^10 * 30s already exceeds the cap + shift = 10 + } + d := base << shift + if d > cap { + return cap + } + return d +} + +// CountPendingBackendOpsByBackend returns a map of backend name to the count +// of pending rows. Used to decorate Manage → Backends with a "pending on N +// nodes" chip without exposing the full queue. +func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) { + type row struct { + Backend string + Count int + } + var rows []row + err := r.db.WithContext(ctx).Model(&PendingBackendOp{}). + Select("backend, COUNT(*) as count"). + Group("backend"). + Scan(&rows).Error + if err != nil { + return nil, fmt.Errorf("counting pending backend ops: %w", err) + } + out := make(map[string]int, len(rows)) + for _, r := range rows { + out[r.Backend] = r.Count + } + return out, nil +}
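For reference, the retry schedule that backoffForAttempt produces is 30s, 1m, 2m, 4m, 8m, and then the 15-minute cap from the sixth consecutive failure onward (the reconciler tick is 30s, so anything shorter would just re-fire immediately). Below is a minimal, self-contained sketch of that schedule; the helper mirrors the one added in registry.go, while the main function and the maxDelay constant name are illustrative only:

package main

import (
	"fmt"
	"time"
)

// backoffForAttempt mirrors the helper added in registry.go: exponential from
// a 30s base, doubling per failed attempt, capped at 15 minutes. maxDelay
// stands in for the patch's `cap` constant purely to avoid shadowing the
// builtin in this standalone sketch.
func backoffForAttempt(attempts int) time.Duration {
	const maxDelay = 15 * time.Minute
	base := 30 * time.Second
	shift := attempts - 1
	if shift < 0 {
		shift = 0
	}
	if shift > 10 { // 2^10 * 30s already exceeds the cap
		shift = 10
	}
	d := base << shift
	if d > maxDelay {
		return maxDelay
	}
	return d
}

func main() {
	// Prints roughly: 30s, 1m0s, 2m0s, 4m0s, 8m0s, 15m0s, 15m0s;
	// the cap kicks in on the sixth consecutive failure.
	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d -> next retry in %s\n", attempt, backoffForAttempt(attempt))
	}
}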