feat(distributed): sync state with frontends, better backend management reporting (#9426)

* fix(distributed): detect backend upgrades across worker nodes

Before this change `DistributedBackendManager.CheckUpgrades` delegated to the
local manager, which read backends from the frontend filesystem. In
distributed deployments the frontend has no backends installed locally —
they live on workers — so the upgrade-detection loop never ran and the UI
silently never surfaced upgrades even when the gallery advertised newer
versions or digests.

Worker-side: NATS backend.list reply now carries Version, URI and Digest
for each installed backend (read from metadata.json).

Frontend-side: DistributedBackendManager.ListBackends aggregates per-node
refs (name, status, version, digest) instead of deduping, and CheckUpgrades
feeds that aggregation into gallery.CheckUpgradesAgainst — a new entrypoint
factored out of CheckBackendUpgrades so both paths share the same core
logic.

Cluster drift policy: when per-node version/digest tuples disagree, the
backend is flagged upgradeable regardless of whether any single node
matches the gallery, and UpgradeInfo.NodeDrift enumerates the outliers so
operators can see *why* it is out of sync. The next upgrade-all realigns
the cluster.

Tests cover: drift detection, unanimous-match (no upgrade), and the
empty-installed-version path that the old distributed code silently
missed.

* feat(ui): surface backend upgrades in the System page

The System page (Manage.jsx) only showed updates as a tiny inline arrow,
so operators routinely missed them. Port the Backend Gallery's upgrade UX
so System speaks the same visual language:

- Yellow banner at the top of the Backends tab when upgrades are pending,
  with an "Upgrade all" button (serial fan-out, matches the gallery) and a
  "Updates only" filter toggle.
- Warning pill (↑ N) next to the tab label so the count is glanceable even
  when the banner is scrolled out of view.
- Per-row labeled "Upgrade to vX.Y" button (replaces the icon-only button
  that silently flipped semantics between Reinstall and Upgrade), plus an
  "Update available" badge in the new Version column.
- New columns: Version (with upgrade + drift chips), Nodes (per-node
  attribution badges for distributed mode, degrading to a compact
  "on N nodes · M offline" chip above three nodes), Installed (relative
  time).
- System backends render a "Protected" chip instead of a bare "—" so rows
  still align and the reason is obvious.
- Delete uses the softer btn-danger-ghost so rows don't scream red; the
  ConfirmDialog still owns the "are you sure".

The upgrade checker also needed the same per-worker fix as the previous
commit: NewUpgradeChecker now takes a BackendManager getter so its
periodic runs call the distributed CheckUpgrades (which asks workers)
instead of the empty frontend filesystem. Without this the /api/backends/
upgrades endpoint stayed empty in distributed mode even with the protocol
change in place.

New CSS primitives — .upgrade-banner, .tab-pill, .badge-row, .cell-stack,
.cell-mono, .cell-muted, .row-actions, .btn-danger-ghost — all live in
App.css so other pages can adopt them without duplicating styles.

* feat(ui): polish the Nodes page so it reads like a product

The Nodes page was the biggest visual liability in distributed mode.
Rework the main dashboard surfaces in place without changing behavior:

StatCards: uniform height (96px min), left accent bar colored by the
metric's semantic (success/warning/error/primary), icon lives in a
36x36 soft-tinted chip top-right, value is left-aligned and large.
Grid auto-fills so the row doesn't collapse on narrow viewports. This
replaces the previous thin-bordered boxes with inconsistent heights.

Table rows: expandable rows now show a chevron cue on the left (rotates
on expand) so users know rows open. Status cell became a dedicated chip
with an LED-style halo dot instead of a bare bullet. Action buttons gained
labels — "Approve", "Resume", "Drain" — so the icons aren't doing all
the semantic work; the destructive remove action uses the softer
btn-danger-ghost variant so rows don't scream red, with the ConfirmDialog
still owning the real "are you sure". Applied cell-mono/cell-muted
utility classes so label chips and addresses share one spacing/font
grammar instead of re-declaring inline styles everywhere.

Expanded drawer: empty states for Loaded Models and Installed Backends
now render as a proper drawer-empty card (dashed border, icon, one-line
hint) instead of a plain muted string that read like broken formatting.

Tabs: three inline-styled buttons became the shared .tab class so they
inherit focus ring, hover state, and the rest of the design system —
matches the System page.

The "Add more workers" toggle turned into a .nodes-add-worker dashed-border
button labelled "Register a new worker" (action voice), replacing the
chevron + muted link that operators kept mistaking for broken text.

New shared CSS primitives carry over to other pages:
.stat-grid + .stat-card, .row-chevron, .node-status, .drawer-empty,
.nodes-add-worker.

* feat(distributed): durable backend fan-out + state reconciliation

Two connected problems handled together:

1) Backend delete/install/upgrade used to silently skip non-healthy nodes,
   so a delete during an outage left a zombie on the offline node once it
   returned. The fan-out now records intent in a new pending_backend_ops
   table before attempting the NATS round-trip. Currently-healthy nodes
   get an immediate attempt; everyone else is queued. Unique index on
   (node_id, backend, op) means reissuing the same operation refreshes
   next_retry_at instead of stacking duplicates.

2) Loaded-model state could drift from reality: a worker that OOM'd, was
   killed, or restarted its backend process would leave a node_models row
   claiming the model was still loaded, feeding ghost entries into the
   /api/nodes/models listing and the router's scheduling decisions.

The existing ReplicaReconciler gains two new passes that run under a
fresh KeyStateReconciler advisory lock (non-blocking, so one wedged
frontend doesn't freeze the cluster):

  - drainPendingBackendOps: retries queued ops whose next_retry_at has
    passed on currently-healthy nodes. Success deletes the row; failure
    bumps attempts and pushes next_retry_at out with exponential backoff
    (30s → 15m cap). ErrNoResponders also marks the node unhealthy.

  - probeLoadedModels: gRPC-HealthChecks addresses the DB thinks are
    loaded but that haven't been touched within probeStaleAfter (2m).
    Unreachable addresses are removed from the registry. A pluggable
    ModelProber lets tests substitute a fake without standing up gRPC.

DistributedBackendManager exposes DeleteBackendDetailed so the HTTP
handler can surface per-node outcomes ("2 succeeded, 1 queued") to the
UI in a follow-up commit; the existing DeleteBackend still returns
error-only for callers that don't care about node breakdown.

Multi-frontend safety: the state pass uses advisorylock.TryWithLockCtx
on a new key so N frontends coordinate — the same pattern the health
monitor and replica reconciler already rely on. Single-node mode runs
both passes inline (adapter is nil, state drain is a no-op).

Tests cover the upsert semantics, backoff math, the probe removing an
unreachable model but keeping a reachable one, and filtering by
probeStaleAfter.

* feat(ui): show cluster distribution of models in the System page

When a frontend restarted in distributed mode, models that workers had
already loaded weren't visible until the operator clicked into each node
manually — the /api/models/capabilities endpoint only knew about
configs on the frontend's filesystem, not the registry-backed truth.

/api/models/capabilities now joins in ListAllLoadedModels() when the
registry is active, returning loaded_on[] with node id/name/state/status
for each model. Models that live in the registry but lack a local config
(the actual ghosts, which can't be recovered from the frontend's file
cache) still surface with source="registry-only" so operators can see
and persist them; without that emission they'd be invisible to this
frontend.

Manage → Models replaces the old Running/Idle pill with a distribution
cell that lists the first three nodes the model is loaded on as chips
colored by state (green loaded, blue loading, amber anything else). On
wider clusters the remaining count collapses into a +N chip with a
title-attribute breakdown. Disabled / single-node behavior unchanged.

Adopted models get an extra "Adopted" ghost-icon chip with hover copy
explaining what it means and how to make it permanent.

Distributed mode also enables a 10s auto-refresh and a "Last synced Xs
ago" indicator next to the Update button so ghost rows drop off within
one reconcile tick after their owning process dies. Non-distributed
mode is untouched — no polling, no cell-stack, same old Running/Idle.

* feat(ui): NodeDistributionChip — shared per-node attribution component

Large clusters were going to break the Manage → Backends Nodes column:
the old inline logic rendered every node as a badge and would shred the
layout at >10 workers, and the Manage → Models distribution cell had
copy-pasted its own slightly different version.

NodeDistributionChip handles any cluster size with two render modes:
  - small (≤3 nodes): inline chips of node names, colored by health.
  - large: a single "on N nodes · M offline · K drift" summary chip;
    clicking opens a Popover with a per-node table (name, status,
    version, digest for backends; name, status, state for models).

Drift counting mirrors the backend's summarizeNodeDrift so the UI
number matches UpgradeInfo.NodeDrift. Digests are truncated to the
docker-style 12-char form with the full value preserved in the title.

Popover is a new general-purpose primitive: fixed positioning anchored
to the trigger, flips above when there's no room below, closes on
outside-click or Escape, returns focus to the trigger. Uses .card as
its surface so theming is inherited. Also useful for a future
labels-editor popup and the user menu.

Manage.jsx drops its duplicated inline Nodes-column and loaded_on cells
and uses the shared chip with context="backends" / "models"
respectively, deleting ~40 lines of ad-hoc logic.

* feat(ui): shared FilterBar across the System page tabs

The Backends gallery had a nice search + chip + toggle strip; the System
page had nothing, so the two surfaces felt like different apps. Lift the
pattern into a reusable FilterBar and wire both System tabs through it.

New component core/http/react-ui/src/components/FilterBar.jsx renders a
search input, a role="tablist" chip row (aria-selected for a11y), and
optional toggles / right slot. Chips support an optional `count` which
the System page uses to show "User 3", "Updates 1" etc.

System Models tab: search by id or backend; chips for
All/Running/Idle/Disabled/Pinned plus a conditional Distributed chip in
distributed mode. "Last synced" + Update button live in the right slot.

System Backends tab: search by name/alias/meta-backend-for; chips for
All/User/System/Meta plus conditional Updates / Offline-nodes chips
when relevant. The old ad-hoc "Updates only" toggle from the upgrade
banner folded into the Updates chip — one source of truth for that
filter. Offline chip only appears in distributed mode when at least
one backend has an unhealthy node, so the chip row stays quiet on
healthy clusters.

Filter state persists in URL query params (mq/mf/bq/bf) so deep links
and tab switches keep the operator's filter context instead of
resetting every time.

Also adds an "Adopted" distribution path: when a model in
/api/models/capabilities carries source="registry-only" (discovered on
a worker but not configured locally), the Models tab shows a ghost chip
labelled "Adopted" with hover copy explaining how to persist it — this
is what closes the loop on the ghost-model story end-to-end.
Ettore Di Giacinto
2026-04-19 17:55:53 +02:00
committed by GitHub
parent 9cd8d7951f
commit 75a63f87d8
21 changed files with 2185 additions and 312 deletions


@@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut
DB: authDB,
})
// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
// RegistrationToken feed the state-reconciliation passes: pending op
// drain uses the adapter, and model health probes use the token to auth
// against workers' gRPC HealthCheck.
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
	Registry:          registry,
	Scheduler:         router,
	Unloader:          remoteUnloader,
	Adapter:           remoteUnloader,
	RegistrationToken: cfg.Distributed.RegistrationToken,
	DB:                authDB,
	Interval:          30 * time.Second,
	ScaleDownDelay:    5 * time.Minute,
	ProbeStaleAfter:   2 * time.Minute,
})
// Create ModelRouterAdapter to wire into ModelLoader


@@ -235,7 +235,12 @@ func New(opts ...config.AppOption) (*Application, error) {
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
// instance runs periodic checks (avoids duplicate upgrades across replicas).
if len(options.BackendGalleries) > 0 {
// Pass a lazy getter for the backend manager so the checker always
// uses the active one — DistributedBackendManager is swapped in above
// and asks workers for their installed backends, which is what
// upgrade detection needs in distributed mode.
bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
application.upgradeChecker = uc
go uc.Run(options.Context)
}


@@ -8,6 +8,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/advisorylock"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
@@ -26,6 +27,12 @@ type UpgradeChecker struct {
galleries []config.Gallery
systemState *system.SystemState
db *gorm.DB // non-nil in distributed mode
// backendManagerFn lazily returns the current backend manager (may be
// swapped from Local to Distributed after startup). Pulled through each
// check so the UpgradeChecker uses whichever is active. In distributed
// mode this ensures CheckUpgrades asks workers instead of the (empty)
// frontend filesystem — fixing the bug where upgrades never surfaced.
backendManagerFn func() galleryop.BackendManager
checkInterval time.Duration
stop chan struct{}
@@ -40,18 +47,22 @@ type UpgradeChecker struct {
// NewUpgradeChecker creates a new UpgradeChecker service.
// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
// (uses advisory locks so only one instance runs periodic checks).
// backendManagerFn is optional; when set, CheckUpgrades is routed through
// the active backend manager — required in distributed mode so the check
// aggregates from workers rather than the empty frontend filesystem.
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker {
	return &UpgradeChecker{
		appConfig:        appConfig,
		modelLoader:      ml,
		galleries:        appConfig.BackendGalleries,
		systemState:      appConfig.SystemState,
		db:               db,
		backendManagerFn: backendManagerFn,
		checkInterval:    6 * time.Hour,
		stop:             make(chan struct{}),
		done:             make(chan struct{}),
		triggerCh:        make(chan struct{}, 1),
		lastUpgrades:     make(map[string]gallery.UpgradeInfo),
	}
}
@@ -64,13 +75,16 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade
func (uc *UpgradeChecker) Run(ctx context.Context) {
defer close(uc.done)
// Initial delay: don't slow down startup. Short enough that operators
// don't stare at an empty upgrade banner for long; long enough that
// workers have registered and reported their installed backends.
initialDelay := 10 * time.Second
select {
case <-ctx.Done():
	return
case <-uc.stop:
	return
case <-time.After(initialDelay):
}
// First check always runs locally (to warm the cache on this instance)
@@ -144,7 +158,18 @@ func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo
}
func (uc *UpgradeChecker) runCheck(ctx context.Context) {
	var (
		upgrades map[string]gallery.UpgradeInfo
		err      error
	)
	if uc.backendManagerFn != nil {
		if bm := uc.backendManagerFn(); bm != nil {
			upgrades, err = bm.CheckUpgrades(ctx)
		}
	}
	if upgrades == nil && err == nil {
		upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
	}
uc.mu.Lock()
uc.lastCheckTime = time.Now()


@@ -738,6 +738,9 @@ func (s *backendSupervisor) subscribeLifecycleEvents() {
if b.Metadata != nil {
info.InstalledAt = b.Metadata.InstalledAt
info.GalleryURL = b.Metadata.GalleryURL
info.Version = b.Metadata.Version
info.URI = b.Metadata.URI
info.Digest = b.Metadata.Digest
}
infos = append(infos, info)
}


@@ -394,6 +394,23 @@ type SystemBackend struct {
Metadata *BackendMetadata
UpgradeAvailable bool `json:"upgrade_available,omitempty"`
AvailableVersion string `json:"available_version,omitempty"`
// Nodes holds per-node attribution in distributed mode. Empty in single-node.
// Each entry describes a node that has this backend installed, with the
// version/digest it reports. Lets the UI surface drift and per-node status.
Nodes []NodeBackendRef `json:"nodes,omitempty"`
}
// NodeBackendRef describes one node's view of an installed backend. Used both
// for per-node attribution in the UI and for drift detection during upgrade
// checks (a cluster with mismatched versions/digests is flagged upgradeable).
type NodeBackendRef struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
NodeStatus string `json:"node_status"` // healthy | unhealthy | offline | draining | pending
Version string `json:"version,omitempty"`
Digest string `json:"digest,omitempty"`
URI string `json:"uri,omitempty"`
InstalledAt string `json:"installed_at,omitempty"`
}
type SystemBackends map[string]SystemBackend


@@ -23,22 +23,45 @@ type UpgradeInfo struct {
AvailableVersion string `json:"available_version"`
InstalledDigest string `json:"installed_digest,omitempty"`
AvailableDigest string `json:"available_digest,omitempty"`
// NodeDrift lists nodes whose installed version or digest differs from
// the cluster majority. Non-empty means the cluster has diverged and an
// upgrade will realign it. Empty in single-node mode.
NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"`
}
// NodeDriftInfo describes one node that disagrees with the cluster majority
// on which version/digest of a backend is installed.
type NodeDriftInfo struct {
	NodeID   string `json:"node_id"`
	NodeName string `json:"node_name"`
	Version  string `json:"version,omitempty"`
	Digest   string `json:"digest,omitempty"`
}
// CheckBackendUpgrades is the single-node entrypoint. Distributed callers use
// CheckUpgradesAgainst directly with their aggregated SystemBackends.
func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState) (map[string]UpgradeInfo, error) {
installed, err := ListSystemBackends(systemState)
if err != nil {
return nil, fmt.Errorf("failed to list installed backends: %w", err)
}
return CheckUpgradesAgainst(ctx, galleries, systemState, installed)
}
// CheckUpgradesAgainst compares a caller-supplied SystemBackends set against
// the gallery. Fixes the distributed-mode bug where the old code passed the
// frontend's (empty) local filesystem through ListSystemBackends and so never
// surfaced any upgrades.
//
// Cluster drift policy: if a backend's per-node versions/digests disagree, the
// row is flagged upgradeable regardless of whether any node matches the gallery
// — next Upgrade All realigns the cluster. NodeDrift lists the outliers.
func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) {
galleryBackends, err := AvailableBackends(galleries, systemState)
if err != nil {
return nil, fmt.Errorf("failed to list available backends: %w", err)
}
result := make(map[string]UpgradeInfo)
for _, installed := range installedBackends {
@@ -57,34 +80,48 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste
}
installedVersion := installed.Metadata.Version
installedDigest := installed.Metadata.Digest
galleryVersion := galleryEntry.Version
// Detect cluster drift: does every node report the same version+digest?
// In single-node mode this stays empty (Nodes is nil).
majority, drift := summarizeNodeDrift(installed.Nodes)
if majority.version != "" {
installedVersion = majority.version
}
if majority.digest != "" {
installedDigest = majority.digest
}
makeInfo := func(info UpgradeInfo) UpgradeInfo {
info.NodeDrift = drift
return info
}
// If versions are available on both sides, they're the source of truth.
if galleryVersion != "" && installedVersion != "" {
	if galleryVersion != installedVersion || len(drift) > 0 {
		result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
			BackendName:      installed.Metadata.Name,
			InstalledVersion: installedVersion,
			AvailableVersion: galleryVersion,
		})
	}
	// Versions match — no upgrade needed
	continue
}
// Gallery has a version but installed doesn't — backends installed before
// version tracking was added. Flag as upgradeable to pick up metadata.
if galleryVersion != "" && installedVersion == "" {
	result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
		BackendName:      installed.Metadata.Name,
		InstalledVersion: "",
		AvailableVersion: galleryVersion,
	})
	continue
}
// Fall back to OCI digest comparison when versions are unavailable.
if downloader.URI(galleryEntry.URI).LooksLikeOCI() {
remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil)
if err != nil {
@@ -92,21 +129,68 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste
continue
}
// If we have a stored digest, compare; otherwise any remote digest
// means we can't confirm we're up to date — flag as upgradeable.
if installedDigest == "" || remoteDigest != installedDigest || len(drift) > 0 {
	result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
		BackendName:     installed.Metadata.Name,
		InstalledDigest: installedDigest,
		AvailableDigest: remoteDigest,
	})
}
} else if len(drift) > 0 {
// No version/digest path but nodes disagree — still worth flagging.
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: installedVersion,
InstalledDigest: installedDigest,
})
}
// No version info and non-OCI URI — cannot determine, skip
}
return result, nil
}
// summarizeNodeDrift collapses per-node version/digest tuples to a majority
// pair and returns the outliers. In single-node mode (empty nodes slice) this
// returns zero values and a nil drift list.
func summarizeNodeDrift(nodes []NodeBackendRef) (majority struct{ version, digest string }, drift []NodeDriftInfo) {
if len(nodes) == 0 {
return majority, nil
}
type key struct{ version, digest string }
counts := map[key]int{}
var topKey key
var topCount int
for _, n := range nodes {
k := key{n.Version, n.Digest}
counts[k]++
if counts[k] > topCount {
topCount = counts[k]
topKey = k
}
}
majority.version = topKey.version
majority.digest = topKey.digest
if len(counts) == 1 {
return majority, nil // unanimous — no drift
}
for _, n := range nodes {
if n.Version == majority.version && n.Digest == majority.digest {
continue
}
drift = append(drift, NodeDriftInfo{
NodeID: n.NodeID,
NodeName: n.NodeName,
Version: n.Version,
Digest: n.Digest,
})
}
return majority, drift
}
// UpgradeBackend upgrades a single backend to the latest gallery version using
// an atomic swap with backup-based rollback on failure.
func UpgradeBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, galleries []config.Gallery, backendName string, downloadStatus func(string, string, string, float64)) error {


@@ -144,6 +144,97 @@ var _ = Describe("Upgrade Detection and Execution", func() {
})
})
// CheckUpgradesAgainst is the entry point used by DistributedBackendManager.
// It takes installed backends directly — typically aggregated from workers —
// instead of reading the frontend filesystem. These tests exercise drift
// detection, which is the feature the distributed path relies on.
Describe("CheckUpgradesAgainst (distributed)", func() {
It("flags upgrade when cluster nodes disagree on version, even if gallery matches majority", func() {
writeGalleryYAML([]GalleryBackend{
{
Metadata: Metadata{Name: "my-backend"},
URI: filepath.Join(tempDir, "some-source"),
Version: "2.0.0",
},
})
installed := SystemBackends{
"my-backend": SystemBackend{
Name: "my-backend",
Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"},
Nodes: []NodeBackendRef{
{NodeID: "a", NodeName: "worker-1", Version: "2.0.0"},
{NodeID: "b", NodeName: "worker-2", Version: "2.0.0"},
{NodeID: "c", NodeName: "worker-3", Version: "1.0.0"}, // drift
},
},
}
upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed)
Expect(err).NotTo(HaveOccurred())
Expect(upgrades).To(HaveKey("my-backend"))
info := upgrades["my-backend"]
Expect(info.AvailableVersion).To(Equal("2.0.0"))
Expect(info.NodeDrift).To(HaveLen(1))
Expect(info.NodeDrift[0].NodeName).To(Equal("worker-3"))
Expect(info.NodeDrift[0].Version).To(Equal("1.0.0"))
})
It("does not flag upgrade when all nodes agree and match gallery", func() {
writeGalleryYAML([]GalleryBackend{
{
Metadata: Metadata{Name: "my-backend"},
URI: filepath.Join(tempDir, "some-source"),
Version: "2.0.0",
},
})
installed := SystemBackends{
"my-backend": SystemBackend{
Name: "my-backend",
Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"},
Nodes: []NodeBackendRef{
{NodeID: "a", NodeName: "worker-1", Version: "2.0.0"},
{NodeID: "b", NodeName: "worker-2", Version: "2.0.0"},
},
},
}
upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed)
Expect(err).NotTo(HaveOccurred())
Expect(upgrades).To(BeEmpty())
})
It("surfaces empty-installed-version path the old distributed code silently missed", func() {
// Simulates the real-world bug: worker has a backend, its version
// is empty (pre-tracking or OCI-pinned-to-latest), gallery has a
// version. Pre-fix CheckUpgrades returned nothing; now it surfaces.
writeGalleryYAML([]GalleryBackend{
{
Metadata: Metadata{Name: "my-backend"},
URI: filepath.Join(tempDir, "some-source"),
Version: "2.0.0",
},
})
installed := SystemBackends{
"my-backend": SystemBackend{
Name: "my-backend",
Metadata: &BackendMetadata{Name: "my-backend"},
Nodes: []NodeBackendRef{
{NodeID: "a", NodeName: "worker-1"},
},
},
}
upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed)
Expect(err).NotTo(HaveOccurred())
Expect(upgrades).To(HaveKey("my-backend"))
Expect(upgrades["my-backend"].InstalledVersion).To(BeEmpty())
Expect(upgrades["my-backend"].AvailableVersion).To(Equal("2.0.0"))
})
})
Describe("UpgradeBackend", func() {
It("should replace backend directory and update metadata", func() {
// Install v1


@@ -1529,6 +1529,401 @@ select.input {
background: var(--color-warning-light);
color: var(--color-warning);
}
.badge-accent {
background: var(--color-accent-light);
color: var(--color-accent);
}
/* Horizontal row of badges used inside table cells — consistent spacing so
cells line up regardless of how many badges are present. */
.badge-row {
display: inline-flex;
flex-wrap: wrap;
gap: 4px;
align-items: center;
}
/* Vertically stacked cell content (e.g. version + update chip + drift chip).
Keeps rows readable at scale without inline style={{...}} everywhere. */
.cell-stack {
display: flex;
flex-direction: column;
gap: 4px;
align-items: flex-start;
}
.cell-mono {
font-family: 'JetBrains Mono', ui-monospace, monospace;
font-size: var(--text-xs);
color: var(--color-text-primary);
}
.cell-muted {
color: var(--color-text-muted);
font-size: var(--text-xs);
}
.cell-subtle {
color: var(--color-text-muted);
font-size: var(--text-xs);
font-weight: 400;
margin-left: 8px;
}
.cell-name {
display: inline-flex;
align-items: center;
gap: var(--spacing-xs);
font-weight: 500;
}
.cell-name > i {
color: var(--color-accent);
font-size: var(--text-xs);
}
.row-actions {
display: flex;
gap: var(--spacing-xs);
justify-content: flex-end;
align-items: center;
}
/* Softer delete button for dense tables — the destructive confirm dialog
already owns the "are you sure" affordance, so the button itself doesn't
need to scream. Keeps the delete red readable without dominating rows. */
.btn.btn-danger-ghost {
background: transparent;
color: var(--color-error);
border-color: transparent;
}
.btn.btn-danger-ghost:hover:not(:disabled) {
background: var(--color-error-light);
color: var(--color-error);
border-color: var(--color-error-light);
}
/* Small count pill used inside tabs ("(3) ↑ 2") so update counts are
glanceable without extra rows of UI. */
.tab-pill {
display: inline-flex;
align-items: center;
gap: 3px;
margin-left: 6px;
padding: 1px 6px;
border-radius: var(--radius-full);
font-size: var(--text-xs);
font-weight: 600;
line-height: 1.4;
}
.tab-pill--warning {
background: var(--color-warning-light);
color: var(--color-warning);
}
/* Stat cards — uniform-height cluster metrics for the Nodes dashboard.
Left accent bar ties the color to the metric's semantic (success/warning/
error/primary), icon chip sits top-right, value is left-aligned and
prominent so you can scan a row of cards without reading labels. */
.stat-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(180px, 1fr));
gap: var(--spacing-md);
margin-bottom: var(--spacing-xl);
}
.stat-card {
position: relative;
display: flex;
align-items: center;
justify-content: space-between;
gap: var(--spacing-sm);
padding: var(--spacing-md);
min-height: 96px;
background: var(--color-bg-raised, var(--color-bg-secondary));
border: 1px solid var(--color-border-subtle);
border-radius: var(--radius-lg);
transition: transform var(--duration-fast) var(--ease-default),
box-shadow var(--duration-fast) var(--ease-default),
border-color var(--duration-fast) var(--ease-default);
overflow: hidden;
}
.stat-card::before {
content: '';
position: absolute;
left: 0; top: 0; bottom: 0;
width: 3px;
background: var(--stat-accent, var(--color-border-subtle));
transition: background var(--duration-fast) var(--ease-default);
}
.stat-card:hover {
transform: translateY(-1px);
box-shadow: var(--shadow-sm);
border-color: var(--color-border);
}
.stat-card__body {
display: flex;
flex-direction: column;
gap: 6px;
min-width: 0;
}
.stat-card__label {
font-size: var(--text-xs);
font-weight: 600;
letter-spacing: 0.08em;
text-transform: uppercase;
color: var(--color-text-muted);
white-space: normal;
line-height: 1.2;
}
.stat-card__value {
font-size: var(--text-2xl);
font-weight: 600;
font-family: 'JetBrains Mono', ui-monospace, monospace;
line-height: 1;
color: var(--color-text-primary);
word-break: break-word;
}
.stat-card__icon {
display: inline-flex;
align-items: center;
justify-content: center;
width: 36px;
height: 36px;
border-radius: var(--radius-md);
background: color-mix(in srgb, var(--stat-accent, var(--color-text-muted)) 12%, transparent);
color: var(--stat-accent, var(--color-text-muted));
font-size: var(--text-lg);
flex-shrink: 0;
}
/* Subtle "Register a new worker" trigger replacing the broken-text chevron
link. Still opens the same hint card — just reads like a button now. */
.nodes-add-worker {
display: inline-flex;
align-items: center;
gap: var(--spacing-xs);
padding: var(--spacing-xs) var(--spacing-sm);
background: transparent;
border: 1px dashed var(--color-border);
border-radius: var(--radius-md);
color: var(--color-text-secondary);
font-size: var(--text-sm);
font-family: inherit;
font-weight: 500;
cursor: pointer;
margin-bottom: var(--spacing-md);
transition: background var(--duration-fast) var(--ease-default),
border-color var(--duration-fast) var(--ease-default),
color var(--duration-fast) var(--ease-default);
}
.nodes-add-worker:hover {
background: var(--color-bg-raised, var(--color-bg-secondary));
border-color: var(--color-border-strong);
color: var(--color-text-primary);
}
/* Shared FilterBar layout — search strip + chip row + toggle strip. Lives
outside the .filter-bar chip row so the padding and wrapping behavior is
consistent between the Backends gallery and the System tabs. */
.filter-bar-group {
display: flex;
flex-direction: column;
gap: var(--spacing-sm);
margin-bottom: var(--spacing-md);
}
.filter-bar-group__search {
min-width: 200px;
flex: 1;
}
.filter-bar-group__row {
display: flex;
gap: var(--spacing-md);
align-items: center;
flex-wrap: wrap;
}
.filter-bar-group__right {
display: flex;
gap: var(--spacing-md);
align-items: center;
flex-wrap: wrap;
padding-left: var(--spacing-md);
border-left: 1px solid var(--color-border-subtle);
}
.filter-bar-group__toggle {
display: flex;
align-items: center;
gap: var(--spacing-xs);
font-size: var(--text-xs);
color: var(--color-text-secondary);
cursor: pointer;
user-select: none;
white-space: nowrap;
}
.filter-btn__count {
display: inline-flex;
align-items: center;
justify-content: center;
margin-left: 6px;
min-width: 18px;
padding: 0 5px;
background: color-mix(in srgb, currentColor 18%, transparent);
border-radius: var(--radius-full);
font-size: 0.625rem;
font-weight: 600;
}
/* Popover — floating surface anchored to a trigger element. Uses the .card
base so theming is free, adds z-index + fixed-position + scroll cap so it
behaves on tables with many rows. Kept deliberately unstyled beyond that
— content is expected to provide its own header/body structure. */
.popover {
position: fixed;
z-index: 200;
min-width: 260px;
max-width: min(420px, 95vw);
max-height: min(420px, 70vh);
display: flex;
flex-direction: column;
padding: 0; /* sections provide their own padding */
overflow: hidden;
box-shadow: var(--shadow-lg);
animation: popoverIn var(--duration-fast) var(--ease-default);
}
@keyframes popoverIn {
from { opacity: 0; transform: translateY(-4px) scale(0.98); }
to { opacity: 1; transform: translateY(0) scale(1); }
}
.popover__header {
display: flex;
align-items: center;
gap: var(--spacing-sm);
padding: var(--spacing-sm) var(--spacing-md);
border-bottom: 1px solid var(--color-border-subtle);
font-size: var(--text-sm);
}
.popover__scroll {
overflow: auto;
padding: 0;
}
.popover__table {
margin: 0;
width: 100%;
}
.popover__table th {
position: sticky;
top: 0;
background: var(--color-bg-raised, var(--color-bg-secondary));
z-index: 1;
}
/* Inline-table chip trigger — looks like a badge but is a button (cursor,
focus ring inherited from global :focus-visible). */
.chip-trigger {
border: none;
cursor: pointer;
font-family: inherit;
}
.chip-trigger:hover {
filter: brightness(1.08);
}
/* Truncate + ellipsize a long cell (e.g. OCI digest) without breaking the
table layout. Tooltip preserves the full value. */
.cell-truncate {
max-width: 160px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
/* Compact empty-state used inside expanded drawer sections (e.g. "No
models loaded on this node"). Dimmer than the page-level .empty-state
because it lives inside another container and shouldn't compete with
the row's primary content. */
.drawer-empty {
display: flex;
align-items: center;
gap: var(--spacing-sm);
padding: var(--spacing-sm) var(--spacing-md);
background: var(--color-bg-tertiary);
border: 1px dashed var(--color-border-subtle);
border-radius: var(--radius-md);
color: var(--color-text-muted);
font-size: var(--text-sm);
}
.drawer-empty > i {
font-size: var(--text-sm);
color: var(--color-text-muted);
opacity: 0.8;
}
/* Node-status indicator — replaces the tiny bullet with a proper LED-style
dot next to a bold status label. Colors are applied inline from statusConfig
so one primitive handles healthy/unhealthy/draining/pending in one shape. */
.node-status {
display: inline-flex;
align-items: center;
gap: 8px;
font-size: var(--text-sm);
font-weight: 600;
}
.node-status__dot {
width: 8px;
height: 8px;
border-radius: 50%;
box-shadow: 0 0 0 3px color-mix(in srgb, currentColor 15%, transparent);
flex-shrink: 0;
}
/* Row-chevron cell — small 20px toggle used in table rows that expand.
The row itself is still clickable; the chevron provides the visible
affordance users were missing. */
.row-chevron {
display: inline-flex;
align-items: center;
justify-content: center;
width: 20px;
height: 20px;
font-size: var(--text-xs);
color: var(--color-text-muted);
transition: transform var(--duration-fast) var(--ease-default);
}
.row-chevron.is-expanded {
transform: rotate(90deg);
color: var(--color-text-primary);
}
/* Upgrade banner — the yellow strip operators see when updates are available.
Mirrors the gallery so both pages speak the same visual language. */
.upgrade-banner {
display: flex;
align-items: center;
justify-content: space-between;
gap: var(--spacing-md);
padding: var(--spacing-sm) var(--spacing-md);
margin-bottom: var(--spacing-md);
background: var(--color-warning-light);
border: 1px solid var(--color-warning);
border-radius: var(--radius-md);
color: var(--color-warning);
}
.upgrade-banner__text {
display: inline-flex;
align-items: center;
gap: var(--spacing-sm);
font-weight: 500;
font-size: var(--text-sm);
}
.upgrade-banner__actions {
display: inline-flex;
gap: var(--spacing-xs);
align-items: center;
}
/* Tabs */
.tabs {


@@ -0,0 +1,87 @@
import Toggle from './Toggle'
// FilterBar is the shared search + chip filter + toggles control strip that
// the Backends gallery pioneered. Pulled into its own component so the System
// page's two tabs stop looking like a different app — matching visual
// grammar + matching keyboard behavior.
//
// Props:
// search: controlled value for the search input.
// onSearchChange: (value) => void; null disables the search input entirely.
// searchPlaceholder: placeholder for the search input.
// filters: [{ key, label, icon?, count? }]; activeFilter is compared
// by key; count renders as a small pill. Omit to hide the chip row.
// activeFilter: currently-selected filter key (use '' for "all" if
// that's the first entry in `filters`).
// onFilterChange: (key) => void.
// toggles: [{ key, label, icon?, checked, onChange }]; optional
// right-side toggle group (e.g. "Show all", "Development").
// rightSlot: arbitrary element rendered after the toggles — use for
// sort controls or extra buttons.
export default function FilterBar({
search,
onSearchChange,
searchPlaceholder = 'Search...',
filters,
activeFilter,
onFilterChange,
toggles,
rightSlot,
}) {
const hasFilters = Array.isArray(filters) && filters.length > 0
const hasToggles = Array.isArray(toggles) && toggles.length > 0
return (
<div className="filter-bar-group">
{onSearchChange && (
<div className="search-bar filter-bar-group__search">
<i className="fas fa-search search-icon" />
<input
className="input"
placeholder={searchPlaceholder}
value={search ?? ''}
onChange={e => onSearchChange(e.target.value)}
aria-label={searchPlaceholder}
/>
</div>
)}
{(hasFilters || hasToggles || rightSlot) && (
<div className="filter-bar-group__row">
{hasFilters && (
<div className="filter-bar" role="tablist" aria-label="Filter">
{filters.map(f => (
<button
key={f.key}
role="tab"
aria-selected={activeFilter === f.key}
className={`filter-btn ${activeFilter === f.key ? 'active' : ''}`}
onClick={() => onFilterChange(f.key)}
>
{f.icon && <i className={`fas ${f.icon}`} style={{ marginRight: 4 }} />}
{f.label}
{typeof f.count === 'number' && (
<span className="filter-btn__count">{f.count}</span>
)}
</button>
))}
</div>
)}
{(hasToggles || rightSlot) && (
<div className="filter-bar-group__right">
{hasToggles && toggles.map(t => (
<label key={t.key} className="filter-bar-group__toggle">
<Toggle checked={t.checked} onChange={t.onChange} />
{t.icon && <i className={`fas ${t.icon}`} />}
{t.label}
</label>
))}
{rightSlot}
</div>
)}
</div>
)}
</div>
)
}
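On the consuming side, a caller assembles the `filters` array (including the optional `count` pills) from its own data before passing it in. A minimal sketch, using hypothetical backend objects and upgrade map shapes that mirror what the System page's Backends tab computes:

```javascript
// Build chip descriptors for FilterBar's `filters` prop. The backend objects
// and the `upgrades` map here are hypothetical stand-ins for real page data.
function buildBackendFilters(backends, upgrades) {
  const upgradable = backends.filter(b => upgrades[b.Name]).length
  return [
    { key: 'all', label: 'All', icon: 'fa-layer-group', count: backends.length },
    { key: 'user', label: 'User', icon: 'fa-download', count: backends.filter(b => !b.IsSystem).length },
    { key: 'system', label: 'System', icon: 'fa-shield-alt', count: backends.filter(b => b.IsSystem).length },
    // The Updates chip only appears when there is something to upgrade.
    ...(upgradable > 0 ? [{ key: 'upgradable', label: 'Updates', icon: 'fa-arrow-up', count: upgradable }] : []),
  ]
}

const filters = buildBackendFilters(
  [{ Name: 'llama-cpp' }, { Name: 'piper', IsSystem: true }],
  { 'llama-cpp': {} },
)
console.log(filters.map(f => `${f.key}:${f.count}`).join(' ')) // → all:2 user:1 system:1 upgradable:1
```

Conditional spread keeps chips that would always show a zero count out of the row entirely, which is the same choice the Backends tab makes for its Updates chip.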


@@ -0,0 +1,168 @@
import { useRef, useState } from 'react'
import Popover from './Popover'
// NodeDistributionChip shows where something is installed/loaded across a
// cluster. Used by both Manage → Backends (per-row Nodes column, data =
// gallery NodeBackendRef with version/digest) and by the Models tab (data =
// LoadedOn with state/status). Supports arbitrary cluster size — small
// clusters render node-name chips inline, larger clusters collapse to a
// summary chip and reveal the full per-node table in a popover on click.
//
// Field names are intentionally forgiving: both {node_name, node_status} and
// {NodeName, NodeStatus} are supported so the component works whether it's
// reading directly off the JSON or off a hydrated class.
//
// Props:
// nodes: array of node refs (see shape below).
// compactThreshold: max nodes to render inline before collapsing (default 3).
// context: 'backends' (default) shows version/digest; 'models'
// shows state.
// emptyLabel: what to render when nodes is empty (default "—").
export default function NodeDistributionChip({
nodes,
compactThreshold = 3,
context = 'backends',
emptyLabel = '—',
}) {
const triggerRef = useRef(null)
const [open, setOpen] = useState(false)
const list = Array.isArray(nodes) ? nodes : []
if (list.length === 0) {
return <span className="cell-muted">{emptyLabel}</span>
}
const getName = n => n.node_name ?? n.NodeName ?? ''
const getStatus = n => n.node_status ?? n.NodeStatus ?? ''
const getState = n => n.state ?? n.State ?? ''
const getVersion = n => n.version ?? n.Version ?? ''
const getDigest = n => n.digest ?? n.Digest ?? ''
// Inline mode: render every node as its own chip. Good for small clusters
// where seeing the names directly is more useful than a summary.
if (list.length <= compactThreshold) {
return (
<div className="badge-row">
{list.map(n => {
const status = getStatus(n)
const variant = status === 'healthy' ? 'badge-success'
: status === 'draining' ? 'badge-info'
: 'badge-warning'
const title = context === 'models'
? `${getName(n)} · ${getState(n)} (${status})`
: `${getName(n)} · ${status}${getVersion(n) ? ` · v${getVersion(n)}` : ''}`
return (
<span key={n.node_id ?? n.NodeID ?? getName(n)} className={`badge ${variant}`} title={title}>
<i className="fas fa-server" /> {getName(n)}
</span>
)
})}
</div>
)
}
// Summary mode for anything bigger. Count unhealthy/offline explicitly so
// the chip tells an operator at-a-glance whether to click in. "Drift" for
// backends = nodes whose (version, digest) tuple differs from the majority.
const total = list.length
const offline = list.filter(n => {
const s = getStatus(n)
return s !== 'healthy' && s !== 'draining'
}).length
const drift = context === 'backends' ? countDrift(list) : 0
const severity = offline > 0 || drift > 0 ? 'badge-warning' : 'badge-info'
return (
<>
<button
ref={triggerRef}
type="button"
className={`badge ${severity} chip-trigger`}
aria-expanded={open}
aria-haspopup="dialog"
onClick={e => { e.stopPropagation(); setOpen(v => !v) }}
>
<i className="fas fa-server" />
{' '}on {total} node{total === 1 ? '' : 's'}
{offline > 0 ? ` · ${offline} offline` : ''}
{drift > 0 ? ` · ${drift} drift` : ''}
</button>
<Popover
anchor={triggerRef}
open={open}
onClose={() => setOpen(false)}
ariaLabel={context === 'models' ? 'Model distribution' : 'Backend distribution'}
>
<div className="popover__header">
<strong>Installed on {total} node{total === 1 ? '' : 's'}</strong>
{offline > 0 && <span className="badge badge-warning">{offline} offline</span>}
{drift > 0 && <span className="badge badge-warning">{drift} drift</span>}
</div>
<div className="popover__scroll">
<table className="table popover__table">
<thead>
<tr>
<th>Node</th>
<th>Status</th>
{context === 'models' ? <th>State</th> : <>
<th>Version</th>
<th>Digest</th>
</>}
</tr>
</thead>
<tbody>
{list.map(n => (
<tr key={n.node_id ?? n.NodeID ?? getName(n)}>
<td className="cell-mono">{getName(n)}</td>
<td>
<span className={`badge ${getStatus(n) === 'healthy' ? 'badge-success' : 'badge-warning'}`}>
{getStatus(n)}
</span>
</td>
{context === 'models' ? (
<td className="cell-mono">{getState(n) || '—'}</td>
) : (
<>
<td className="cell-mono">{getVersion(n) ? `v${getVersion(n)}` : '—'}</td>
<td className="cell-mono cell-truncate" title={getDigest(n)}>
{getDigest(n) ? shortenDigest(getDigest(n)) : '—'}
</td>
</>
)}
</tr>
))}
</tbody>
</table>
</div>
</Popover>
</>
)
}
// countDrift counts nodes whose (version, digest) disagrees with the cluster
// majority. Mirrors the backend summarizeNodeDrift logic so the UI number
// matches what CheckUpgradesAgainst emits in UpgradeInfo.NodeDrift.
function countDrift(nodes) {
if (nodes.length <= 1) return 0
const counts = new Map()
for (const n of nodes) {
const key = `${n.version ?? n.Version ?? ''}|${n.digest ?? n.Digest ?? ''}`
counts.set(key, (counts.get(key) || 0) + 1)
}
if (counts.size === 1) return 0 // unanimous
let topKey = ''
let topCount = 0
for (const [k, v] of counts.entries()) {
if (v > topCount) { topKey = k; topCount = v }
}
return nodes.length - topCount
}
// shortenDigest trims a full OCI digest to the common 12-char form used in
// docker/oci tooling. Falls back to the raw value if it doesn't match.
function shortenDigest(digest) {
const m = /^(sha\d+:)?([a-f0-9]+)$/i.exec(digest)
if (!m) return digest
const hex = m[2]
return (m[1] ?? '') + hex.slice(0, 12)
}
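Both helpers above are pure functions, so their behavior is easy to pin down in isolation. A quick sketch on hypothetical node refs, re-stating the two functions verbatim so the snippet stands alone:

```javascript
// countDrift: nodes disagreeing with the majority (version, digest) tuple.
function countDrift(nodes) {
  if (nodes.length <= 1) return 0
  const counts = new Map()
  for (const n of nodes) {
    const key = `${n.version ?? n.Version ?? ''}|${n.digest ?? n.Digest ?? ''}`
    counts.set(key, (counts.get(key) || 0) + 1)
  }
  if (counts.size === 1) return 0 // unanimous
  let topCount = 0
  for (const v of counts.values()) if (v > topCount) topCount = v
  return nodes.length - topCount
}

// shortenDigest: trim to the 12-char form used by docker/oci tooling.
function shortenDigest(digest) {
  const m = /^(sha\d+:)?([a-f0-9]+)$/i.exec(digest)
  if (!m) return digest
  return (m[1] ?? '') + m[2].slice(0, 12)
}

// Two nodes on v1.2, one straggler on v1.1 → 1 node drifting.
console.log(countDrift([
  { version: '1.2', digest: 'sha256:aaa111' },
  { version: '1.2', digest: 'sha256:aaa111' },
  { version: '1.1', digest: 'sha256:bbb222' },
])) // → 1
console.log(shortenDigest('sha256:0123456789abcdef0123456789abcdef')) // → sha256:0123456789ab
```

Note the unanimous case returns 0 regardless of cluster size, so the chip only escalates to the warning variant when there is genuine disagreement.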


@@ -0,0 +1,86 @@
import { useEffect, useRef, useState, useCallback } from 'react'
// Minimal popover: positions itself below-right of the trigger's bounding box,
// flips above when there isn't room below, closes on outside click or Escape,
// returns focus to the trigger. Uses the existing .card surface so it picks
// up theme/border/shadow automatically — no new theming work.
//
// Props:
// anchor: ref to the trigger DOMElement (required)
// open: boolean
// onClose: () => void
// children: popover body
// ariaLabel: accessible label for the dialog
export default function Popover({ anchor, open, onClose, children, ariaLabel }) {
const popoverRef = useRef(null)
const [pos, setPos] = useState({ top: 0, left: 0, flipped: false })
// Compute position from the anchor's bounding box whenever we open or the
// viewport changes. The horizontal clamp reserves ~320px of width so the
// popover stays on-screen; bigger content grows naturally up to max-width.
const reposition = useCallback(() => {
if (!anchor?.current) return
const rect = anchor.current.getBoundingClientRect()
const popoverHeight = popoverRef.current?.offsetHeight ?? 0
const spaceBelow = window.innerHeight - rect.bottom
const flipped = popoverHeight > spaceBelow - 16 && rect.top > popoverHeight
const top = flipped ? rect.top - popoverHeight - 8 : rect.bottom + 8
// Prefer left-aligned; clamp so we don't go off-screen right.
const left = Math.min(rect.left, window.innerWidth - 320)
setPos({ top, left: Math.max(8, left), flipped })
}, [anchor])
useEffect(() => {
if (!open) return
reposition()
window.addEventListener('resize', reposition)
window.addEventListener('scroll', reposition, true)
return () => {
window.removeEventListener('resize', reposition)
window.removeEventListener('scroll', reposition, true)
}
}, [open, reposition])
// Close on outside click or Escape. Mousedown (not click) so the close
// happens before a parent handler could re-trigger us.
useEffect(() => {
if (!open) return
const onMouseDown = (e) => {
if (popoverRef.current && !popoverRef.current.contains(e.target) && !anchor?.current?.contains(e.target)) {
onClose()
}
}
const onKey = (e) => { if (e.key === 'Escape') onClose() }
document.addEventListener('mousedown', onMouseDown)
document.addEventListener('keydown', onKey)
return () => {
document.removeEventListener('mousedown', onMouseDown)
document.removeEventListener('keydown', onKey)
}
}, [open, onClose, anchor])
// Return focus to the trigger when the popover closes — keyboard users
// shouldn't have to tab back through the whole page to find their spot.
useEffect(() => {
if (!open && anchor?.current) {
// requestAnimationFrame so the close is painted before focus jumps;
// otherwise screen readers announce the trigger mid-transition.
const raf = requestAnimationFrame(() => anchor.current?.focus?.())
return () => cancelAnimationFrame(raf)
}
}, [open, anchor])
if (!open) return null
return (
<div
ref={popoverRef}
role="dialog"
aria-label={ariaLabel}
className="popover card"
style={{ top: pos.top, left: pos.left }}
>
{children}
</div>
)
}
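The flip decision in `reposition` is the only non-obvious math in the component. Distilled into a standalone helper (the function and argument names here are illustrative, not part of the component):

```javascript
// Mirrors the flip logic in reposition(): flip above the anchor only when
// the popover won't fit below (with a 16px margin) AND there is room above.
// Vertical gap between anchor and popover is 8px in either direction.
function placePopover(anchorRect, popoverHeight, viewportHeight) {
  const spaceBelow = viewportHeight - anchorRect.bottom
  const flipped = popoverHeight > spaceBelow - 16 && anchorRect.top > popoverHeight
  const top = flipped ? anchorRect.top - popoverHeight - 8 : anchorRect.bottom + 8
  return { flipped, top }
}

// Anchor near the bottom of an 800px viewport → popover flips above.
console.log(placePopover({ top: 700, bottom: 720 }, 200, 800)) // → { flipped: true, top: 492 }
// Anchor near the top → stays below with the 8px gap.
console.log(placePopover({ top: 20, bottom: 40 }, 200, 800)) // → { flipped: false, top: 48 }
```

When there is no room in either direction the helper keeps the popover below, matching the component: the `max-height` cap plus internal scroll handles the overflow rather than the positioner.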


@@ -3,6 +3,8 @@ import { useNavigate, useOutletContext, useSearchParams } from 'react-router-dom
import ResourceMonitor from '../components/ResourceMonitor'
import ConfirmDialog from '../components/ConfirmDialog'
import Toggle from '../components/Toggle'
import NodeDistributionChip from '../components/NodeDistributionChip'
import FilterBar from '../components/FilterBar'
import { useModels } from '../hooks/useModels'
import { backendControlApi, modelsApi, backendsApi, systemApi, nodesApi } from '../utils/api'
@@ -11,6 +13,22 @@ const TABS = [
{ key: 'backends', label: 'Backends', icon: 'fa-server' },
]
// formatInstalledAt renders an installed_at timestamp as a short relative/abs
// string suitable for dense tables. Returns the raw value if parsing fails so
// we never display "Invalid Date".
function formatInstalledAt(value) {
if (!value) return '—'
const d = new Date(value)
if (isNaN(d.getTime())) return value
const now = Date.now()
const diffMin = Math.floor((now - d.getTime()) / 60000)
if (diffMin < 1) return 'just now'
if (diffMin < 60) return `${diffMin}m ago`
if (diffMin < 60 * 24) return `${Math.floor(diffMin / 60)}h ago`
if (diffMin < 60 * 24 * 30) return `${Math.floor(diffMin / (60 * 24))}d ago`
return d.toISOString().slice(0, 10)
}
export default function Manage() {
const { addToast } = useOutletContext()
const navigate = useNavigate()
@@ -28,6 +46,24 @@ export default function Manage() {
const [distributedMode, setDistributedMode] = useState(false)
const [togglingModels, setTogglingModels] = useState(new Set())
const [pinningModels, setPinningModels] = useState(new Set())
// Filter state per tab. Persisted in the URL query so switching tabs
// doesn't lose the filter the operator just set.
const [modelsSearch, setModelsSearch] = useState(() => searchParams.get('mq') || '')
const [modelsFilter, setModelsFilter] = useState(() => searchParams.get('mf') || 'all')
const [backendsSearch, setBackendsSearch] = useState(() => searchParams.get('bq') || '')
const [backendsFilter, setBackendsFilter] = useState(() => searchParams.get('bf') || 'all')
// Sync filter state into the URL so deep-links + tab switches survive.
useEffect(() => {
const p = new URLSearchParams(searchParams)
const setOrDelete = (k, v) => { if (v && v !== 'all') p.set(k, v); else p.delete(k) }
setOrDelete('mq', modelsSearch)
setOrDelete('mf', modelsFilter)
setOrDelete('bq', backendsSearch)
setOrDelete('bf', backendsFilter)
setSearchParams(p, { replace: true })
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [modelsSearch, modelsFilter, backendsSearch, backendsFilter])
const handleTabChange = (tab) => {
setActiveTab(tab)
@@ -64,6 +100,35 @@ export default function Manage() {
nodesApi.list().then(() => setDistributedMode(true)).catch(() => {})
}, [fetchLoadedModels, fetchBackends])
// Auto-refresh the Models tab every 10s in distributed mode so ghost models
// (loaded on a worker but absent from this frontend's in-memory cache)
// clear on their own without the user clicking Update.
const [lastSyncedAt, setLastSyncedAt] = useState(() => Date.now())
const [nowTick, setNowTick] = useState(() => Date.now())
useEffect(() => {
if (!distributedMode || activeTab !== 'models') return
const interval = setInterval(() => {
refetchModels()
fetchLoadedModels()
setLastSyncedAt(Date.now())
}, 10000)
return () => clearInterval(interval)
}, [distributedMode, activeTab, refetchModels, fetchLoadedModels])
// Drive the "last synced Ns ago" label without over-rendering the table.
useEffect(() => {
if (!distributedMode) return
const interval = setInterval(() => setNowTick(Date.now()), 1000)
return () => clearInterval(interval)
}, [distributedMode])
const lastSyncedAgo = (() => {
const s = Math.max(0, Math.floor((nowTick - lastSyncedAt) / 1000))
if (s < 5) return 'just now'
if (s < 60) return `${s}s ago`
const m = Math.floor(s / 60)
return `${m}m ago`
})()
// Fetch available backend upgrades
useEffect(() => {
if (activeTab === 'backends') {
@@ -196,6 +261,29 @@ export default function Manage() {
}
}
const [upgradingAll, setUpgradingAll] = useState(false)
const [showOnlyUpgradable, setShowOnlyUpgradable] = useState(false)
const handleUpgradeAll = async () => {
const names = Object.keys(upgrades)
if (names.length === 0) return
setUpgradingAll(true)
try {
// Serial upgrade — matches the gallery's Upgrade All behavior.
// Each backend upgrade is itself a cluster-wide fan-out, so parallel
// calls would multiply load on every worker.
for (const name of names) {
try {
await backendsApi.upgrade(name)
} catch (err) {
addToast(`Upgrade failed for ${name}: ${err.message}`, 'error')
}
}
addToast(`Upgrade started for ${names.length} backend${names.length === 1 ? '' : 's'}`, 'info')
} finally {
setUpgradingAll(false)
}
}
const handleDeleteBackend = (name) => {
setConfirmDialog({
title: 'Delete Backend',
@@ -227,29 +315,74 @@ export default function Manage() {
{/* Tabs */}
<div className="tabs" style={{ marginTop: 'var(--spacing-lg)', marginBottom: 'var(--spacing-md)' }}>
{TABS.map(t => (
<button
key={t.key}
className={`tab ${activeTab === t.key ? 'tab-active' : ''}`}
onClick={() => handleTabChange(t.key)}
>
<i className={`fas ${t.icon}`} style={{ marginRight: 6 }} />
{t.label}
{t.key === 'models' && !modelsLoading && ` (${models.length})`}
{t.key === 'backends' && !backendsLoading && ` (${backends.length})`}
</button>
))}
{TABS.map(t => {
const upgradeCount = t.key === 'backends' ? Object.keys(upgrades).length : 0
return (
<button
key={t.key}
className={`tab ${activeTab === t.key ? 'tab-active' : ''}`}
onClick={() => handleTabChange(t.key)}
>
<i className={`fas ${t.icon}`} style={{ marginRight: 6 }} />
{t.label}
{t.key === 'models' && !modelsLoading && ` (${models.length})`}
{t.key === 'backends' && !backendsLoading && ` (${backends.length})`}
{upgradeCount > 0 && (
<span className="tab-pill tab-pill--warning" title={`${upgradeCount} update${upgradeCount === 1 ? '' : 's'} available`}>
<i className="fas fa-arrow-up" /> {upgradeCount}
</span>
)}
</button>
)
})}
</div>
{/* Models Tab */}
{activeTab === 'models' && (() => {
// Computed filters — done here so the result is available both to
// the FilterBar counts and to the table body.
const MODEL_FILTERS = [
{ key: 'all', label: 'All', icon: 'fa-layer-group' },
{ key: 'running', label: 'Running', icon: 'fa-circle-play' },
{ key: 'idle', label: 'Idle', icon: 'fa-pause' },
{ key: 'disabled', label: 'Disabled', icon: 'fa-ban' },
{ key: 'pinned', label: 'Pinned', icon: 'fa-thumbtack' },
...(distributedMode ? [{ key: 'distributed', label: 'Distributed', icon: 'fa-server' }] : []),
]
const passesFilter = (m) => {
if (modelsFilter === 'running') return !m.disabled && (loadedModelIds.has(m.id) || (m.loaded_on && m.loaded_on.length > 0))
if (modelsFilter === 'idle') return !m.disabled && !loadedModelIds.has(m.id) && !(m.loaded_on && m.loaded_on.length > 0)
if (modelsFilter === 'disabled') return !!m.disabled
if (modelsFilter === 'pinned') return !!m.pinned
if (modelsFilter === 'distributed') return Array.isArray(m.loaded_on) && m.loaded_on.length > 0
return true
}
const q = modelsSearch.trim().toLowerCase()
const passesSearch = (m) => !q || (m.id || '').toLowerCase().includes(q) || (m.backend || '').toLowerCase().includes(q)
const visibleModels = models.filter(m => passesFilter(m) && passesSearch(m))
return (
<div>
<FilterBar
search={modelsSearch}
onSearchChange={setModelsSearch}
searchPlaceholder="Search models by name or backend..."
filters={MODEL_FILTERS}
activeFilter={modelsFilter}
onFilterChange={setModelsFilter}
rightSlot={(
<>
{distributedMode && (
<span className="cell-muted" title="Auto-refreshes every 10s in distributed mode so ghost models clear promptly">
<i className="fas fa-rotate" /> Last synced {lastSyncedAgo}
</span>
)}
<button className="btn btn-secondary btn-sm" onClick={handleReload} disabled={reloading}>
<i className={`fas ${reloading ? 'fa-spinner fa-spin' : 'fa-rotate'}`} />
{reloading ? ' Updating...' : ' Update'}
</button>
</>
)}
/>
{modelsLoading ? (
<div className="card" style={{ padding: 'var(--spacing-xl)', textAlign: 'center', color: 'var(--color-text-muted)' }}>
@@ -274,6 +407,12 @@ export default function Manage() {
</a>
</div>
</div>
) : visibleModels.length === 0 ? (
<div className="empty-state">
<i className="fas fa-filter" />
<p>No models match the current filter.</p>
<button className="btn btn-ghost btn-sm" onClick={() => { setModelsSearch(''); setModelsFilter('all') }}>Clear filters</button>
</div>
) : (
<div className="table-container">
<table className="table">
@@ -288,7 +427,7 @@ export default function Manage() {
</tr>
</thead>
<tbody>
{visibleModels.map(model => (
<tr key={model.id} style={{ opacity: model.disabled ? 0.55 : 1, transition: 'opacity 0.2s' }}>
{/* Enable/Disable toggle */}
<td>
@@ -329,21 +468,33 @@ export default function Manage() {
</div>
</div>
</td>
{/* Status / Distribution */}
<td>
<div className="cell-stack">
{model.disabled ? (
<span className="badge" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
<i className="fas fa-ban" /> Disabled
</span>
) : model.loaded_on && model.loaded_on.length > 0 ? (
// Distributed mode: surface where the model is
// actually loaded. Shared chip scales to any cluster
// size (inline for <=3, popover for larger).
<NodeDistributionChip nodes={model.loaded_on} context="models" />
) : loadedModelIds.has(model.id) ? (
<span className="badge badge-success">
<i className="fas fa-circle" style={{ fontSize: '6px' }} /> Running
</span>
) : (
<span className="badge" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
<i className="fas fa-circle" style={{ fontSize: '6px' }} /> Idle
</span>
)}
{model.source === 'registry-only' && (
<span className="badge badge-warning" title="Discovered on a worker but not configured locally. Persist the config to make it permanent.">
<i className="fas fa-ghost" /> Adopted
</span>
)}
</div>
</td>
{/* Backend */}
<td>
@@ -394,11 +545,34 @@ export default function Manage() {
</div>
)}
</div>
)
})()}
{/* Backends Tab */}
{activeTab === 'backends' && (
<div>
{/* Upgrade banner — mirrors the gallery so operators can't miss updates */}
{!backendsLoading && Object.keys(upgrades).length > 0 && (
<div className="upgrade-banner">
<div className="upgrade-banner__text">
<i className="fas fa-arrow-up" />
<span>
{Object.keys(upgrades).length} backend{Object.keys(upgrades).length === 1 ? ' has' : 's have'} updates available
</span>
</div>
<div className="upgrade-banner__actions">
<button
className="btn btn-primary btn-sm"
onClick={handleUpgradeAll}
disabled={upgradingAll}
>
<i className={`fas ${upgradingAll ? 'fa-spinner fa-spin' : 'fa-arrow-up'}`} />
{upgradingAll ? ' Upgrading...' : ' Upgrade all'}
</button>
</div>
</div>
)}
{backendsLoading ? (
<div style={{ textAlign: 'center', padding: 'var(--spacing-md)', color: 'var(--color-text-muted)', fontSize: '0.875rem' }}>
Loading backends...
@@ -419,109 +593,217 @@ export default function Manage() {
</a>
</div>
</div>
) : (() => {
// Count chip badges: show N in the filter buttons so operators can
// see at a glance how their chips bucket the list.
const upgradableCount = backends.filter(b => upgrades[b.Name]).length
const userCount = backends.filter(b => !b.IsSystem).length
const systemCount = backends.filter(b => b.IsSystem).length
const metaCount = backends.filter(b => b.IsMeta).length
const offlineCount = backends.filter(b => {
const n = b.Nodes || b.nodes || []
return n.some(x => {
const s = x.node_status || x.NodeStatus
return s && s !== 'healthy' && s !== 'draining'
})
}).length
const BACKEND_FILTERS = [
{ key: 'all', label: 'All', icon: 'fa-layer-group', count: backends.length },
{ key: 'user', label: 'User', icon: 'fa-download', count: userCount },
{ key: 'system', label: 'System', icon: 'fa-shield-alt', count: systemCount },
{ key: 'meta', label: 'Meta', icon: 'fa-layer-group', count: metaCount },
...(upgradableCount > 0 ? [{ key: 'upgradable', label: 'Updates', icon: 'fa-arrow-up', count: upgradableCount }] : []),
...(distributedMode && offlineCount > 0 ? [{ key: 'offline', label: 'Offline nodes', icon: 'fa-exclamation-circle', count: offlineCount }] : []),
]
const q = backendsSearch.trim().toLowerCase()
const passesSearch = (b) => !q
|| (b.Name || '').toLowerCase().includes(q)
|| (b.Metadata?.alias || '').toLowerCase().includes(q)
|| (b.Metadata?.meta_backend_for || '').toLowerCase().includes(q)
const passesFilter = (b) => {
switch (backendsFilter) {
case 'user': return !b.IsSystem
case 'system': return !!b.IsSystem
case 'meta': return !!b.IsMeta
case 'upgradable': return !!upgrades[b.Name]
case 'offline': {
const n = b.Nodes || b.nodes || []
return n.some(x => {
const s = x.node_status || x.NodeStatus
return s && s !== 'healthy' && s !== 'draining'
})
}
default: return true
}
}
// Legacy "showOnlyUpgradable" toggle is now the 'upgradable' chip —
// keep backward-compat by mapping it onto the new filter.
if (showOnlyUpgradable && backendsFilter !== 'upgradable') {
// One-shot reconciliation — the old state becomes the new chip.
setBackendsFilter('upgradable')
setShowOnlyUpgradable(false)
}
const visibleBackends = backends.filter(b => passesFilter(b) && passesSearch(b))
if (visibleBackends.length === 0) {
return (
<>
<FilterBar
search={backendsSearch}
onSearchChange={setBackendsSearch}
searchPlaceholder="Search backends by name or alias..."
filters={BACKEND_FILTERS}
activeFilter={backendsFilter}
onFilterChange={setBackendsFilter}
/>
<div className="empty-state">
<i className="fas fa-filter" />
<p>No backends match the current filter.</p>
<button className="btn btn-ghost btn-sm" onClick={() => { setBackendsSearch(''); setBackendsFilter('all') }}>Clear filters</button>
</div>
</>
)
}
return (
<>
<FilterBar
search={backendsSearch}
onSearchChange={setBackendsSearch}
searchPlaceholder="Search backends by name or alias..."
filters={BACKEND_FILTERS}
activeFilter={backendsFilter}
onFilterChange={setBackendsFilter}
/>
<div className="table-container">
<table className="table">
<thead>
<tr>
<th>Name</th>
<th>Type</th>
<th>Metadata</th>
<th>Version</th>
{distributedMode && <th>Nodes</th>}
<th>Installed</th>
<th style={{ textAlign: 'right' }}>Actions</th>
</tr>
</thead>
<tbody>
{backends.map((backend, i) => (
{visibleBackends.map((backend, i) => {
const upgradeInfo = upgrades[backend.Name]
const hasDrift = upgradeInfo?.node_drift?.length > 0
const nodes = backend.Nodes || backend.nodes || []
return (
<tr key={backend.Name || i}>
<td>
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)' }}>
<i className="fas fa-cog" style={{ color: 'var(--color-accent)', fontSize: '0.75rem' }} />
<span style={{ fontWeight: 500 }}>{backend.Name}</span>
<div className="cell-name">
<i className="fas fa-cog" />
<span>{backend.Name}</span>
{backend.Metadata?.alias && (
<span className="cell-subtle">alias: {backend.Metadata.alias}</span>
)}
{backend.Metadata?.meta_backend_for && (
<span className="cell-subtle">for: {backend.Metadata.meta_backend_for}</span>
)}
</div>
</td>
<td>
<div style={{ display: 'flex', gap: '4px', flexWrap: 'wrap' }}>
<div className="badge-row">
{backend.IsSystem ? (
<span className="badge badge-info" style={{ fontSize: '0.625rem' }}>
<i className="fas fa-shield-alt" style={{ fontSize: '0.5rem', marginRight: 2 }} />System
<span className="badge badge-info">
<i className="fas fa-shield-alt" /> System
</span>
) : (
<span className="badge badge-success" style={{ fontSize: '0.625rem' }}>
<i className="fas fa-download" style={{ fontSize: '0.5rem', marginRight: 2 }} />User
<span className="badge badge-success">
<i className="fas fa-download" /> User
</span>
)}
{backend.IsMeta && (
<span className="badge" style={{ background: 'var(--color-accent-light)', color: 'var(--color-accent)', fontSize: '0.625rem' }}>
<i className="fas fa-layer-group" style={{ fontSize: '0.5rem', marginRight: 2 }} />Meta
<span className="badge badge-accent">
<i className="fas fa-layer-group" /> Meta
</span>
)}
</div>
</td>
<td>
<div style={{ display: 'flex', flexDirection: 'column', gap: 2, fontSize: '0.75rem', color: 'var(--color-text-secondary)' }}>
{backend.Metadata?.alias && (
<span>
<i className="fas fa-tag" style={{ fontSize: '0.5rem', marginRight: 4 }} />
Alias: <span style={{ color: 'var(--color-text-primary)' }}>{backend.Metadata.alias}</span>
<div className="cell-stack">
{backend.Metadata?.version ? (
<span className="cell-mono">v{backend.Metadata.version}</span>
) : (
<span className="cell-muted">—</span>
)}
{upgradeInfo && (
<span className="badge badge-warning" title={upgradeInfo.available_version ? `Upgrade to v${upgradeInfo.available_version}` : 'Update available'}>
<i className="fas fa-arrow-up" />
{upgradeInfo.available_version ? ` v${upgradeInfo.available_version}` : ' Update available'}
</span>
)}
{backend.Metadata?.meta_backend_for && (
<span>
<i className="fas fa-link" style={{ fontSize: '0.5rem', marginRight: 4 }} />
For: <span style={{ color: 'var(--color-accent)' }}>{backend.Metadata.meta_backend_for}</span>
{hasDrift && (
<span
className="badge badge-warning"
title={`Drift: ${upgradeInfo.node_drift.map(d => `${d.node_name}${d.version ? ' v' + d.version : ''}`).join(', ')}`}
>
<i className="fas fa-code-branch" />
{' '}Drift: {upgradeInfo.node_drift.length} node{upgradeInfo.node_drift.length === 1 ? '' : 's'}
</span>
)}
{backend.Metadata?.version && (
<span>
<i className="fas fa-code-branch" style={{ fontSize: '0.5rem', marginRight: 4 }} />
Version: <span style={{ color: 'var(--color-text-primary)' }}>v{backend.Metadata.version}</span>
{upgrades[backend.Name] && (
<span style={{ color: '#856404', marginLeft: 4 }}>
v{upgrades[backend.Name].available_version}
</span>
)}
</span>
)}
{backend.Metadata?.installed_at && (
<span>
<i className="fas fa-calendar" style={{ fontSize: '0.5rem', marginRight: 4 }} />
{backend.Metadata.installed_at}
</span>
)}
{!backend.Metadata?.alias && !backend.Metadata?.meta_backend_for && !backend.Metadata?.installed_at && '—'}
</div>
</td>
{distributedMode && (
<td>
<NodeDistributionChip nodes={nodes} context="backends" />
</td>
)}
<td>
<div style={{ display: 'flex', gap: 'var(--spacing-xs)', justifyContent: 'flex-end' }}>
{!backend.IsSystem ? (
<span className="cell-muted cell-mono">
{backend.Metadata?.installed_at ? formatInstalledAt(backend.Metadata.installed_at) : '—'}
</span>
</td>
<td>
<div className="row-actions">
{backend.IsSystem ? (
<span className="badge" title="System backends are managed outside the gallery">
<i className="fas fa-lock" /> Protected
</span>
) : (
<>
{upgradeInfo ? (
<button
className="btn btn-primary btn-sm"
onClick={() => handleUpgradeBackend(backend.Name)}
disabled={reinstallingBackends.has(backend.Name)}
>
<i className={`fas ${reinstallingBackends.has(backend.Name) ? 'fa-spinner fa-spin' : 'fa-arrow-up'}`} />
{' '}Upgrade{upgradeInfo.available_version ? ` to v${upgradeInfo.available_version}` : ''}
</button>
) : (
<button
className="btn btn-secondary btn-sm"
onClick={() => handleReinstallBackend(backend.Name)}
disabled={reinstallingBackends.has(backend.Name)}
>
<i className={`fas ${reinstallingBackends.has(backend.Name) ? 'fa-spinner fa-spin' : 'fa-rotate'}`} />
{' '}Reinstall
</button>
)}
<button
className={`btn ${upgrades[backend.Name] ? 'btn-primary' : 'btn-secondary'} btn-sm`}
onClick={() => upgrades[backend.Name] ? handleUpgradeBackend(backend.Name) : handleReinstallBackend(backend.Name)}
disabled={reinstallingBackends.has(backend.Name)}
title={upgrades[backend.Name] ? `Upgrade to v${upgrades[backend.Name]?.available_version || 'latest'}` : 'Reinstall'}
>
<i className={`fas ${reinstallingBackends.has(backend.Name) ? 'fa-spinner fa-spin' : upgrades[backend.Name] ? 'fa-arrow-up' : 'fa-rotate'}`} />
</button>
<button
className="btn btn-danger btn-sm"
className="btn btn-danger-ghost btn-sm"
onClick={() => handleDeleteBackend(backend.Name)}
title="Delete"
title="Delete backend (removes from all nodes)"
>
<i className="fas fa-trash" />
</button>
</>
) : (
<span style={{ fontSize: '0.75rem', color: 'var(--color-text-muted)' }}></span>
)}
</div>
</td>
</tr>
))}
)
})}
</tbody>
</table>
</div>
)}
</div>
</>
)
})()}
</div>
)}

View File
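The drift badge rendered above enumerates `UpgradeInfo.node_drift`; the policy from the commit message, flag the backend upgradeable whenever per-node (version, digest) tuples disagree regardless of which node matches the gallery, can be sketched roughly as follows. `NodeRef` and `DetectDrift` are illustrative names, not the PR's actual identifiers (the real type is closer to `gallery.NodeBackendRef`):

```go
package main

// NodeRef mirrors the per-node attribution the frontend aggregates
// (hypothetical shape for this sketch).
type NodeRef struct {
	NodeName string
	Version  string
	Digest   string
}

// DetectDrift returns the nodes whose (version, digest) tuple differs
// from the first node seen. Any disagreement flags the backend as
// upgradeable, regardless of which node matches the gallery; the next
// upgrade-all realigns the cluster.
func DetectDrift(nodes []NodeRef) (drifted []NodeRef, upgradeable bool) {
	if len(nodes) < 2 {
		return nil, false
	}
	ref := nodes[0]
	for _, n := range nodes[1:] {
		if n.Version != ref.Version || n.Digest != ref.Digest {
			drifted = append(drifted, n)
		}
	}
	return drifted, len(drifted) > 0
}
```

Picking the first node as the reference is arbitrary but sufficient: the point is only to detect disagreement and name the outliers, not to decide which node is "right".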

@@ -51,15 +51,22 @@ const modelStateConfig = {
idle: { bg: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)', border: 'var(--color-border-subtle)' },
}
function StatCard({ icon, label, value, color }) {
function StatCard({ icon, label, value, color, accentVar }) {
// accentVar: optional CSS variable for the left edge + icon chip, e.g.
// "--color-success". When unset the card reads neutral — used for simple
// counts so they don't compete with the semantic cards for attention.
const accent = color || (accentVar ? `var(${accentVar})` : 'var(--color-text-primary)')
return (
<div className="card" style={{ padding: 'var(--spacing-sm) var(--spacing-md)', flex: '1 1 0', minWidth: 120 }}>
<div style={{ display: 'flex', alignItems: 'center', gap: 6, marginBottom: 2 }}>
<i className={icon} style={{ color: 'var(--color-text-muted)', fontSize: '0.75rem' }} />
<span style={{ fontSize: '0.6875rem', color: 'var(--color-text-muted)', fontWeight: 500, textTransform: 'uppercase', letterSpacing: '0.03em' }}>{label}</span>
<div
className="stat-card"
style={accentVar ? { ['--stat-accent']: `var(${accentVar})` } : undefined}
>
<div className="stat-card__body">
<div className="stat-card__label">{label}</div>
<div className="stat-card__value" style={{ color: accent }}>{value}</div>
</div>
<div style={{ fontSize: '1.375rem', fontWeight: 700, fontFamily: 'JetBrains Mono, monospace', color: color || 'var(--color-text-primary)' }}>
{value}
<div className="stat-card__icon" style={accentVar ? { color: accent } : undefined}>
<i className={icon} />
</div>
</div>
)
@@ -543,45 +550,24 @@ export default function Nodes() {
</div>
{/* Tabs */}
<div style={{ display: 'flex', gap: 'var(--spacing-xs)', marginBottom: 'var(--spacing-lg)', borderBottom: '2px solid var(--color-border)' }}>
<div className="tabs" style={{ marginBottom: 'var(--spacing-lg)' }}>
<button
onClick={() => setActiveTab('backend')}
style={{
padding: 'var(--spacing-sm) var(--spacing-lg)',
border: 'none', cursor: 'pointer', fontWeight: 600, fontSize: '0.875rem',
background: 'none',
color: activeTab === 'backend' ? 'var(--color-primary)' : 'var(--color-text-muted)',
borderBottom: activeTab === 'backend' ? '2px solid var(--color-primary)' : '2px solid transparent',
marginBottom: '-2px',
}}
className={`tab ${activeTab === 'backend' ? 'tab-active' : ''}`}
>
<i className="fas fa-server" style={{ marginRight: 6 }} />
Backend Workers ({backendNodes.length})
</button>
<button
onClick={() => setActiveTab('agent')}
style={{
padding: 'var(--spacing-sm) var(--spacing-lg)',
border: 'none', cursor: 'pointer', fontWeight: 600, fontSize: '0.875rem',
background: 'none',
color: activeTab === 'agent' ? 'var(--color-primary)' : 'var(--color-text-muted)',
borderBottom: activeTab === 'agent' ? '2px solid var(--color-primary)' : '2px solid transparent',
marginBottom: '-2px',
}}
className={`tab ${activeTab === 'agent' ? 'tab-active' : ''}`}
>
<i className="fas fa-robot" style={{ marginRight: 6 }} />
Agent Workers ({agentNodes.length})
</button>
<button
onClick={() => setActiveTab('scheduling')}
style={{
padding: 'var(--spacing-sm) var(--spacing-lg)',
border: 'none', cursor: 'pointer', fontWeight: 600, fontSize: '0.875rem',
background: 'none',
color: activeTab === 'scheduling' ? 'var(--color-primary)' : 'var(--color-text-muted)',
borderBottom: activeTab === 'scheduling' ? '2px solid var(--color-primary)' : '2px solid transparent',
marginBottom: '-2px',
}}
className={`tab ${activeTab === 'scheduling' ? 'tab-active' : ''}`}
>
<i className="fas fa-calendar-alt" style={{ marginRight: 6 }} />
Scheduling ({schedulingConfigs.length})
@@ -590,13 +576,17 @@ export default function Nodes() {
{activeTab !== 'scheduling' && <>
{/* Stat cards */}
<div style={{ display: 'flex', gap: 'var(--spacing-md)', marginBottom: 'var(--spacing-xl)', flexWrap: 'wrap' }}>
<StatCard icon={activeTab === 'agent' ? 'fas fa-robot' : 'fas fa-server'} label={`Total ${activeTab === 'agent' ? 'Agent' : 'Backend'} Workers`} value={total} />
<StatCard icon="fas fa-check-circle" label="Healthy" value={healthy} color="var(--color-success)" />
<StatCard icon="fas fa-exclamation-circle" label="Unhealthy" value={unhealthy} color={unhealthy > 0 ? 'var(--color-error)' : undefined} />
<StatCard icon="fas fa-hourglass-half" label="Draining" value={draining} color={draining > 0 ? 'var(--color-warning)' : undefined} />
<div className="stat-grid">
<StatCard icon={activeTab === 'agent' ? 'fas fa-robot' : 'fas fa-server'}
label={`Total ${activeTab === 'agent' ? 'Agent' : 'Backend'} Workers`} value={total} />
<StatCard icon="fas fa-check-circle" label="Healthy" value={healthy}
accentVar={healthy > 0 ? '--color-success' : undefined} />
<StatCard icon="fas fa-exclamation-circle" label="Unhealthy" value={unhealthy}
accentVar={unhealthy > 0 ? '--color-error' : undefined} />
<StatCard icon="fas fa-hourglass-half" label="Draining" value={draining}
accentVar={draining > 0 ? '--color-warning' : undefined} />
{pending > 0 && (
<StatCard icon="fas fa-clock" label="Pending" value={pending} color="var(--color-warning)" />
<StatCard icon="fas fa-clock" label="Pending" value={pending} accentVar="--color-warning" />
)}
{activeTab === 'backend' && (() => {
const clusterTotalVRAM = backendNodes.reduce((sum, n) => sum + (n.total_vram || 0), 0)
@@ -614,7 +604,7 @@ export default function Nodes() {
)}
<StatCard icon="fas fa-cube" label="Models Loaded" value={totalModelsLoaded} />
<StatCard icon="fas fa-exchange-alt" label="In-Flight Requests" value={totalInFlight}
color={totalInFlight > 0 ? 'var(--color-primary)' : undefined} />
accentVar={totalInFlight > 0 ? '--color-primary' : undefined} />
</>
)
})()}
@@ -627,15 +617,11 @@ export default function Nodes() {
<>
<button
onClick={() => setShowTips(t => !t)}
style={{
background: 'none', border: 'none', cursor: 'pointer',
color: 'var(--color-primary)', fontSize: '0.8125rem', fontWeight: 500,
display: 'flex', alignItems: 'center', gap: 6,
padding: 0, marginBottom: 'var(--spacing-md)',
}}
className="nodes-add-worker"
aria-expanded={showTips}
>
<i className={`fas fa-chevron-${showTips ? 'down' : 'right'}`} style={{ fontSize: '0.625rem' }} />
Add more workers
<i className={`fas ${showTips ? 'fa-chevron-down' : 'fa-plus'}`} />
{showTips ? 'Hide instructions' : 'Register a new worker'}
</button>
{showTips && <WorkerHintCard addToast={addToast} activeTab={activeTab} hasWorkers />}
</>
@@ -685,23 +671,28 @@ export default function Nodes() {
>
<td>
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)' }}>
<i className="fas fa-server" style={{ color: 'var(--color-text-muted)', fontSize: '0.875rem' }} />
{canExpand && (
<span className={`row-chevron${isExpanded ? ' is-expanded' : ''}`} aria-hidden="true">
<i className="fas fa-chevron-right" />
</span>
)}
<i className="fas fa-server" style={{ color: 'var(--color-text-muted)', fontSize: 'var(--text-sm)' }} />
<div>
<div style={{ fontWeight: 600, fontSize: '0.875rem' }}>{node.name}</div>
<div style={{ fontSize: '0.75rem', fontFamily: "'JetBrains Mono', monospace", color: 'var(--color-text-muted)' }}>
<div style={{ fontWeight: 600, fontSize: 'var(--text-sm)' }}>{node.name}</div>
<div className="cell-mono cell-muted">
{node.address}
</div>
{node.labels && Object.keys(node.labels).length > 0 && (
<div style={{ display: 'flex', flexWrap: 'wrap', gap: 3, marginTop: 3 }}>
{Object.entries(node.labels).slice(0, 5).map(([k, v]) => (
<span key={k} style={{
fontSize: '0.625rem', padding: '1px 5px', borderRadius: 3,
background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)',
fontFamily: "'JetBrains Mono', monospace", border: '1px solid var(--color-border-subtle)',
<span key={k} className="cell-mono" style={{
padding: '1px 5px', borderRadius: 3,
background: 'var(--color-bg-tertiary)',
border: '1px solid var(--color-border-subtle)',
}}>{k}={v}</span>
))}
{Object.keys(node.labels).length > 5 && (
<span style={{ fontSize: '0.625rem', color: 'var(--color-text-muted)' }}>
<span className="cell-muted">
+{Object.keys(node.labels).length - 5} more
</span>
)}
@@ -711,12 +702,10 @@ export default function Nodes() {
</div>
</td>
<td>
<div style={{ display: 'flex', alignItems: 'center', gap: 6 }}>
<i className="fas fa-circle" style={{ fontSize: '0.5rem', color: status.color }} />
<span style={{ fontSize: '0.8125rem', color: status.color, fontWeight: 500 }}>
{status.label}
</span>
</div>
<span className="node-status" style={{ color: status.color }}>
<span className="node-status__dot" style={{ background: status.color }} />
{status.label}
</span>
</td>
<td>
{hasGPU && totalVRAMStr ? (
@@ -745,38 +734,37 @@ export default function Nodes() {
</span>
</td>
<td style={{ textAlign: 'right' }}>
<div style={{ display: 'flex', gap: 'var(--spacing-xs)', justifyContent: 'flex-end' }} onClick={e => e.stopPropagation()}>
<div className="row-actions" onClick={e => e.stopPropagation()}>
{node.status === 'pending' && (
<button
className="btn btn-primary btn-sm"
onClick={() => handleApprove(node.id)}
title="Approve node"
>
<i className="fas fa-check" />
<i className="fas fa-check" /> Approve
</button>
)}
{node.status === 'draining' && (
<button
className="btn btn-secondary btn-sm"
onClick={() => handleResume(node.id)}
title="Resume node"
title="Resume accepting requests"
>
<i className="fas fa-play" />
<i className="fas fa-play" /> Resume
</button>
)}
{node.status !== 'draining' && node.status !== 'pending' && (
<button
className="btn btn-secondary btn-sm"
onClick={() => handleDrain(node.id)}
title="Drain node"
title="Stop sending new requests to this node"
>
<i className="fas fa-pause" />
<i className="fas fa-pause" /> Drain
</button>
)}
<button
className="btn btn-danger btn-sm"
className="btn btn-danger-ghost btn-sm"
onClick={() => setConfirmDelete(node)}
title="Remove node"
title="Remove node from cluster"
>
<i className="fas fa-trash" />
</button>
@@ -794,7 +782,10 @@ export default function Nodes() {
{!models ? (
<LoadingSpinner size="sm" />
) : models.length === 0 ? (
<p style={{ fontSize: '0.8125rem', color: 'var(--color-text-muted)' }}>No models loaded on this node</p>
<div className="drawer-empty">
<i className="fas fa-cube" />
<span>No models loaded on this node yet.</span>
</div>
) : (
<table className="table" style={{ margin: 0 }}>
<thead>
@@ -870,7 +861,10 @@ export default function Nodes() {
{!backends ? (
<LoadingSpinner size="sm" />
) : backends.length === 0 ? (
<p style={{ fontSize: '0.8125rem', color: 'var(--color-text-muted)' }}>No backends installed on this node</p>
<div className="drawer-empty">
<i className="fas fa-cogs" />
<span>No backends installed on this node. Install one from the gallery to schedule models here.</span>
</div>
) : (
<table className="table" style={{ margin: 0 }}>
<thead>

View File

@@ -510,28 +510,89 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
modelConfigs := cl.GetAllModelsConfigs()
modelsWithoutConfig, _ := galleryop.ListModels(cl, ml, config.NoFilterFn, galleryop.LOOSE_ONLY)
type loadedOn struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
State string `json:"state"`
NodeStatus string `json:"node_status"`
}
type modelCapability struct {
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
// LoadedOn is populated only when the node registry is active
// (distributed mode). Lets the UI show "loaded on worker-1" without
// the operator having to expand every node manually. An empty slice
// reports "no loaded replicas"; nil reports "not in cluster mode" — the
// frontend treats both as "no distribution info".
LoadedOn []loadedOn `json:"loaded_on,omitempty"`
// Source="registry-only" marks models adopted from the cluster that
// have no local config yet (ghosts that the reconciler discovered).
Source string `json:"source,omitempty"`
}
// Join with the node registry when we have one (distributed mode). A
// single registry fetch + map join beats per-model queries for the
// 100-model case.
var loadedByModel map[string][]loadedOn
if ds := applicationInstance.Distributed(); ds != nil && ds.Registry != nil {
nodeModels, err := ds.Registry.ListAllLoadedModels(c.Request().Context())
if err == nil {
allNodes, _ := ds.Registry.List(c.Request().Context())
nameByID := make(map[string]string, len(allNodes))
statusByID := make(map[string]string, len(allNodes))
for _, n := range allNodes {
nameByID[n.ID] = n.Name
statusByID[n.ID] = n.Status
}
loadedByModel = make(map[string][]loadedOn)
for _, nm := range nodeModels {
loadedByModel[nm.ModelName] = append(loadedByModel[nm.ModelName], loadedOn{
NodeID: nm.NodeID,
NodeName: nameByID[nm.NodeID],
State: nm.State,
NodeStatus: statusByID[nm.NodeID],
})
}
}
}
result := make([]modelCapability, 0, len(modelConfigs)+len(modelsWithoutConfig))
seen := make(map[string]bool, len(modelConfigs)+len(modelsWithoutConfig))
for _, cfg := range modelConfigs {
seen[cfg.Name] = true
result = append(result, modelCapability{
ID: cfg.Name,
Capabilities: cfg.KnownUsecaseStrings,
Backend: cfg.Backend,
Disabled: cfg.IsDisabled(),
Pinned: cfg.IsPinned(),
LoadedOn: loadedByModel[cfg.Name],
})
}
for _, name := range modelsWithoutConfig {
seen[name] = true
result = append(result, modelCapability{
ID: name,
Capabilities: []string{},
LoadedOn: loadedByModel[name],
})
}
// Emit entries for cluster models that have no local config — these
// are the actual ghosts. Without this the operator would have no way
// to see a model the cluster is running if its config file wasn't
// synced to this frontend's filesystem.
for name, loc := range loadedByModel {
if seen[name] {
continue
}
result = append(result, modelCapability{
ID: name,
Capabilities: []string{},
LoadedOn: loc,
Source: "registry-only",
})
}

View File
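The handler above does one registry fetch and a map join instead of a registry query per model. The shape of that join, reduced to a standalone sketch (type and function names here are illustrative, not the handler's own):

```go
package main

// LoadedOn mirrors the per-node placement record the handler emits.
type LoadedOn struct {
	NodeID, NodeName, State string
}

// nodeModel stands in for a row from Registry.ListAllLoadedModels.
type nodeModel struct {
	NodeID, ModelName, State string
}

// joinLoadedModels buckets registry rows by model name, resolving node
// IDs to names via one pre-built map, so the cost is O(nodes + rows)
// rather than a registry round-trip per model.
func joinLoadedModels(rows []nodeModel, nameByID map[string]string) map[string][]LoadedOn {
	out := make(map[string][]LoadedOn, len(rows))
	for _, r := range rows {
		out[r.ModelName] = append(out[r.ModelName], LoadedOn{
			NodeID:   r.NodeID,
			NodeName: nameByID[r.NodeID],
			State:    r.State,
		})
	}
	return out
}
```

Models present in this map but absent from the local configs are exactly the "registry-only" ghosts the handler emits afterwards.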

@@ -11,4 +11,5 @@ const (
KeyHealthCheck int64 = 104
KeySchemaMigrate int64 = 105
KeyBackendUpgradeCheck int64 = 106
KeyStateReconciler int64 = 107
)

View File

@@ -57,6 +57,16 @@ func (g *GalleryService) SetBackendManager(b BackendManager) {
g.backendManager = b
}
// BackendManager returns the current backend manager. Callers like the
// periodic upgrade checker need this so they run CheckUpgrades through the
// distributed implementation (which asks workers) instead of the frontend's
// local filesystem — the latter is always empty in distributed deployments.
func (g *GalleryService) BackendManager() BackendManager {
g.Lock()
defer g.Unlock()
return g.backendManager
}
// SetNATSClient sets the NATS client for distributed progress publishing.
func (g *GalleryService) SetNATSClient(nc messaging.Publisher) {
g.Lock()

View File
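The accessor exists so the periodic checker can route `CheckUpgrades` through whichever manager is installed rather than hard-coding the local one. A minimal sketch of that dispatch, under the assumption that both managers satisfy a narrow interface (the interface and the checker function here are illustrative, not the PR's code):

```go
package main

// BackendManager is the narrow surface the upgrade checker needs
// (assumed shape for this sketch).
type BackendManager interface {
	CheckUpgrades() (int, error)
}

// localManager reads the frontend filesystem, which is empty in
// distributed deployments, so it never finds anything to upgrade.
type localManager struct{}

func (localManager) CheckUpgrades() (int, error) { return 0, nil }

// distributedManager asks workers over NATS; pending stands in for
// whatever the real round-trip discovers.
type distributedManager struct{ pending int }

func (d distributedManager) CheckUpgrades() (int, error) { return d.pending, nil }

// runUpgradeCheck asks whichever manager is currently installed: with
// the distributed one, upgrades surface even though the frontend has
// no backends on its own filesystem.
func runUpgradeCheck(m BackendManager) int {
	n, err := m.CheckUpgrades()
	if err != nil {
		return 0
	}
	return n
}
```

This is why the getter matters: the checker depends only on the interface, and the gallery service swaps the implementation at setup time.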

@@ -157,6 +157,12 @@ type NodeBackendInfo struct {
IsMeta bool `json:"is_meta"`
InstalledAt string `json:"installed_at,omitempty"`
GalleryURL string `json:"gallery_url,omitempty"`
// Version, URI and Digest enable cluster-wide upgrade detection —
// without them, the frontend cannot tell whether the installed OCI
// image matches the gallery entry, and upgrades silently never surface.
Version string `json:"version,omitempty"`
URI string `json:"uri,omitempty"`
Digest string `json:"digest,omitempty"`
}
// SubjectNodeBackendStop tells a worker node to stop its gRPC backend process.

View File
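Because the new fields carry `omitempty`, a worker that never learned them emits byte-identical replies to before, so frontends can treat a missing version as "unknown" rather than an error. A quick sketch of the wire shape, using a subset of the struct from the diff above with made-up values (the `Name` field and its tag are assumptions, since the hunk starts at `IsMeta`):

```go
package main

import "encoding/json"

// NodeBackendInfo is a subset of the backend.list reply entry from the
// diff above, enough to show the omitempty behavior.
type NodeBackendInfo struct {
	Name        string `json:"name"`
	IsMeta      bool   `json:"is_meta"`
	InstalledAt string `json:"installed_at,omitempty"`
	GalleryURL  string `json:"gallery_url,omitempty"`
	Version     string `json:"version,omitempty"`
	URI         string `json:"uri,omitempty"`
	Digest      string `json:"digest,omitempty"`
}

// encodeInfo marshals one reply entry; unset optional fields are
// dropped from the JSON entirely.
func encodeInfo(i NodeBackendInfo) string {
	b, _ := json.Marshal(i)
	return string(b)
}
```

Note that `is_meta` lacks `omitempty`, so it is always present, while the upgrade-detection fields vanish when empty.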

@@ -10,6 +10,7 @@ import (
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
"github.com/nats-io/nats.go"
)
@@ -53,6 +54,7 @@ type DistributedBackendManager struct {
adapter *RemoteUnloaderAdapter
registry *NodeRegistry
backendGalleries []config.Gallery
systemState *system.SystemState
}
// NewDistributedBackendManager creates a DistributedBackendManager.
@@ -62,46 +64,161 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model
adapter: adapter,
registry: registry,
backendGalleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
}
}
// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
type NodeOpStatus struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"` // "success" | "queued" | "error"
Error string `json:"error,omitempty"`
}
// BackendOpResult aggregates per-node outcomes.
type BackendOpResult struct {
Nodes []NodeOpStatus `json:"nodes"`
}
// enqueueAndDrainBackendOp is the shared scaffolding for
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
// row (intent is durable even if the node is offline). Currently-healthy
// nodes get an immediate attempt; success deletes the row, failure records
// the error and leaves the row for the reconciler to retry.
//
// `apply` is the NATS round-trip for one node. Returning an error keeps the
// row in the queue and marks the per-node status as "error"; returning nil
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued" — no attempt is made right now; the reconciler will pick it
// up when the node returns.
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet — no intent to apply.
if node.Status == StatusPending {
continue
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
})
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node is %s; will retry when healthy", node.Status),
})
continue
}
applyErr := apply(node)
if applyErr == nil {
// Clear the row we just upserted, keyed by (node, backend, op) since
// UpsertPendingBackendOp doesn't return the row ID.
if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
xlog.Debug("Failed to clear pending backend op after success", "error", err)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "success",
})
continue
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL — mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
}
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
})
}
return result, nil
}
// findPendingRow looks up the ID of a pending_backend_ops row by its
// composite key. Used to hand off to RecordPendingBackendOpFailure /
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
// composite key.
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
var row PendingBackendOp
if err := d.registry.db.WithContext(ctx).
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
First(&row).Error; err != nil {
return 0, err
}
return row.ID, nil
}
// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
return d.registry.db.WithContext(ctx).
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
Delete(&PendingBackendOp{}).Error
}
// DeleteBackend fans out backend deletion to every known node. The previous
// implementation silently skipped non-healthy nodes, which meant zombies
// reappeared once those nodes returned. Now the intent is durable — see
// enqueueAndDrainBackendOp — and the reconciler catches up later.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
// Try local deletion but ignore "not found" errors — in distributed mode
// the frontend node typically doesn't have backends installed locally;
// they only exist on worker nodes.
// Local delete first (frontend rarely has backends installed in
// distributed mode, but the gallery operation still expects it; ignore
// "not found" which is the common case).
if err := d.local.DeleteBackend(name); err != nil {
if !errors.Is(err, gallery.ErrBackendNotFound) {
return err
}
xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
}
// Fan out backend.delete to all healthy nodes
allNodes, listErr := d.registry.List(context.Background())
if listErr != nil {
xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr)
return listErr
}
var errs []error
for _, node := range allNodes {
if node.Status != StatusHealthy {
continue
}
if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil {
if errors.Is(delErr, nats.ErrNoResponders) {
// Node's NATS subscription is gone — likely restarted with a new ID.
// Mark it unhealthy so future fan-outs skip it.
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
continue
}
xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr)
errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr))
}
}
return errors.Join(errs...)
ctx := context.Background()
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
_, err := d.adapter.DeleteBackend(node.ID, name)
return err
})
return err
}
// ListBackends aggregates installed backends from all healthy worker nodes.
// DeleteBackendDetailed is the per-node-result variant called by the HTTP
// handler so the UI can render a per-node status drawer. DeleteBackend still
// returns error-only for callers that don't care about node breakdown.
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
_, err := d.adapter.DeleteBackend(node.ID, name)
return err
})
}
// ListBackends aggregates installed backends from all worker nodes, preserving
// per-node attribution. Each SystemBackend.Nodes entry records which node has
// the backend and the version/digest it reports. The top-level Metadata is
// populated from the first node seen so single-node-minded callers still work.
//
// Pending/offline/draining nodes are skipped because they aren't expected to
// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders
// then marks them unhealthy and the loop continues.
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
result := make(gallery.SystemBackends)
allNodes, err := d.registry.List(context.Background())
@@ -110,7 +227,7 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
}
for _, node := range allNodes {
if node.Status != StatusHealthy {
if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining {
continue
}
reply, err := d.adapter.ListBackends(node.ID)
@@ -128,89 +245,92 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
continue
}
for _, b := range reply.Backends {
ref := gallery.NodeBackendRef{
NodeID: node.ID,
NodeName: node.Name,
NodeStatus: node.Status,
Version: b.Version,
Digest: b.Digest,
URI: b.URI,
InstalledAt: b.InstalledAt,
}
entry, exists := result[b.Name]
if !exists {
entry = gallery.SystemBackend{
Name: b.Name,
IsSystem: b.IsSystem,
IsMeta: b.IsMeta,
Metadata: &gallery.BackendMetadata{
Name: b.Name,
InstalledAt: b.InstalledAt,
GalleryURL: b.GalleryURL,
Version: b.Version,
URI: b.URI,
Digest: b.Digest,
},
}
}
entry.Nodes = append(entry.Nodes, ref)
result[b.Name] = entry
}
}
return result, nil
}
// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(op.Galleries)
backendName := op.GalleryElementName
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON))
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
}
xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err)
continue
return err
}
if !reply.Success {
xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error)
return fmt.Errorf("install failed: %s", reply.Error)
}
return nil
})
return err
}
// UpgradeBackend reuses the install NATS subject (the worker re-downloads
// from the gallery). Same queue semantics as Install/Delete.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(d.backendGalleries)
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON))
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
}
errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err))
continue
return err
}
if !reply.Success {
errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error))
return fmt.Errorf("upgrade failed: %s", reply.Error)
}
return nil
})
return err
}
// CheckUpgrades checks for available backend upgrades across the cluster.
//
// The previous implementation delegated to d.local, which called
// ListSystemBackends on the frontend — but in distributed mode the frontend
// has no backends installed locally, so the upgrade loop never ran and the UI
// never surfaced any upgrades. We now feed the cluster-wide aggregation
// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
// digest-based detection actually works and cluster drift is visible.
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
installed, err := d.ListBackends()
if err != nil {
return nil, err
}
// systemState is used by AvailableBackends (gallery paths + meta-backend
// resolution). The `installed` argument is what the old code got wrong —
// it used to come from the empty frontend filesystem.
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
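The cluster-drift policy described above (flag a backend as upgradeable whenever the per-node version/digest tuples disagree) can be sketched standalone. The `nodeRef` type and `detectDrift` function below are hypothetical simplifications for illustration, not the real `gallery.NodeBackendRef` or `CheckUpgradesAgainst`:

```go
package main

import "fmt"

// nodeRef is a simplified stand-in for a per-node backend ref: just the
// fields that drift detection compares.
type nodeRef struct {
	NodeID  string
	Version string
	Digest  string
}

// detectDrift reports whether the per-node (version, digest) tuples disagree,
// returning the nodes that differ from the first node seen. The real code
// additionally compares against the gallery's advertised version/digest.
func detectDrift(refs []nodeRef) (drifted []nodeRef, hasDrift bool) {
	if len(refs) < 2 {
		return nil, false
	}
	first := refs[0]
	for _, r := range refs[1:] {
		if r.Version != first.Version || r.Digest != first.Digest {
			drifted = append(drifted, r)
		}
	}
	return drifted, len(drifted) > 0
}

func main() {
	refs := []nodeRef{
		{NodeID: "n1", Version: "v1.2", Digest: "abc"},
		{NodeID: "n2", Version: "v1.2", Digest: "abc"},
		{NodeID: "n3", Version: "v1.1", Digest: "def"}, // the outlier
	}
	out, drift := detectDrift(refs)
	fmt.Println(drift, len(out)) // true 1
}
```

Once any drift is detected, an upgrade-all realigns every node to the gallery version, which is why the outliers are reported rather than resolved per-node.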


package nodes
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/advisorylock"
grpcclient "github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/xlog"
"gorm.io/gorm"
)
// ModelProber checks whether a model's backend process is still reachable.
// Defaulted to a gRPC health probe but overridable for tests so we don't
// need to stand up a real server. Returning false without an error means the
// process is reachable but unhealthy (same as a timeout for our purposes).
type ModelProber interface {
IsAlive(ctx context.Context, address string) bool
}
// grpcModelProber does a 1s HealthCheck on the model's stored gRPC address.
type grpcModelProber struct{ token string }
func (g grpcModelProber) IsAlive(ctx context.Context, address string) bool {
client := grpcclient.NewClientWithToken(address, false, nil, false, g.token)
probeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
ok, _ := client.HealthCheck(probeCtx)
return ok
}
// ReplicaReconciler periodically ensures model replica counts match their
// scheduling configs. It scales up replicas when below MinReplicas or when
// all replicas are busy (up to MaxReplicas), and scales down idle replicas
// above MinReplicas.
//
// Alongside replica scaling it runs two state-reconciliation passes — draining
// the pending_backend_ops queue and probing loaded models' gRPC addresses to
reap ghost rows. Both passes are wrapped in the KeyStateReconciler advisory
// lock so N frontends don't stampede.
//
// Only processes models with auto-scaling enabled (MinReplicas > 0 or MaxReplicas > 0).
type ReplicaReconciler struct {
registry *NodeRegistry
scheduler ModelScheduler // interface for scheduling new models
unloader NodeCommandSender
adapter *RemoteUnloaderAdapter // NATS sender for pending-op drain
prober ModelProber // health probe for model gRPC addrs
db *gorm.DB
interval time.Duration
scaleDownDelay time.Duration
// probeStaleAfter: only probe node_models rows older than this so we
// don't hammer every worker every tick for models we just heard from.
probeStaleAfter time.Duration
}
// ModelScheduler abstracts the scheduling logic needed by the reconciler.
@@ -35,12 +66,21 @@ type ModelScheduler interface {
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
type ReplicaReconcilerOptions struct {
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
// Adapter is the NATS sender used to retry pending backend ops. When nil,
// the state-reconciler pending-drain pass is a no-op (single-node mode).
Adapter *RemoteUnloaderAdapter
// RegistrationToken is used by the default gRPC prober when probing model
// addresses. Matches the worker's token so HealthCheck auth succeeds.
RegistrationToken string
// Prober overrides the default gRPC health probe (used by tests).
Prober ModelProber
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
ProbeStaleAfter time.Duration // default 2m
}
// NewReplicaReconciler creates a new ReplicaReconciler.
func NewReplicaReconciler(opts ReplicaReconcilerOptions) *ReplicaReconciler {
interval := opts.Interval
if interval == 0 {
interval = 30 * time.Second
}
scaleDownDelay := opts.ScaleDownDelay
if scaleDownDelay == 0 {
scaleDownDelay = 5 * time.Minute
}
probeStaleAfter := opts.ProbeStaleAfter
if probeStaleAfter == 0 {
probeStaleAfter = 2 * time.Minute
}
prober := opts.Prober
if prober == nil {
prober = grpcModelProber{token: opts.RegistrationToken}
}
return &ReplicaReconciler{
registry: opts.Registry,
scheduler: opts.Scheduler,
unloader: opts.Unloader,
adapter: opts.Adapter,
prober: prober,
db: opts.DB,
interval: interval,
scaleDownDelay: scaleDownDelay,
probeStaleAfter: probeStaleAfter,
}
}
@@ -78,17 +129,122 @@ func (rc *ReplicaReconciler) Run(ctx context.Context) {
}
}
// reconcileOnce performs a single reconciliation pass. Replica work and
// state-reconciliation work run under *different* advisory locks so multiple
// frontends can share load across passes, and one long-running pass doesn't
// block the other forever if a frontend wedges.
func (rc *ReplicaReconciler) reconcileOnce(ctx context.Context) {
if rc.db != nil {
replicaKey := advisorylock.KeyFromString("replica-reconciler")
_ = advisorylock.WithLockCtx(ctx, rc.db, replicaKey, func() error {
rc.reconcile(ctx)
return nil
})
// Try, don't block: if another frontend is already running the state
// pass, this tick is a no-op. Matches the health monitor pattern.
_, _ = advisorylock.TryWithLockCtx(ctx, rc.db, advisorylock.KeyStateReconciler, func() error {
rc.reconcileState(ctx)
return nil
})
} else {
rc.reconcile(ctx)
rc.reconcileState(ctx)
}
}
// reconcileState runs the state-reconciliation passes: drain pending backend
// ops for freshly-healthy nodes, then probe model gRPC addresses to orphan
// ghosts. Both passes are best-effort: a failure on one node doesn't stop
// the rest.
func (rc *ReplicaReconciler) reconcileState(ctx context.Context) {
if rc.adapter != nil {
rc.drainPendingBackendOps(ctx)
}
rc.probeLoadedModels(ctx)
}
// drainPendingBackendOps retries queued backend ops whose next_retry_at has
// passed on nodes that are currently healthy. On success the row is deleted;
// on failure attempts++ and next_retry_at moves out via exponential backoff.
func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
ops, err := rc.registry.ListDuePendingBackendOps(ctx)
if err != nil {
xlog.Warn("Reconciler: failed to list pending backend ops", "error", err)
return
}
if len(ops) == 0 {
return
}
xlog.Debug("Reconciler: draining pending backend ops", "count", len(ops))
for _, op := range ops {
if err := ctx.Err(); err != nil {
return
}
var applyErr error
switch op.Op {
case OpBackendDelete:
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
case OpBackendInstall, OpBackendUpgrade:
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries))
if err != nil {
applyErr = err
} else if !reply.Success {
applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error)
}
default:
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)
continue
}
if applyErr == nil {
if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil {
xlog.Warn("Reconciler: failed to delete drained op row", "id", op.ID, "error", err)
} else {
xlog.Info("Reconciler: pending backend op applied",
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1)
}
continue
}
_ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error())
xlog.Warn("Reconciler: pending backend op retry failed",
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr)
}
}
// probeLoadedModels gRPC-health-checks model addresses that the DB says are
// loaded. If a model's backend process is gone (OOM, crash, manual restart)
// we remove the row so ghosts don't linger. Only probes rows older than
// probeStaleAfter so we don't hammer every worker every tick for models we
// just heard from.
func (rc *ReplicaReconciler) probeLoadedModels(ctx context.Context) {
var stale []NodeModel
cutoff := time.Now().Add(-rc.probeStaleAfter)
err := rc.registry.db.WithContext(ctx).
Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
Where("node_models.state = ? AND backend_nodes.status = ? AND node_models.updated_at < ? AND node_models.address != ''",
"loaded", StatusHealthy, cutoff).
Find(&stale).Error
if err != nil {
xlog.Warn("Reconciler: failed to list loaded models for probe", "error", err)
return
}
for _, m := range stale {
if err := ctx.Err(); err != nil {
return
}
if rc.prober.IsAlive(ctx, m.Address) {
// Bump updated_at so we don't probe this row again immediately.
_ = rc.registry.db.WithContext(ctx).Model(&NodeModel{}).
Where("id = ?", m.ID).Update("updated_at", time.Now()).Error
continue
}
if err := rc.registry.RemoveNodeModel(ctx, m.NodeID, m.ModelName); err != nil {
xlog.Warn("Reconciler: failed to remove unreachable model", "node", m.NodeID, "model", m.ModelName, "error", err)
continue
}
xlog.Warn("Reconciler: model unreachable, removed from registry",
"node", m.NodeID, "model", m.ModelName, "address", m.Address)
}
}


@@ -239,3 +239,138 @@ var _ = Describe("ReplicaReconciler", func() {
})
})
})
// fakeProber lets tests control whether a model's gRPC address "responds".
type fakeProber struct {
alive map[string]bool
calls int
}
func (f *fakeProber) IsAlive(_ context.Context, address string) bool {
f.calls++
if f.alive == nil {
return false
}
return f.alive[address]
}
var _ = Describe("ReplicaReconciler — state reconciliation", func() {
var (
db *gorm.DB
registry *NodeRegistry
)
BeforeEach(func() {
if runtime.GOOS == "darwin" {
Skip("testcontainers requires Docker, not available on macOS CI")
}
db = testutil.SetupTestDB()
var err error
registry, err = NewNodeRegistry(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("probeLoadedModels", func() {
It("removes loaded models whose gRPC address is unreachable", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
// Two loaded models — one stale (will probe), one fresh (skipped).
stale := &NodeModel{
ID: "stale-1",
NodeID: node.ID,
ModelName: "stale-model",
Address: "10.0.0.1:12345",
State: "loaded",
UpdatedAt: time.Now().Add(-5 * time.Minute),
}
fresh := &NodeModel{
ID: "fresh-1",
NodeID: node.ID,
ModelName: "fresh-model",
Address: "10.0.0.1:54321",
State: "loaded",
UpdatedAt: time.Now(), // within probeStaleAfter
}
Expect(db.Create(stale).Error).To(Succeed())
Expect(db.Create(fresh).Error).To(Succeed())
prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": false}}
rc := NewReplicaReconciler(ReplicaReconcilerOptions{
Registry: registry,
DB: db,
Prober: prober,
ProbeStaleAfter: 2 * time.Minute,
})
rc.probeLoadedModels(context.Background())
// Stale was unreachable — row removed.
var after []NodeModel
Expect(db.Find(&after).Error).To(Succeed())
Expect(after).To(HaveLen(1))
Expect(after[0].ModelName).To(Equal("fresh-model"))
// Prober was only called once (the fresh row was filtered out).
Expect(prober.calls).To(Equal(1))
})
It("keeps reachable models and bumps their updated_at", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
stale := &NodeModel{
ID: "stale-2",
NodeID: node.ID,
ModelName: "alive-model",
Address: "10.0.0.1:12345",
State: "loaded",
UpdatedAt: time.Now().Add(-5 * time.Minute),
}
Expect(db.Create(stale).Error).To(Succeed())
prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": true}}
rc := NewReplicaReconciler(ReplicaReconcilerOptions{
Registry: registry,
DB: db,
Prober: prober,
ProbeStaleAfter: 2 * time.Minute,
})
rc.probeLoadedModels(context.Background())
var after NodeModel
Expect(db.First(&after, "id = ?", "stale-2").Error).To(Succeed())
Expect(after.UpdatedAt).To(BeTemporally("~", time.Now(), time.Second))
})
})
Describe("UpsertPendingBackendOp + RecordPendingBackendOpFailure", func() {
It("upserts on the composite key rather than duplicating rows", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
// Second call for the same (node, backend, op) should not create a
// new row — that's how re-issuing a delete works.
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
var rows []PendingBackendOp
Expect(db.Find(&rows).Error).To(Succeed())
Expect(rows).To(HaveLen(1))
})
It("increments attempts and moves next_retry_at out on failure", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
var row PendingBackendOp
Expect(db.First(&row).Error).To(Succeed())
before := row.NextRetryAt
Expect(registry.RecordPendingBackendOpFailure(context.Background(), row.ID, "boom")).To(Succeed())
Expect(db.First(&row, row.ID).Error).To(Succeed())
Expect(row.Attempts).To(Equal(1))
Expect(row.LastError).To(Equal("boom"))
Expect(row.NextRetryAt).To(BeTemporally(">", before))
})
})
})


@@ -104,6 +104,36 @@ type NodeWithExtras struct {
Labels map[string]string `json:"labels,omitempty"`
}
// PendingBackendOp is a durable intent for a backend lifecycle operation
// (delete/install/upgrade) that needs to eventually apply on a specific node.
//
// Without this table, a backend delete against an offline node was silently
// dropped: the frontend skipped the node, the node came back later with the
// backend still installed, and the operator saw a zombie. Now the intent is
// recorded regardless of node status; the state reconciler drains the queue
// whenever a node is healthy and removes the row on success. Reissuing the
// same operation while a row exists updates NextRetryAt instead of stacking
// duplicates (see the unique index).
type PendingBackendOp struct {
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"`
Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"`
Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"`
Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries
Attempts int `gorm:"default:0" json:"attempts"`
LastError string `gorm:"type:text" json:"last_error,omitempty"`
CreatedAt time.Time `json:"created_at"`
NextRetryAt time.Time `gorm:"index" json:"next_retry_at"`
}
// Op constants mirror the operation names used by DistributedBackendManager
// so callers don't repeat stringly-typed values.
const (
OpBackendDelete = "delete"
OpBackendInstall = "install"
OpBackendUpgrade = "upgrade"
)
// NodeRegistry manages backend node registration and lookup in PostgreSQL.
type NodeRegistry struct {
db *gorm.DB
@@ -114,7 +144,7 @@ type NodeRegistry struct {
// when multiple instances (frontend + workers) start at the same time.
func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) {
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{})
}); err != nil {
return nil, fmt.Errorf("migrating node tables: %w", err)
}
@@ -946,3 +976,114 @@ func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node
_ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name)
}
}
// UpsertPendingBackendOp records or refreshes a pending backend operation for
// a node. If a row already exists for (nodeID, backend, op) we keep its
// Attempts/LastError but reset NextRetryAt to now, so reissuing the same
// delete/upgrade nudges it to the front of the queue instead of stacking a
// duplicate intent.
func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error {
row := PendingBackendOp{
NodeID: nodeID,
Backend: backend,
Op: op,
Galleries: galleries,
NextRetryAt: time.Now(),
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
}).Create(&row).Error
}
// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed
// AND whose node is currently healthy. The reconciler drains this list; we
// filter by node status in the query so a tick doesn't hammer NATS for
// nodes that obviously can't answer.
func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
var ops []PendingBackendOp
err := r.db.WithContext(ctx).
Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id").
Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy).
Order("pending_backend_ops.next_retry_at ASC").
Find(&ops).Error
if err != nil {
return nil, fmt.Errorf("listing due pending backend ops: %w", err)
}
return ops, nil
}
// ListPendingBackendOps returns every queued row (for the UI "pending on N
// nodes" chip and the pre-delete ConfirmDialog).
func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
var ops []PendingBackendOp
if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil {
return nil, fmt.Errorf("listing pending backend ops: %w", err)
}
return ops, nil
}
// DeletePendingBackendOp removes a queue row — called after the op succeeds.
func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error {
if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil {
return fmt.Errorf("deleting pending backend op %d: %w", id, err)
}
return nil
}
// RecordPendingBackendOpFailure bumps Attempts, captures the error, and
// pushes NextRetryAt out with exponential backoff capped at 15 minutes.
func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {
return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var row PendingBackendOp
if err := tx.First(&row, id).Error; err != nil {
return err
}
row.Attempts++
row.LastError = errMsg
row.NextRetryAt = time.Now().Add(backoffForAttempt(row.Attempts))
return tx.Save(&row).Error
})
}
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
// reconciler tick is 30s so anything shorter would just re-fire immediately.
func backoffForAttempt(attempts int) time.Duration {
const maxDelay = 15 * time.Minute
base := 30 * time.Second
shift := attempts - 1
if shift < 0 {
shift = 0
}
if shift > 10 { // 2^10 * 30s already exceeds the cap
shift = 10
}
d := base << shift
if d > maxDelay {
return maxDelay
}
return d
}
// CountPendingBackendOpsByBackend returns a map of backend name to the count
// of pending rows. Used to decorate Manage → Backends with a "pending on N
// nodes" chip without exposing the full queue.
func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) {
type row struct {
Backend string
Count int
}
var rows []row
err := r.db.WithContext(ctx).Model(&PendingBackendOp{}).
Select("backend, COUNT(*) as count").
Group("backend").
Scan(&rows).Error
if err != nil {
return nil, fmt.Errorf("counting pending backend ops: %w", err)
}
out := make(map[string]int, len(rows))
for _, r := range rows {
out[r.Backend] = r.Count
}
return out, nil
}
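The durable-intent lifecycle (upsert on the composite key, backoff bookkeeping on failure, delete on success) can be modeled in memory. The `queue` type below is an illustrative stand-in for the `pending_backend_ops` table, not production code; the real backoff additionally caps at 15 minutes:

```go
package main

import (
	"fmt"
	"time"
)

// pendingOp is a hypothetical in-memory stand-in for a PendingBackendOp row.
type pendingOp struct {
	Attempts    int
	LastError   string
	NextRetryAt time.Time
}

// key mirrors the (node_id, backend, op) unique index.
type key struct{ nodeID, backend, op string }

type queue map[key]*pendingOp

// upsert refreshes NextRetryAt if the row exists (keeping Attempts), else
// creates it, so reissuing an op never stacks duplicate intents.
func (q queue) upsert(k key) {
	if row, ok := q[k]; ok {
		row.NextRetryAt = time.Now()
		return
	}
	q[k] = &pendingOp{NextRetryAt: time.Now()}
}

// recordFailure bumps Attempts, captures the error, and pushes the retry out
// with 30s exponential doubling.
func (q queue) recordFailure(k key, errMsg string) {
	row := q[k]
	row.Attempts++
	row.LastError = errMsg
	row.NextRetryAt = time.Now().Add(30 * time.Second << (row.Attempts - 1))
}

func main() {
	q := queue{}
	k := key{"node-1", "some-backend", "delete"}
	q.upsert(k)
	q.upsert(k) // same composite key: still one row
	fmt.Println(len(q))
	q.recordFailure(k, "nats: no responders")
	fmt.Println(q[k].Attempts, q[k].LastError)
	delete(q, k) // success path: the reconciler drops the row
	fmt.Println(len(q))
}
```

The success path is the only one that removes a row, which is exactly why an offline node's delete eventually applies: the intent outlives any number of failed ticks.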