diff --git a/core/application/distributed.go b/core/application/distributed.go index 31e87fdab..26d56d121 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut DB: authDB, }) - // Create ReplicaReconciler for auto-scaling model replicas + // Create ReplicaReconciler for auto-scaling model replicas. Adapter + + // RegistrationToken feed the state-reconciliation passes: pending op + // drain uses the adapter, and model health probes use the token to auth + // against workers' gRPC HealthCheck. reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{ - Registry: registry, - Scheduler: router, - Unloader: remoteUnloader, - DB: authDB, - Interval: 30 * time.Second, - ScaleDownDelay: 5 * time.Minute, + Registry: registry, + Scheduler: router, + Unloader: remoteUnloader, + Adapter: remoteUnloader, + RegistrationToken: cfg.Distributed.RegistrationToken, + DB: authDB, + Interval: 30 * time.Second, + ScaleDownDelay: 5 * time.Minute, + ProbeStaleAfter: 2 * time.Minute, }) // Create ModelRouterAdapter to wire into ModelLoader diff --git a/core/application/startup.go b/core/application/startup.go index a03f17bd2..241ea8b22 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -235,7 +235,12 @@ func New(opts ...config.AppOption) (*Application, error) { // In distributed mode, uses PostgreSQL advisory lock so only one frontend // instance runs periodic checks (avoids duplicate upgrades across replicas). if len(options.BackendGalleries) > 0 { - uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB()) + // Pass a lazy getter for the backend manager so the checker always + // uses the active one — DistributedBackendManager is swapped in above + // and asks workers for their installed backends, which is what + // upgrade detection needs in distributed mode. 
+ bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() } + uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn) application.upgradeChecker = uc go uc.Run(options.Context) } diff --git a/core/application/upgrade_checker.go b/core/application/upgrade_checker.go index 94fb3f6c7..3b2d94544 100644 --- a/core/application/upgrade_checker.go +++ b/core/application/upgrade_checker.go @@ -8,6 +8,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" "github.com/mudler/LocalAI/core/services/advisorylock" + "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/xlog" @@ -26,6 +27,12 @@ type UpgradeChecker struct { galleries []config.Gallery systemState *system.SystemState db *gorm.DB // non-nil in distributed mode + // backendManagerFn lazily returns the current backend manager (may be + // swapped from Local to Distributed after startup). Pulled through each + // check so the UpgradeChecker uses whichever is active. In distributed + // mode this ensures CheckUpgrades asks workers instead of the (empty) + // frontend filesystem — fixing the bug where upgrades never surfaced. + backendManagerFn func() galleryop.BackendManager checkInterval time.Duration stop chan struct{} @@ -40,18 +47,22 @@ type UpgradeChecker struct { // NewUpgradeChecker creates a new UpgradeChecker service. // Pass db=nil for standalone mode, or a *gorm.DB for distributed mode // (uses advisory locks so only one instance runs periodic checks). -func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker { +// backendManagerFn is optional; when set, CheckUpgrades is routed through +// the active backend manager — required in distributed mode so the check +// aggregates from workers rather than the empty frontend filesystem. 
+func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker { return &UpgradeChecker{ - appConfig: appConfig, - modelLoader: ml, - galleries: appConfig.BackendGalleries, - systemState: appConfig.SystemState, - db: db, - checkInterval: 6 * time.Hour, - stop: make(chan struct{}), - done: make(chan struct{}), - triggerCh: make(chan struct{}, 1), - lastUpgrades: make(map[string]gallery.UpgradeInfo), + appConfig: appConfig, + modelLoader: ml, + galleries: appConfig.BackendGalleries, + systemState: appConfig.SystemState, + db: db, + backendManagerFn: backendManagerFn, + checkInterval: 6 * time.Hour, + stop: make(chan struct{}), + done: make(chan struct{}), + triggerCh: make(chan struct{}, 1), + lastUpgrades: make(map[string]gallery.UpgradeInfo), } } @@ -64,13 +75,16 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade func (uc *UpgradeChecker) Run(ctx context.Context) { defer close(uc.done) - // Initial delay: don't slow down startup + // Initial delay: don't slow down startup. Short enough that operators + // don't stare at an empty upgrade banner for long; long enough that + // workers have registered and reported their installed backends. 
+ initialDelay := 10 * time.Second select { case <-ctx.Done(): return case <-uc.stop: return - case <-time.After(30 * time.Second): + case <-time.After(initialDelay): } // First check always runs locally (to warm the cache on this instance) @@ -144,7 +158,18 @@ func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo } func (uc *UpgradeChecker) runCheck(ctx context.Context) { - upgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState) + var ( + upgrades map[string]gallery.UpgradeInfo + err error + ) + if uc.backendManagerFn != nil { + if bm := uc.backendManagerFn(); bm != nil { + upgrades, err = bm.CheckUpgrades(ctx) + } + } + if upgrades == nil && err == nil { + upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState) + } uc.mu.Lock() uc.lastCheckTime = time.Now() diff --git a/core/cli/worker.go b/core/cli/worker.go index affde4b08..186fe298e 100644 --- a/core/cli/worker.go +++ b/core/cli/worker.go @@ -738,6 +738,9 @@ func (s *backendSupervisor) subscribeLifecycleEvents() { if b.Metadata != nil { info.InstalledAt = b.Metadata.InstalledAt info.GalleryURL = b.Metadata.GalleryURL + info.Version = b.Metadata.Version + info.URI = b.Metadata.URI + info.Digest = b.Metadata.Digest } infos = append(infos, info) } diff --git a/core/gallery/backends.go b/core/gallery/backends.go index c2622c272..ee3ca906d 100644 --- a/core/gallery/backends.go +++ b/core/gallery/backends.go @@ -394,6 +394,23 @@ type SystemBackend struct { Metadata *BackendMetadata UpgradeAvailable bool `json:"upgrade_available,omitempty"` AvailableVersion string `json:"available_version,omitempty"` + // Nodes holds per-node attribution in distributed mode. Empty in single-node. + // Each entry describes a node that has this backend installed, with the + // version/digest it reports. Lets the UI surface drift and per-node status. 
+ Nodes []NodeBackendRef `json:"nodes,omitempty"` +} + +// NodeBackendRef describes one node's view of an installed backend. Used both +// for per-node attribution in the UI and for drift detection during upgrade +// checks (a cluster with mismatched versions/digests is flagged upgradeable). +type NodeBackendRef struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + NodeStatus string `json:"node_status"` // healthy | unhealthy | offline | draining | pending + Version string `json:"version,omitempty"` + Digest string `json:"digest,omitempty"` + URI string `json:"uri,omitempty"` + InstalledAt string `json:"installed_at,omitempty"` } type SystemBackends map[string]SystemBackend diff --git a/core/gallery/upgrade.go b/core/gallery/upgrade.go index dde33300f..d0671617e 100644 --- a/core/gallery/upgrade.go +++ b/core/gallery/upgrade.go @@ -23,22 +23,45 @@ type UpgradeInfo struct { AvailableVersion string `json:"available_version"` InstalledDigest string `json:"installed_digest,omitempty"` AvailableDigest string `json:"available_digest,omitempty"` + // NodeDrift lists nodes whose installed version or digest differs from + // the cluster majority. Non-empty means the cluster has diverged and an + // upgrade will realign it. Empty in single-node mode. + NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"` } -// CheckBackendUpgrades compares installed backends against gallery entries -// and returns a map of backend names to UpgradeInfo for those that have -// newer versions or different OCI digests available. +// NodeDriftInfo describes one node that disagrees with the cluster majority +// on which version/digest of a backend is installed. +type NodeDriftInfo struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + Version string `json:"version,omitempty"` + Digest string `json:"digest,omitempty"` +} + +// CheckBackendUpgrades is the single-node entrypoint. 
Distributed callers use +// CheckUpgradesAgainst directly with their aggregated SystemBackends. func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState) (map[string]UpgradeInfo, error) { + installed, err := ListSystemBackends(systemState) + if err != nil { + return nil, fmt.Errorf("failed to list installed backends: %w", err) + } + return CheckUpgradesAgainst(ctx, galleries, systemState, installed) +} + +// CheckUpgradesAgainst compares a caller-supplied SystemBackends set against +// the gallery. Fixes the distributed-mode bug where the old code passed the +// frontend's (empty) local filesystem through ListSystemBackends and so never +// surfaced any upgrades. +// +// Cluster drift policy: if a backend's per-node versions/digests disagree, the +// row is flagged upgradeable regardless of whether any node matches the gallery +// — next Upgrade All realigns the cluster. NodeDrift lists the outliers. +func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) { galleryBackends, err := AvailableBackends(galleries, systemState) if err != nil { return nil, fmt.Errorf("failed to list available backends: %w", err) } - installedBackends, err := ListSystemBackends(systemState) - if err != nil { - return nil, fmt.Errorf("failed to list installed backends: %w", err) - } - result := make(map[string]UpgradeInfo) for _, installed := range installedBackends { @@ -57,34 +80,48 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste } installedVersion := installed.Metadata.Version + installedDigest := installed.Metadata.Digest galleryVersion := galleryEntry.Version - // If both sides have versions, compare them + // Detect cluster drift: does every node report the same version+digest? + // In single-node mode this stays empty (Nodes is nil). 
+ majority, drift := summarizeNodeDrift(installed.Nodes) + if majority.version != "" { + installedVersion = majority.version + } + if majority.digest != "" { + installedDigest = majority.digest + } + + makeInfo := func(info UpgradeInfo) UpgradeInfo { + info.NodeDrift = drift + return info + } + + // If versions are available on both sides, they're the source of truth. if galleryVersion != "" && installedVersion != "" { - if galleryVersion != installedVersion { - result[installed.Metadata.Name] = UpgradeInfo{ + if galleryVersion != installedVersion || len(drift) > 0 { + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ BackendName: installed.Metadata.Name, InstalledVersion: installedVersion, AvailableVersion: galleryVersion, - } + }) } - // Versions match — no upgrade needed continue } - // Gallery has a version but installed doesn't — this happens for backends - // installed before version tracking was added. Flag as upgradeable so - // users can re-install to pick up version metadata. + // Gallery has a version but installed doesn't — backends installed before + // version tracking was added. Flag as upgradeable to pick up metadata. if galleryVersion != "" && installedVersion == "" { - result[installed.Metadata.Name] = UpgradeInfo{ + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ BackendName: installed.Metadata.Name, InstalledVersion: "", AvailableVersion: galleryVersion, - } + }) continue } - // Fall back to OCI digest comparison when versions are unavailable + // Fall back to OCI digest comparison when versions are unavailable. 
if downloader.URI(galleryEntry.URI).LooksLikeOCI() { remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil) if err != nil { @@ -92,21 +129,68 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste continue } // If we have a stored digest, compare; otherwise any remote digest - // means we can't confirm we're up to date — flag as upgradeable - if installed.Metadata.Digest == "" || remoteDigest != installed.Metadata.Digest { - result[installed.Metadata.Name] = UpgradeInfo{ + // means we can't confirm we're up to date — flag as upgradeable. + if installedDigest == "" || remoteDigest != installedDigest || len(drift) > 0 { + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ BackendName: installed.Metadata.Name, - InstalledDigest: installed.Metadata.Digest, + InstalledDigest: installedDigest, AvailableDigest: remoteDigest, - } + }) } + } else if len(drift) > 0 { + // No version/digest path but nodes disagree — still worth flagging. + result[installed.Metadata.Name] = makeInfo(UpgradeInfo{ + BackendName: installed.Metadata.Name, + InstalledVersion: installedVersion, + InstalledDigest: installedDigest, + }) } - // No version info and non-OCI URI — cannot determine, skip } return result, nil } +// summarizeNodeDrift collapses per-node version/digest tuples to a majority +// pair and returns the outliers. In single-node mode (empty nodes slice) this +// returns zero values and a nil drift list. 
+func summarizeNodeDrift(nodes []NodeBackendRef) (majority struct{ version, digest string }, drift []NodeDriftInfo) { + if len(nodes) == 0 { + return majority, nil + } + + type key struct{ version, digest string } + counts := map[key]int{} + var topKey key + var topCount int + for _, n := range nodes { + k := key{n.Version, n.Digest} + counts[k]++ + if counts[k] > topCount { + topCount = counts[k] + topKey = k + } + } + + majority.version = topKey.version + majority.digest = topKey.digest + + if len(counts) == 1 { + return majority, nil // unanimous — no drift + } + for _, n := range nodes { + if n.Version == majority.version && n.Digest == majority.digest { + continue + } + drift = append(drift, NodeDriftInfo{ + NodeID: n.NodeID, + NodeName: n.NodeName, + Version: n.Version, + Digest: n.Digest, + }) + } + return majority, drift +} + // UpgradeBackend upgrades a single backend to the latest gallery version using // an atomic swap with backup-based rollback on failure. func UpgradeBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, galleries []config.Gallery, backendName string, downloadStatus func(string, string, string, float64)) error { diff --git a/core/gallery/upgrade_test.go b/core/gallery/upgrade_test.go index f65b4276b..6fd386b2e 100644 --- a/core/gallery/upgrade_test.go +++ b/core/gallery/upgrade_test.go @@ -144,6 +144,97 @@ var _ = Describe("Upgrade Detection and Execution", func() { }) }) + // CheckUpgradesAgainst is the entry point used by DistributedBackendManager. + // It takes installed backends directly — typically aggregated from workers — + // instead of reading the frontend filesystem. These tests exercise drift + // detection, which is the feature the distributed path relies on. 
+ Describe("CheckUpgradesAgainst (distributed)", func() { + It("flags upgrade when cluster nodes disagree on version, even if gallery matches majority", func() { + writeGalleryYAML([]GalleryBackend{ + { + Metadata: Metadata{Name: "my-backend"}, + URI: filepath.Join(tempDir, "some-source"), + Version: "2.0.0", + }, + }) + + installed := SystemBackends{ + "my-backend": SystemBackend{ + Name: "my-backend", + Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"}, + Nodes: []NodeBackendRef{ + {NodeID: "a", NodeName: "worker-1", Version: "2.0.0"}, + {NodeID: "b", NodeName: "worker-2", Version: "2.0.0"}, + {NodeID: "c", NodeName: "worker-3", Version: "1.0.0"}, // drift + }, + }, + } + + upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed) + Expect(err).NotTo(HaveOccurred()) + Expect(upgrades).To(HaveKey("my-backend")) + info := upgrades["my-backend"] + Expect(info.AvailableVersion).To(Equal("2.0.0")) + Expect(info.NodeDrift).To(HaveLen(1)) + Expect(info.NodeDrift[0].NodeName).To(Equal("worker-3")) + Expect(info.NodeDrift[0].Version).To(Equal("1.0.0")) + }) + + It("does not flag upgrade when all nodes agree and match gallery", func() { + writeGalleryYAML([]GalleryBackend{ + { + Metadata: Metadata{Name: "my-backend"}, + URI: filepath.Join(tempDir, "some-source"), + Version: "2.0.0", + }, + }) + + installed := SystemBackends{ + "my-backend": SystemBackend{ + Name: "my-backend", + Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"}, + Nodes: []NodeBackendRef{ + {NodeID: "a", NodeName: "worker-1", Version: "2.0.0"}, + {NodeID: "b", NodeName: "worker-2", Version: "2.0.0"}, + }, + }, + } + + upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed) + Expect(err).NotTo(HaveOccurred()) + Expect(upgrades).To(BeEmpty()) + }) + + It("surfaces empty-installed-version path the old distributed code silently missed", func() { + // Simulates the real-world bug: worker has a backend, its 
version + // is empty (pre-tracking or OCI-pinned-to-latest), gallery has a + // version. Pre-fix CheckUpgrades returned nothing; now it surfaces. + writeGalleryYAML([]GalleryBackend{ + { + Metadata: Metadata{Name: "my-backend"}, + URI: filepath.Join(tempDir, "some-source"), + Version: "2.0.0", + }, + }) + + installed := SystemBackends{ + "my-backend": SystemBackend{ + Name: "my-backend", + Metadata: &BackendMetadata{Name: "my-backend"}, + Nodes: []NodeBackendRef{ + {NodeID: "a", NodeName: "worker-1"}, + }, + }, + } + + upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed) + Expect(err).NotTo(HaveOccurred()) + Expect(upgrades).To(HaveKey("my-backend")) + Expect(upgrades["my-backend"].InstalledVersion).To(BeEmpty()) + Expect(upgrades["my-backend"].AvailableVersion).To(Equal("2.0.0")) + }) + }) + Describe("UpgradeBackend", func() { It("should replace backend directory and update metadata", func() { // Install v1 diff --git a/core/http/react-ui/src/App.css b/core/http/react-ui/src/App.css index c9e945d16..03c448243 100644 --- a/core/http/react-ui/src/App.css +++ b/core/http/react-ui/src/App.css @@ -1529,6 +1529,401 @@ select.input { background: var(--color-warning-light); color: var(--color-warning); } +.badge-accent { + background: var(--color-accent-light); + color: var(--color-accent); +} + +/* Horizontal row of badges used inside table cells — consistent spacing so + cells line up regardless of how many badges are present. */ +.badge-row { + display: inline-flex; + flex-wrap: wrap; + gap: 4px; + align-items: center; +} + +/* Vertically stacked cell content (e.g. version + update chip + drift chip). + Keeps rows readable at scale without inline style={{...}} everywhere. 
*/ +.cell-stack { + display: flex; + flex-direction: column; + gap: 4px; + align-items: flex-start; +} + +.cell-mono { + font-family: 'JetBrains Mono', ui-monospace, monospace; + font-size: var(--text-xs); + color: var(--color-text-primary); +} + +.cell-muted { + color: var(--color-text-muted); + font-size: var(--text-xs); +} + +.cell-subtle { + color: var(--color-text-muted); + font-size: var(--text-xs); + font-weight: 400; + margin-left: 8px; +} + +.cell-name { + display: inline-flex; + align-items: center; + gap: var(--spacing-xs); + font-weight: 500; +} +.cell-name > i { + color: var(--color-accent); + font-size: var(--text-xs); +} + +.row-actions { + display: flex; + gap: var(--spacing-xs); + justify-content: flex-end; + align-items: center; +} + +/* Softer delete button for dense tables — the destructive confirm dialog + already owns the "are you sure" affordance, so the button itself doesn't + need to scream. Keeps the delete red readable without dominating rows. */ +.btn.btn-danger-ghost { + background: transparent; + color: var(--color-error); + border-color: transparent; +} +.btn.btn-danger-ghost:hover:not(:disabled) { + background: var(--color-error-light); + color: var(--color-error); + border-color: var(--color-error-light); +} + +/* Small count pill used inside tabs ("(3) ↑ 2") so update counts are + glanceable without extra rows of UI. */ +.tab-pill { + display: inline-flex; + align-items: center; + gap: 3px; + margin-left: 6px; + padding: 1px 6px; + border-radius: var(--radius-full); + font-size: var(--text-xs); + font-weight: 600; + line-height: 1.4; +} +.tab-pill--warning { + background: var(--color-warning-light); + color: var(--color-warning); +} + +/* Stat cards — uniform-height cluster metrics for the Nodes dashboard. + Left accent bar ties the color to the metric's semantic (success/warning/ + error/primary), icon chip sits top-right, value is left-aligned and + prominent so you can scan a row of cards without reading labels. 
*/ +.stat-grid { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(180px, 1fr)); + gap: var(--spacing-md); + margin-bottom: var(--spacing-xl); +} + +.stat-card { + position: relative; + display: flex; + align-items: center; + justify-content: space-between; + gap: var(--spacing-sm); + padding: var(--spacing-md); + min-height: 96px; + background: var(--color-bg-raised, var(--color-bg-secondary)); + border: 1px solid var(--color-border-subtle); + border-radius: var(--radius-lg); + transition: transform var(--duration-fast) var(--ease-default), + box-shadow var(--duration-fast) var(--ease-default), + border-color var(--duration-fast) var(--ease-default); + overflow: hidden; +} +.stat-card::before { + content: ''; + position: absolute; + left: 0; top: 0; bottom: 0; + width: 3px; + background: var(--stat-accent, var(--color-border-subtle)); + transition: background var(--duration-fast) var(--ease-default); +} +.stat-card:hover { + transform: translateY(-1px); + box-shadow: var(--shadow-sm); + border-color: var(--color-border); +} + +.stat-card__body { + display: flex; + flex-direction: column; + gap: 6px; + min-width: 0; +} +.stat-card__label { + font-size: var(--text-xs); + font-weight: 600; + letter-spacing: 0.08em; + text-transform: uppercase; + color: var(--color-text-muted); + white-space: normal; + line-height: 1.2; +} +.stat-card__value { + font-size: var(--text-2xl); + font-weight: 600; + font-family: 'JetBrains Mono', ui-monospace, monospace; + line-height: 1; + color: var(--color-text-primary); + word-break: break-word; +} +.stat-card__icon { + display: inline-flex; + align-items: center; + justify-content: center; + width: 36px; + height: 36px; + border-radius: var(--radius-md); + background: color-mix(in srgb, var(--stat-accent, var(--color-text-muted)) 12%, transparent); + color: var(--stat-accent, var(--color-text-muted)); + font-size: var(--text-lg); + flex-shrink: 0; +} + +/* Subtle "Register a new worker" trigger replacing the 
broken-text chevron + link. Still opens the same hint card — just reads like a button now. */ +.nodes-add-worker { + display: inline-flex; + align-items: center; + gap: var(--spacing-xs); + padding: var(--spacing-xs) var(--spacing-sm); + background: transparent; + border: 1px dashed var(--color-border); + border-radius: var(--radius-md); + color: var(--color-text-secondary); + font-size: var(--text-sm); + font-family: inherit; + font-weight: 500; + cursor: pointer; + margin-bottom: var(--spacing-md); + transition: background var(--duration-fast) var(--ease-default), + border-color var(--duration-fast) var(--ease-default), + color var(--duration-fast) var(--ease-default); +} +.nodes-add-worker:hover { + background: var(--color-bg-raised, var(--color-bg-secondary)); + border-color: var(--color-border-strong); + color: var(--color-text-primary); +} + +/* Shared FilterBar layout — search strip + chip row + toggle strip. Lives + outside the .filter-bar chip row so the padding and wrapping behavior is + consistent between the Backends gallery and the System tabs. 
*/ +.filter-bar-group { + display: flex; + flex-direction: column; + gap: var(--spacing-sm); + margin-bottom: var(--spacing-md); +} +.filter-bar-group__search { + min-width: 200px; + flex: 1; +} +.filter-bar-group__row { + display: flex; + gap: var(--spacing-md); + align-items: center; + flex-wrap: wrap; +} +.filter-bar-group__right { + display: flex; + gap: var(--spacing-md); + align-items: center; + flex-wrap: wrap; + padding-left: var(--spacing-md); + border-left: 1px solid var(--color-border-subtle); +} +.filter-bar-group__toggle { + display: flex; + align-items: center; + gap: var(--spacing-xs); + font-size: var(--text-xs); + color: var(--color-text-secondary); + cursor: pointer; + user-select: none; + white-space: nowrap; +} +.filter-btn__count { + display: inline-flex; + align-items: center; + justify-content: center; + margin-left: 6px; + min-width: 18px; + padding: 0 5px; + background: color-mix(in srgb, currentColor 18%, transparent); + border-radius: var(--radius-full); + font-size: 0.625rem; + font-weight: 600; +} + +/* Popover — floating surface anchored to a trigger element. Uses the .card + base so theming is free, adds z-index + fixed-position + scroll cap so it + behaves on tables with many rows. Kept deliberately unstyled beyond that + — content is expected to provide its own header/body structure. 
*/ +.popover { + position: fixed; + z-index: 200; + min-width: 260px; + max-width: min(420px, 95vw); + max-height: min(420px, 70vh); + display: flex; + flex-direction: column; + padding: 0; /* sections provide their own padding */ + overflow: hidden; + box-shadow: var(--shadow-lg); + animation: popoverIn var(--duration-fast) var(--ease-default); +} + +@keyframes popoverIn { + from { opacity: 0; transform: translateY(-4px) scale(0.98); } + to { opacity: 1; transform: translateY(0) scale(1); } +} + +.popover__header { + display: flex; + align-items: center; + gap: var(--spacing-sm); + padding: var(--spacing-sm) var(--spacing-md); + border-bottom: 1px solid var(--color-border-subtle); + font-size: var(--text-sm); +} + +.popover__scroll { + overflow: auto; + padding: 0; +} + +.popover__table { + margin: 0; + width: 100%; +} +.popover__table th { + position: sticky; + top: 0; + background: var(--color-bg-raised, var(--color-bg-secondary)); + z-index: 1; +} + +/* Inline-table chip trigger — looks like a badge but is a button (cursor, + focus ring inherited from global :focus-visible). */ +.chip-trigger { + border: none; + cursor: pointer; + font-family: inherit; +} +.chip-trigger:hover { + filter: brightness(1.08); +} + +/* Truncate + ellipsize a long cell (e.g. OCI digest) without breaking the + table layout. Tooltip preserves the full value. */ +.cell-truncate { + max-width: 160px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +/* Compact empty-state used inside expanded drawer sections (e.g. "No + models loaded on this node"). Dimmer than the page-level .empty-state + because it lives inside another container and shouldn't compete with + the row's primary content. 
*/ +.drawer-empty { + display: flex; + align-items: center; + gap: var(--spacing-sm); + padding: var(--spacing-sm) var(--spacing-md); + background: var(--color-bg-tertiary); + border: 1px dashed var(--color-border-subtle); + border-radius: var(--radius-md); + color: var(--color-text-muted); + font-size: var(--text-sm); +} +.drawer-empty > i { + font-size: var(--text-sm); + color: var(--color-text-muted); + opacity: 0.8; +} + +/* Node-status indicator — replaces the tiny bullet with a proper LED-style + dot next to a bold status label. Colors are applied inline from statusConfig + so one primitive handles healthy/unhealthy/draining/pending in one shape. */ +.node-status { + display: inline-flex; + align-items: center; + gap: 8px; + font-size: var(--text-sm); + font-weight: 600; +} +.node-status__dot { + width: 8px; + height: 8px; + border-radius: 50%; + box-shadow: 0 0 0 3px color-mix(in srgb, currentColor 15%, transparent); + flex-shrink: 0; +} + +/* Row-chevron cell — small 20px toggle used in table rows that expand. + The row itself is still clickable; the chevron provides the visible + affordance users were missing. */ +.row-chevron { + display: inline-flex; + align-items: center; + justify-content: center; + width: 20px; + height: 20px; + font-size: var(--text-xs); + color: var(--color-text-muted); + transition: transform var(--duration-fast) var(--ease-default); +} +.row-chevron.is-expanded { + transform: rotate(90deg); + color: var(--color-text-primary); +} + +/* Upgrade banner — the yellow strip operators see when updates are available. + Mirrors the gallery so both pages speak the same visual language. 
*/ +.upgrade-banner { + display: flex; + align-items: center; + justify-content: space-between; + gap: var(--spacing-md); + padding: var(--spacing-sm) var(--spacing-md); + margin-bottom: var(--spacing-md); + background: var(--color-warning-light); + border: 1px solid var(--color-warning); + border-radius: var(--radius-md); + color: var(--color-warning); +} +.upgrade-banner__text { + display: inline-flex; + align-items: center; + gap: var(--spacing-sm); + font-weight: 500; + font-size: var(--text-sm); +} +.upgrade-banner__actions { + display: inline-flex; + gap: var(--spacing-xs); + align-items: center; +} /* Tabs */ .tabs { diff --git a/core/http/react-ui/src/components/FilterBar.jsx b/core/http/react-ui/src/components/FilterBar.jsx new file mode 100644 index 000000000..95f7b2135 --- /dev/null +++ b/core/http/react-ui/src/components/FilterBar.jsx @@ -0,0 +1,87 @@ +import Toggle from './Toggle' + +// FilterBar is the shared search + chip filter + toggles control strip that +// the Backends gallery pioneered. Pulled into its own component so the System +// page's two tabs stop looking like a different app — matching visual +// grammar + matching keyboard behavior. +// +// Props: +// search: controlled value for the search input. +// onSearchChange: (value) => void; null disables the search input entirely. +// searchPlaceholder: placeholder for the search input. +// filters: [{ key, label, icon }]; activeFilter is compared by key. +// Omit to hide the chip row. +// activeFilter: currently-selected filter key (use '' for "all" if +// that's the first entry in `filters`). +// onFilterChange: (key) => void. +// toggles: [{ key, label, icon?, checked, onChange }]; optional +// right-side toggle group (e.g. "Show all", "Development"). +// rightSlot: arbitrary element rendered after the toggles — use for +// sort controls or extra buttons. 
+export default function FilterBar({ + search, + onSearchChange, + searchPlaceholder = 'Search...', + filters, + activeFilter, + onFilterChange, + toggles, + rightSlot, +}) { + const hasFilters = Array.isArray(filters) && filters.length > 0 + const hasToggles = Array.isArray(toggles) && toggles.length > 0 + + return ( +
+ {onSearchChange && ( +
+ + onSearchChange(e.target.value)} + aria-label={searchPlaceholder} + /> +
+ )} + + {(hasFilters || hasToggles || rightSlot) && ( +
+ {hasFilters && ( +
+ {filters.map(f => ( + + ))} +
+ )} + + {(hasToggles || rightSlot) && ( +
+ {hasToggles && toggles.map(t => ( + + ))} + {rightSlot} +
+ )} +
+ )} +
+ ) +} diff --git a/core/http/react-ui/src/components/NodeDistributionChip.jsx b/core/http/react-ui/src/components/NodeDistributionChip.jsx new file mode 100644 index 000000000..952d8be0e --- /dev/null +++ b/core/http/react-ui/src/components/NodeDistributionChip.jsx @@ -0,0 +1,168 @@ +import { useRef, useState } from 'react' +import Popover from './Popover' + +// NodeDistributionChip shows where something is installed/loaded across a +// cluster. Used by both Manage → Backends (per-row Nodes column, data = +// gallery NodeBackendRef with version/digest) and by the Models tab (data = +// LoadedOn with state/status). Supports arbitrary cluster size — small +// clusters render node-name chips inline, larger clusters collapse to a +// summary chip and reveal the full per-node table in a popover on click. +// +// Field names are intentionally forgiving: both {node_name, node_status} and +// {NodeName, NodeStatus} are supported so the component works whether it's +// reading directly off the JSON or off a hydrated class. +// +// Props: +// nodes: array of node refs (see shape below). +// compactThreshold: max nodes to render inline before collapsing (default 3). +// context: 'backends' (default) shows version/digest; 'models' +// shows state. +// emptyLabel: what to render when nodes is empty (default "—"). +export default function NodeDistributionChip({ + nodes, + compactThreshold = 3, + context = 'backends', + emptyLabel = '—', +}) { + const triggerRef = useRef(null) + const [open, setOpen] = useState(false) + + const list = Array.isArray(nodes) ? nodes : [] + if (list.length === 0) { + return {emptyLabel} + } + + const getName = n => n.node_name ?? n.NodeName ?? '' + const getStatus = n => n.node_status ?? n.NodeStatus ?? '' + const getState = n => n.state ?? n.State ?? '' + const getVersion = n => n.version ?? n.Version ?? '' + const getDigest = n => n.digest ?? n.Digest ?? '' + + // Inline mode: render every node as its own chip. 
Good for small clusters + // where seeing the names directly is more useful than a summary. + if (list.length <= compactThreshold) { + return ( +
+ {list.map(n => { + const status = getStatus(n) + const variant = status === 'healthy' ? 'badge-success' + : status === 'draining' ? 'badge-info' + : 'badge-warning' + const title = context === 'models' + ? `${getName(n)} — ${getState(n)} (${status})` + : `${getName(n)} — ${status}${getVersion(n) ? ` · v${getVersion(n)}` : ''}` + return ( + + {getName(n)} + + ) + })} +
+ ) + } + + // Summary mode for anything bigger. Count unhealthy/offline explicitly so + // the chip tells an operator at-a-glance whether to click in. "Drift" for + // backends = more than one (version, digest) tuple across healthy nodes. + const total = list.length + const offline = list.filter(n => { + const s = getStatus(n) + return s !== 'healthy' && s !== 'draining' + }).length + const drift = context === 'backends' ? countDrift(list) : 0 + const severity = offline > 0 || drift > 0 ? 'badge-warning' : 'badge-info' + + return ( + <> + + setOpen(false)} + ariaLabel={context === 'models' ? 'Model distribution' : 'Backend distribution'} + > +
+ Installed on {total} node{total === 1 ? '' : 's'} + {offline > 0 && {offline} offline} + {drift > 0 && {drift} drift} +
+
+ + + + + + {context === 'models' ? : <> + + + } + + + + {list.map(n => ( + + + + {context === 'models' ? ( + + ) : ( + <> + + + + )} + + ))} + +
NodeStatusStateVersionDigest
{getName(n)} + + {getStatus(n)} + + {getState(n) || '—'}{getVersion(n) ? `v${getVersion(n)}` : '—'} + {getDigest(n) ? shortenDigest(getDigest(n)) : '—'} +
+
+
+ + ) +} + +// countDrift counts nodes whose (version, digest) disagrees with the cluster +// majority. Mirrors the backend summarizeNodeDrift logic so the UI number +// matches what CheckUpgradesAgainst emits in UpgradeInfo.NodeDrift. +function countDrift(nodes) { + if (nodes.length <= 1) return 0 + const counts = new Map() + for (const n of nodes) { + const key = `${n.version ?? n.Version ?? ''}|${n.digest ?? n.Digest ?? ''}` + counts.set(key, (counts.get(key) || 0) + 1) + } + if (counts.size === 1) return 0 // unanimous + let topKey = '' + let topCount = 0 + for (const [k, v] of counts.entries()) { + if (v > topCount) { topKey = k; topCount = v } + } + return nodes.length - topCount +} + +// shortenDigest trims a full OCI digest to the common 12-char form used in +// docker/oci tooling. Falls back to the raw value if it doesn't match. +function shortenDigest(digest) { + const m = /^(sha\d+:)?([a-f0-9]+)$/i.exec(digest) + if (!m) return digest + const hex = m[2] + return (m[1] ?? '') + hex.slice(0, 12) +} diff --git a/core/http/react-ui/src/components/Popover.jsx b/core/http/react-ui/src/components/Popover.jsx new file mode 100644 index 000000000..96a9e217e --- /dev/null +++ b/core/http/react-ui/src/components/Popover.jsx @@ -0,0 +1,86 @@ +import { useEffect, useRef, useState, useCallback } from 'react' + +// Minimal popover: positions itself below-right of the trigger's bounding box, +// flips above when there isn't room below, closes on outside click or Escape, +// returns focus to the trigger. Uses the existing .card surface so it picks +// up theme/border/shadow automatically — no new theming work. 
+// +// Props: +// anchor: ref to the trigger DOMElement (required) +// open: boolean +// onClose: () => void +// children: popover body +// ariaLabel: accessible label for the dialog +export default function Popover({ anchor, open, onClose, children, ariaLabel }) { + const popoverRef = useRef(null) + const [pos, setPos] = useState({ top: 0, left: 0, flipped: false }) + + // Compute position from the anchor's bounding box whenever we open or the + // viewport changes. 240px is the minimum width we'll reserve; bigger content + // grows naturally. + const reposition = useCallback(() => { + if (!anchor?.current) return + const rect = anchor.current.getBoundingClientRect() + const popoverHeight = popoverRef.current?.offsetHeight ?? 0 + const spaceBelow = window.innerHeight - rect.bottom + const flipped = popoverHeight > spaceBelow - 16 && rect.top > popoverHeight + const top = flipped ? rect.top - popoverHeight - 8 : rect.bottom + 8 + // Prefer left-aligned; clamp so we don't go off-screen right. + const left = Math.min(rect.left, window.innerWidth - 320) + setPos({ top, left: Math.max(8, left), flipped }) + }, [anchor]) + + useEffect(() => { + if (!open) return + reposition() + window.addEventListener('resize', reposition) + window.addEventListener('scroll', reposition, true) + return () => { + window.removeEventListener('resize', reposition) + window.removeEventListener('scroll', reposition, true) + } + }, [open, reposition]) + + // Close on outside click or Escape. Mousedown (not click) so the close + // happens before a parent handler could re-trigger us. 
+ useEffect(() => { + if (!open) return + const onMouseDown = (e) => { + if (popoverRef.current && !popoverRef.current.contains(e.target) && !anchor?.current?.contains(e.target)) { + onClose() + } + } + const onKey = (e) => { if (e.key === 'Escape') onClose() } + document.addEventListener('mousedown', onMouseDown) + document.addEventListener('keydown', onKey) + return () => { + document.removeEventListener('mousedown', onMouseDown) + document.removeEventListener('keydown', onKey) + } + }, [open, onClose, anchor]) + + // Return focus to the trigger when the popover closes — keyboard users + // shouldn't have to tab back through the whole page to find their spot. + useEffect(() => { + if (!open && anchor?.current) { + // requestAnimationFrame so the close is painted before focus jumps; + // otherwise screen readers announce the trigger mid-transition. + const raf = requestAnimationFrame(() => anchor.current?.focus?.()) + return () => cancelAnimationFrame(raf) + } + }, [open, anchor]) + + if (!open) return null + + return ( +
+ {children} +
+ ) +} diff --git a/core/http/react-ui/src/pages/Manage.jsx b/core/http/react-ui/src/pages/Manage.jsx index bd6bda938..3f11a7744 100644 --- a/core/http/react-ui/src/pages/Manage.jsx +++ b/core/http/react-ui/src/pages/Manage.jsx @@ -3,6 +3,8 @@ import { useNavigate, useOutletContext, useSearchParams } from 'react-router-dom import ResourceMonitor from '../components/ResourceMonitor' import ConfirmDialog from '../components/ConfirmDialog' import Toggle from '../components/Toggle' +import NodeDistributionChip from '../components/NodeDistributionChip' +import FilterBar from '../components/FilterBar' import { useModels } from '../hooks/useModels' import { backendControlApi, modelsApi, backendsApi, systemApi, nodesApi } from '../utils/api' @@ -11,6 +13,22 @@ const TABS = [ { key: 'backends', label: 'Backends', icon: 'fa-server' }, ] +// formatInstalledAt renders an installed_at timestamp as a short relative/abs +// string suitable for dense tables. Returns the raw value if parsing fails so +// we never display "Invalid Date". +function formatInstalledAt(value) { + if (!value) return '—' + const d = new Date(value) + if (isNaN(d.getTime())) return value + const now = Date.now() + const diffMin = Math.floor((now - d.getTime()) / 60000) + if (diffMin < 1) return 'just now' + if (diffMin < 60) return `${diffMin}m ago` + if (diffMin < 60 * 24) return `${Math.floor(diffMin / 60)}h ago` + if (diffMin < 60 * 24 * 30) return `${Math.floor(diffMin / (60 * 24))}d ago` + return d.toISOString().slice(0, 10) +} + export default function Manage() { const { addToast } = useOutletContext() const navigate = useNavigate() @@ -28,6 +46,24 @@ export default function Manage() { const [distributedMode, setDistributedMode] = useState(false) const [togglingModels, setTogglingModels] = useState(new Set()) const [pinningModels, setPinningModels] = useState(new Set()) + // Filter state per tab. Persisted in the URL query so switching tabs + // doesn't lose the filter the operator just set. 
+ const [modelsSearch, setModelsSearch] = useState(() => searchParams.get('mq') || '') + const [modelsFilter, setModelsFilter] = useState(() => searchParams.get('mf') || 'all') + const [backendsSearch, setBackendsSearch] = useState(() => searchParams.get('bq') || '') + const [backendsFilter, setBackendsFilter] = useState(() => searchParams.get('bf') || 'all') + + // Sync filter state into the URL so deep-links + tab switches survive. + useEffect(() => { + const p = new URLSearchParams(searchParams) + const setOrDelete = (k, v) => { if (v && v !== 'all') p.set(k, v); else p.delete(k) } + setOrDelete('mq', modelsSearch) + setOrDelete('mf', modelsFilter) + setOrDelete('bq', backendsSearch) + setOrDelete('bf', backendsFilter) + setSearchParams(p, { replace: true }) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [modelsSearch, modelsFilter, backendsSearch, backendsFilter]) const handleTabChange = (tab) => { setActiveTab(tab) @@ -64,6 +100,35 @@ export default function Manage() { nodesApi.list().then(() => setDistributedMode(true)).catch(() => {}) }, [fetchLoadedModels, fetchBackends]) + // Auto-refresh the Models tab every 10s in distributed mode so ghost models + // (loaded on a worker but absent from this frontend's in-memory cache) + // clear on their own without the user clicking Update. + const [lastSyncedAt, setLastSyncedAt] = useState(() => Date.now()) + const [nowTick, setNowTick] = useState(() => Date.now()) + useEffect(() => { + if (!distributedMode || activeTab !== 'models') return + const interval = setInterval(() => { + refetchModels() + fetchLoadedModels() + setLastSyncedAt(Date.now()) + }, 10000) + return () => clearInterval(interval) + }, [distributedMode, activeTab, refetchModels, fetchLoadedModels]) + + // Drive the "last synced Ns ago" label without over-rendering the table. 
+ useEffect(() => { + if (!distributedMode) return + const interval = setInterval(() => setNowTick(Date.now()), 1000) + return () => clearInterval(interval) + }, [distributedMode]) + const lastSyncedAgo = (() => { + const s = Math.max(0, Math.floor((nowTick - lastSyncedAt) / 1000)) + if (s < 5) return 'just now' + if (s < 60) return `${s}s ago` + const m = Math.floor(s / 60) + return `${m}m ago` + })() + // Fetch available backend upgrades useEffect(() => { if (activeTab === 'backends') { @@ -196,6 +261,29 @@ export default function Manage() { } } + const [upgradingAll, setUpgradingAll] = useState(false) + const [showOnlyUpgradable, setShowOnlyUpgradable] = useState(false) + const handleUpgradeAll = async () => { + const names = Object.keys(upgrades) + if (names.length === 0) return + setUpgradingAll(true) + try { + // Serial upgrade — matches the gallery's Upgrade All behavior. + // Each backend upgrade is itself a cluster-wide fan-out, so parallel + // calls would multiply load on every worker. + for (const name of names) { + try { + await backendsApi.upgrade(name) + } catch (err) { + addToast(`Upgrade failed for ${name}: ${err.message}`, 'error') + } + } + addToast(`Upgrade started for ${names.length} backend${names.length === 1 ? '' : 's'}`, 'info') + } finally { + setUpgradingAll(false) + } + } + const handleDeleteBackend = (name) => { setConfirmDialog({ title: 'Delete Backend', @@ -227,29 +315,74 @@ export default function Manage() { {/* Tabs */}
- {TABS.map(t => ( - - ))} + {TABS.map(t => { + const upgradeCount = t.key === 'backends' ? Object.keys(upgrades).length : 0 + return ( + + ) + })}
{/* Models Tab */} - {activeTab === 'models' && ( + {activeTab === 'models' && (() => { + // Computed filters — done here so the result is available both to + // the FilterBar counts and to the table body. + const MODEL_FILTERS = [ + { key: 'all', label: 'All', icon: 'fa-layer-group' }, + { key: 'running', label: 'Running', icon: 'fa-circle-play' }, + { key: 'idle', label: 'Idle', icon: 'fa-pause' }, + { key: 'disabled', label: 'Disabled', icon: 'fa-ban' }, + { key: 'pinned', label: 'Pinned', icon: 'fa-thumbtack' }, + ...(distributedMode ? [{ key: 'distributed', label: 'Distributed', icon: 'fa-server' }] : []), + ] + const passesFilter = (m) => { + if (modelsFilter === 'running') return !m.disabled && (loadedModelIds.has(m.id) || (m.loaded_on && m.loaded_on.length > 0)) + if (modelsFilter === 'idle') return !m.disabled && !loadedModelIds.has(m.id) && !(m.loaded_on && m.loaded_on.length > 0) + if (modelsFilter === 'disabled') return !!m.disabled + if (modelsFilter === 'pinned') return !!m.pinned + if (modelsFilter === 'distributed') return Array.isArray(m.loaded_on) && m.loaded_on.length > 0 + return true + } + const q = modelsSearch.trim().toLowerCase() + const passesSearch = (m) => !q || (m.id || '').toLowerCase().includes(q) || (m.backend || '').toLowerCase().includes(q) + const visibleModels = models.filter(m => passesFilter(m) && passesSearch(m)) + return (
-
- -
+ + {distributedMode && ( + + Last synced {lastSyncedAgo} + + )} + + + )} + /> {modelsLoading ? (
@@ -274,6 +407,12 @@ export default function Manage() {
+ ) : visibleModels.length === 0 ? ( +
+ +

No models match the current filter.

+ +
) : (
@@ -288,7 +427,7 @@ export default function Manage() { - {models.map(model => ( + {visibleModels.map(model => ( {/* Enable/Disable toggle */} - {/* Status */} + {/* Status / Distribution */} {/* Backend */}
@@ -329,21 +468,33 @@ export default function Manage() { - {model.disabled ? ( - - Disabled - - ) : loadedModelIds.has(model.id) ? ( - - Running - - ) : ( - - Idle - - )} +
+ {model.disabled ? ( + + Disabled + + ) : model.loaded_on && model.loaded_on.length > 0 ? ( + // Distributed mode: surface where the model is + // actually loaded. Shared chip scales to any cluster + // size (inline for <=3, popover for larger). + + ) : loadedModelIds.has(model.id) ? ( + + Running + + ) : ( + + Idle + + )} + {model.source === 'registry-only' && ( + + Adopted + + )} +
@@ -394,11 +545,34 @@ export default function Manage() { )} - )} + ) + })()} {/* Backends Tab */} {activeTab === 'backends' && (
+ {/* Upgrade banner — mirrors the gallery so operators can't miss updates */} + {!backendsLoading && Object.keys(upgrades).length > 0 && ( +
+
+ + + {Object.keys(upgrades).length} backend{Object.keys(upgrades).length === 1 ? ' has' : 's have'} updates available + +
+
+ +
+
+ )} + {backendsLoading ? (
Loading backends... @@ -419,109 +593,217 @@ export default function Manage() {
- ) : ( -
+ ) : (() => { + // Count chip badges: show N in the filter buttons so operators can + // see at a glance how their chips bucket the list. + const upgradableCount = backends.filter(b => upgrades[b.Name]).length + const userCount = backends.filter(b => !b.IsSystem).length + const systemCount = backends.filter(b => b.IsSystem).length + const metaCount = backends.filter(b => b.IsMeta).length + const offlineCount = backends.filter(b => { + const n = b.Nodes || b.nodes || [] + return n.some(x => { + const s = x.node_status || x.NodeStatus + return s && s !== 'healthy' && s !== 'draining' + }) + }).length + + const BACKEND_FILTERS = [ + { key: 'all', label: 'All', icon: 'fa-layer-group', count: backends.length }, + { key: 'user', label: 'User', icon: 'fa-download', count: userCount }, + { key: 'system', label: 'System', icon: 'fa-shield-alt', count: systemCount }, + { key: 'meta', label: 'Meta', icon: 'fa-layer-group', count: metaCount }, + ...(upgradableCount > 0 ? [{ key: 'upgradable', label: 'Updates', icon: 'fa-arrow-up', count: upgradableCount }] : []), + ...(distributedMode && offlineCount > 0 ? 
[{ key: 'offline', label: 'Offline nodes', icon: 'fa-exclamation-circle', count: offlineCount }] : []), + ] + const q = backendsSearch.trim().toLowerCase() + const passesSearch = (b) => !q + || (b.Name || '').toLowerCase().includes(q) + || (b.Metadata?.alias || '').toLowerCase().includes(q) + || (b.Metadata?.meta_backend_for || '').toLowerCase().includes(q) + const passesFilter = (b) => { + switch (backendsFilter) { + case 'user': return !b.IsSystem + case 'system': return !!b.IsSystem + case 'meta': return !!b.IsMeta + case 'upgradable': return !!upgrades[b.Name] + case 'offline': { + const n = b.Nodes || b.nodes || [] + return n.some(x => { + const s = x.node_status || x.NodeStatus + return s && s !== 'healthy' && s !== 'draining' + }) + } + default: return true + } + } + // Legacy "showOnlyUpgradable" toggle is now the 'upgradable' chip — + // keep backward-compat by mapping it onto the new filter. + if (showOnlyUpgradable && backendsFilter !== 'upgradable') { + // One-shot reconciliation — the old state becomes the new chip. + setBackendsFilter('upgradable') + setShowOnlyUpgradable(false) + } + const visibleBackends = backends.filter(b => passesFilter(b) && passesSearch(b)) + if (visibleBackends.length === 0) { + return ( + <> + +
+ +

No backends match the current filter.

+ +
+ + ) + } + return ( + <> + +
- + + {distributedMode && } + - {backends.map((backend, i) => ( + {visibleBackends.map((backend, i) => { + const upgradeInfo = upgrades[backend.Name] + const hasDrift = upgradeInfo?.node_drift?.length > 0 + const nodes = backend.Nodes || backend.nodes || [] + return ( + {distributedMode && ( + + )} + - ))} + ) + })}
Name TypeMetadataVersionNodesInstalled Actions
-
- - {backend.Name} +
+ + {backend.Name} + {backend.Metadata?.alias && ( + alias: {backend.Metadata.alias} + )} + {backend.Metadata?.meta_backend_for && ( + for: {backend.Metadata.meta_backend_for} + )}
-
+
{backend.IsSystem ? ( - - System + + System ) : ( - - User + + User )} {backend.IsMeta && ( - - Meta + + Meta )}
-
- {backend.Metadata?.alias && ( - - - Alias: {backend.Metadata.alias} +
+ {backend.Metadata?.version ? ( + v{backend.Metadata.version} + ) : ( + + )} + {upgradeInfo && ( + + + {upgradeInfo.available_version ? ` v${upgradeInfo.available_version}` : ' Update available'} )} - {backend.Metadata?.meta_backend_for && ( - - - For: {backend.Metadata.meta_backend_for} + {hasDrift && ( + `${d.node_name}${d.version ? ' v' + d.version : ''}`).join(', ')}`} + > + + {' '}Drift: {upgradeInfo.node_drift.length} node{upgradeInfo.node_drift.length === 1 ? '' : 's'} )} - {backend.Metadata?.version && ( - - - Version: v{backend.Metadata.version} - {upgrades[backend.Name] && ( - - → v{upgrades[backend.Name].available_version} - - )} - - )} - {backend.Metadata?.installed_at && ( - - - {backend.Metadata.installed_at} - - )} - {!backend.Metadata?.alias && !backend.Metadata?.meta_backend_for && !backend.Metadata?.installed_at && '—'}
+ + -
- {!backend.IsSystem ? ( + + {backend.Metadata?.installed_at ? formatInstalledAt(backend.Metadata.installed_at) : '—'} + +
+
+ {backend.IsSystem ? ( + + Protected + + ) : ( <> + {upgradeInfo ? ( + + ) : ( + + )} - - ) : ( - )}
-
- )} +
+ + ) + })()} )} diff --git a/core/http/react-ui/src/pages/Nodes.jsx b/core/http/react-ui/src/pages/Nodes.jsx index 0903dfb34..0e0698241 100644 --- a/core/http/react-ui/src/pages/Nodes.jsx +++ b/core/http/react-ui/src/pages/Nodes.jsx @@ -51,15 +51,22 @@ const modelStateConfig = { idle: { bg: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)', border: 'var(--color-border-subtle)' }, } -function StatCard({ icon, label, value, color }) { +function StatCard({ icon, label, value, color, accentVar }) { + // accentVar: optional CSS variable for the left edge + icon chip, e.g. + // "--color-success". When unset the card reads neutral — used for simple + // counts so they don't compete with the semantic cards for attention. + const accent = color || (accentVar ? `var(${accentVar})` : 'var(--color-text-primary)') return ( -
-
- - {label} +
+
+
{label}
+
{value}
-
- {value} +
+
) @@ -543,45 +550,24 @@ export default function Nodes() {
{/* Tabs */} -
+
{showTips && } @@ -685,23 +671,28 @@ export default function Nodes() { >
- + {canExpand && ( + + )} +
-
{node.name}
-
+
{node.name}
+
{node.address}
{node.labels && Object.keys(node.labels).length > 0 && (
{Object.entries(node.labels).slice(0, 5).map(([k, v]) => ( - {k}={v} ))} {Object.keys(node.labels).length > 5 && ( - + +{Object.keys(node.labels).length - 5} more )} @@ -711,12 +702,10 @@ export default function Nodes() {
-
- - - {status.label} - -
+ + + {status.label} +
{hasGPU && totalVRAMStr ? ( @@ -745,38 +734,37 @@ export default function Nodes() { -
e.stopPropagation()}> +
e.stopPropagation()}> {node.status === 'pending' && ( )} {node.status === 'draining' && ( )} {node.status !== 'draining' && node.status !== 'pending' && ( )} @@ -794,7 +782,10 @@ export default function Nodes() { {!models ? ( ) : models.length === 0 ? ( -

No models loaded on this node

+
+ + No models loaded on this node yet. +
) : ( @@ -870,7 +861,10 @@ export default function Nodes() { {!backends ? ( ) : backends.length === 0 ? ( -

No backends installed on this node

+
+ + No backends installed on this node. Install one from the gallery to schedule models here. +
) : (
diff --git a/core/http/routes/ui_api.go b/core/http/routes/ui_api.go index b6db24e09..8d089f873 100644 --- a/core/http/routes/ui_api.go +++ b/core/http/routes/ui_api.go @@ -510,28 +510,89 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model modelConfigs := cl.GetAllModelsConfigs() modelsWithoutConfig, _ := galleryop.ListModels(cl, ml, config.NoFilterFn, galleryop.LOOSE_ONLY) + type loadedOn struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + State string `json:"state"` + NodeStatus string `json:"node_status"` + } type modelCapability struct { - ID string `json:"id"` - Capabilities []string `json:"capabilities"` - Backend string `json:"backend"` - Disabled bool `json:"disabled"` - Pinned bool `json:"pinned"` + ID string `json:"id"` + Capabilities []string `json:"capabilities"` + Backend string `json:"backend"` + Disabled bool `json:"disabled"` + Pinned bool `json:"pinned"` + // LoadedOn is populated only when the node registry is active + // (distributed mode). Lets the UI show "loaded on worker-1" without + // the operator having to expand every node manually. An empty slice + // with nil reports "no loaded replicas" vs. nil reports "not in + // cluster mode" — the frontend treats both as "no distribution info". + LoadedOn []loadedOn `json:"loaded_on,omitempty"` + // Source="registry-only" marks models adopted from the cluster that + // have no local config yet (ghosts that the reconciler discovered). + Source string `json:"source,omitempty"` + } + + // Join with the node registry when we have one (distributed mode). A + // single registry fetch + map join beats per-model queries for the + // 100-model case. 
+ var loadedByModel map[string][]loadedOn + if ds := applicationInstance.Distributed(); ds != nil && ds.Registry != nil { + nodeModels, err := ds.Registry.ListAllLoadedModels(c.Request().Context()) + if err == nil { + allNodes, _ := ds.Registry.List(c.Request().Context()) + nameByID := make(map[string]string, len(allNodes)) + statusByID := make(map[string]string, len(allNodes)) + for _, n := range allNodes { + nameByID[n.ID] = n.Name + statusByID[n.ID] = n.Status + } + loadedByModel = make(map[string][]loadedOn) + for _, nm := range nodeModels { + loadedByModel[nm.ModelName] = append(loadedByModel[nm.ModelName], loadedOn{ + NodeID: nm.NodeID, + NodeName: nameByID[nm.NodeID], + State: nm.State, + NodeStatus: statusByID[nm.NodeID], + }) + } + } } result := make([]modelCapability, 0, len(modelConfigs)+len(modelsWithoutConfig)) + seen := make(map[string]bool, len(modelConfigs)+len(modelsWithoutConfig)) for _, cfg := range modelConfigs { + seen[cfg.Name] = true result = append(result, modelCapability{ ID: cfg.Name, Capabilities: cfg.KnownUsecaseStrings, Backend: cfg.Backend, Disabled: cfg.IsDisabled(), Pinned: cfg.IsPinned(), + LoadedOn: loadedByModel[cfg.Name], }) } for _, name := range modelsWithoutConfig { + seen[name] = true result = append(result, modelCapability{ ID: name, Capabilities: []string{}, + LoadedOn: loadedByModel[name], + }) + } + // Emit entries for cluster models that have no local config — these + // are the actual ghosts. Without this the operator would have no way + // to see a model the cluster is running if its config file wasn't + // synced to this frontend's filesystem. 
+ for name, loc := range loadedByModel { + if seen[name] { + continue + } + result = append(result, modelCapability{ + ID: name, + Capabilities: []string{}, + LoadedOn: loc, + Source: "registry-only", }) } diff --git a/core/services/advisorylock/keys.go b/core/services/advisorylock/keys.go index d5378a5d1..277817229 100644 --- a/core/services/advisorylock/keys.go +++ b/core/services/advisorylock/keys.go @@ -11,4 +11,5 @@ const ( KeyHealthCheck int64 = 104 KeySchemaMigrate int64 = 105 KeyBackendUpgradeCheck int64 = 106 + KeyStateReconciler int64 = 107 ) diff --git a/core/services/galleryop/service.go b/core/services/galleryop/service.go index fef638425..3d77d11d6 100644 --- a/core/services/galleryop/service.go +++ b/core/services/galleryop/service.go @@ -57,6 +57,16 @@ func (g *GalleryService) SetBackendManager(b BackendManager) { g.backendManager = b } +// BackendManager returns the current backend manager. Callers like the +// periodic upgrade checker need this so they run CheckUpgrades through the +// distributed implementation (which asks workers) instead of the frontend's +// local filesystem — the latter is always empty in distributed deployments. +func (g *GalleryService) BackendManager() BackendManager { + g.Lock() + defer g.Unlock() + return g.backendManager +} + // SetNATSClient sets the NATS client for distributed progress publishing. 
func (g *GalleryService) SetNATSClient(nc messaging.Publisher) { g.Lock() diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 397f63ff0..3e9af53a9 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -157,6 +157,12 @@ type NodeBackendInfo struct { IsMeta bool `json:"is_meta"` InstalledAt string `json:"installed_at,omitempty"` GalleryURL string `json:"gallery_url,omitempty"` + // Version, URI and Digest enable cluster-wide upgrade detection — + // without them, the frontend cannot tell whether the installed OCI + // image matches the gallery entry, and upgrades silently never surface. + Version string `json:"version,omitempty"` + URI string `json:"uri,omitempty"` + Digest string `json:"digest,omitempty"` } // SubjectNodeBackendStop tells a worker node to stop its gRPC backend process. diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 62cb32552..e524756da 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -10,6 +10,7 @@ import ( "github.com/mudler/LocalAI/core/gallery" "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/xlog" "github.com/nats-io/nats.go" ) @@ -53,6 +54,7 @@ type DistributedBackendManager struct { adapter *RemoteUnloaderAdapter registry *NodeRegistry backendGalleries []config.Gallery + systemState *system.SystemState } // NewDistributedBackendManager creates a DistributedBackendManager. @@ -62,46 +64,161 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model adapter: adapter, registry: registry, backendGalleries: appConfig.BackendGalleries, + systemState: appConfig.SystemState, } } +// NodeOpStatus is the per-node outcome of a backend lifecycle operation. 
+// Returned as part of BackendOpResult so the frontend can surface exactly +// what happened on each worker instead of a single joined error string. +type NodeOpStatus struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + Status string `json:"status"` // "success" | "queued" | "error" + Error string `json:"error,omitempty"` +} + +// BackendOpResult aggregates per-node outcomes. +type BackendOpResult struct { + Nodes []NodeOpStatus `json:"nodes"` +} + +// enqueueAndDrainBackendOp is the shared scaffolding for +// delete/install/upgrade. Every non-pending node gets a pending_backend_ops +// row (intent is durable even if the node is offline). Currently-healthy +// nodes get an immediate attempt; success deletes the row, failure records +// the error and leaves the row for the reconciler to retry. +// +// `apply` is the NATS round-trip for one node. Returning an error keeps the +// row in the queue and marks the per-node status as "error"; returning nil +// deletes the row and reports "success". For non-healthy nodes the status +// is "queued" — no attempt is made right now, reconciler will pick it up +// when the node returns. +func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) { + allNodes, err := d.registry.List(ctx) + if err != nil { + return BackendOpResult{}, err + } + + result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))} + for _, node := range allNodes { + // Pending nodes haven't been approved yet — no intent to apply. 
+ if node.Status == StatusPending { + continue + } + if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { + xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "error", + Error: fmt.Sprintf("enqueue failed: %v", err), + }) + continue + } + + if node.Status != StatusHealthy { + // Intent is recorded; reconciler will retry when the node recovers. + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "queued", + Error: fmt.Sprintf("node %s, will retry when healthy", node.Status), + }) + continue + } + + applyErr := apply(node) + if applyErr == nil { + // Find the row we just upserted and delete it; cheap but requires + // a lookup since UpsertPendingBackendOp doesn't return the ID. + if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil { + xlog.Debug("Failed to clear pending backend op after success", "error", err) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "success", + }) + continue + } + + // Record failure for backoff. If it's an ErrNoResponders, the node's + // gone AWOL — mark unhealthy so the router stops picking it too. + errMsg := applyErr.Error() + if errors.Is(applyErr, nats.ErrNoResponders) { + xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) + d.registry.MarkUnhealthy(ctx, node.ID) + } + if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil { + _ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg, + }) + } + return result, nil +} + +// findPendingRow looks up the ID of a pending_backend_ops row by its +// composite key. 
Used to hand off to RecordPendingBackendOpFailure / +// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same +// composite key. +func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) { + var row PendingBackendOp + if err := d.registry.db.WithContext(ctx). + Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op). + First(&row).Error; err != nil { + return 0, err + } + return row.ID, nil +} + +// deletePendingRow removes the queue row keyed by (nodeID, backend, op). +func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error { + return d.registry.db.WithContext(ctx). + Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op). + Delete(&PendingBackendOp{}).Error +} + +// DeleteBackend fans out backend deletion to every known node. The previous +// implementation silently skipped non-healthy nodes, which meant zombies +// reappeared once those nodes returned. Now the intent is durable — see +// enqueueAndDrainBackendOp — and the reconciler catches up later. func (d *DistributedBackendManager) DeleteBackend(name string) error { - // Try local deletion but ignore "not found" errors — in distributed mode - // the frontend node typically doesn't have backends installed locally; - // they only exist on worker nodes. + // Local delete first (frontend rarely has backends installed in + // distributed mode, but the gallery operation still expects it; ignore + // "not found" which is the common case). 
if err := d.local.DeleteBackend(name); err != nil { if !errors.Is(err, gallery.ErrBackendNotFound) { return err } xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name) } - // Fan out backend.delete to all healthy nodes - allNodes, listErr := d.registry.List(context.Background()) - if listErr != nil { - xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr) - return listErr - } - var errs []error - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } - if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil { - if errors.Is(delErr, nats.ErrNoResponders) { - // Node's NATS subscription is gone — likely restarted with a new ID. - // Mark it unhealthy so future fan-outs skip it. - xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr) - errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr)) - } - } - return errors.Join(errs...) + + ctx := context.Background() + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + _, err := d.adapter.DeleteBackend(node.ID, name) + return err + }) + return err } -// ListBackends aggregates installed backends from all healthy worker nodes. +// DeleteBackendDetailed is the per-node-result variant called by the HTTP +// handler so the UI can render a per-node status drawer. DeleteBackend still +// returns error-only for callers that don't care about node breakdown. 
+func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) { + if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) { + return BackendOpResult{}, err + } + return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + _, err := d.adapter.DeleteBackend(node.ID, name) + return err + }) +} + +// ListBackends aggregates installed backends from all worker nodes, preserving +// per-node attribution. Each SystemBackend.Nodes entry records which node has +// the backend and the version/digest it reports. The top-level Metadata is +// populated from the first node seen so single-node-minded callers still work. +// +// Pending/offline/draining nodes are skipped because they aren't expected to +// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders +// then marks them unhealthy and the loop continues. func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) { result := make(gallery.SystemBackends) allNodes, err := d.registry.List(context.Background()) @@ -110,7 +227,7 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro } for _, node := range allNodes { - if node.Status != StatusHealthy { + if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining { continue } reply, err := d.adapter.ListBackends(node.ID) @@ -128,89 +245,92 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro continue } for _, b := range reply.Backends { - if _, exists := result[b.Name]; !exists { - result[b.Name] = gallery.SystemBackend{ + ref := gallery.NodeBackendRef{ + NodeID: node.ID, + NodeName: node.Name, + NodeStatus: node.Status, + Version: b.Version, + Digest: b.Digest, + URI: b.URI, + InstalledAt: b.InstalledAt, + } + entry, exists := result[b.Name] + if !exists { + entry = gallery.SystemBackend{ Name: b.Name, 
IsSystem: b.IsSystem, IsMeta: b.IsMeta, Metadata: &gallery.BackendMetadata{ + Name: b.Name, InstalledAt: b.InstalledAt, GalleryURL: b.GalleryURL, + Version: b.Version, + URI: b.URI, + Digest: b.Digest, }, } } + entry.Nodes = append(entry.Nodes, ref) + result[b.Name] = entry } } return result, nil } -// InstallBackend fans out backend installation to all healthy worker nodes. +// InstallBackend fans out installation through the pending-ops queue so +// non-healthy nodes get retried when they come back instead of being silently +// skipped. Reply success from the NATS round-trip deletes the queue row; +// reply.Success==false is treated as an error so the row stays for retry. func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error { - allNodes, err := d.registry.List(context.Background()) - if err != nil { - return err - } - galleriesJSON, _ := json.Marshal(op.Galleries) backendName := op.GalleryElementName - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON)) if err != nil { - if errors.Is(err, nats.ErrNoResponders) { - xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err) - continue + return err } if !reply.Success { - xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error) + return fmt.Errorf("install failed: %s", reply.Error) } - } - return nil + return nil + }) + return err } -// UpgradeBackend fans out a backend upgrade to all healthy 
worker nodes. -// TODO: Add dedicated NATS subject for upgrade (currently reuses install with force flag) +// UpgradeBackend reuses the install NATS subject (the worker re-downloads +// from the gallery). Same queue semantics as Install/Delete. func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { - allNodes, err := d.registry.List(context.Background()) - if err != nil { - return err - } - galleriesJSON, _ := json.Marshal(d.backendGalleries) - var errs []error - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } - // Reuse install endpoint which will re-download the backend (force mode) + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON)) if err != nil { - if errors.Is(err, nats.ErrNoResponders) { - xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err)) - continue + return err } if !reply.Success { - errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error)) + return fmt.Errorf("upgrade failed: %s", reply.Error) } - } - - return errors.Join(errs...) + return nil + }) + return err } -// CheckUpgrades checks for available backend upgrades. -// Gallery comparison is global (not per-node), so we delegate to the local manager. +// CheckUpgrades checks for available backend upgrades across the cluster. +// +// The previous implementation delegated to d.local, which called +// ListSystemBackends on the frontend — but in distributed mode the frontend +// has no backends installed locally, so the upgrade loop never ran and the UI +// never surfaced any upgrades. 
 We now feed the cluster-wide aggregation
+// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
+// digest-based detection actually works and cluster drift is visible.
 func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
-	return d.local.CheckUpgrades(ctx)
+	installed, err := d.ListBackends()
+	if err != nil {
+		return nil, err
+	}
+	// systemState is used by AvailableBackends (gallery paths + meta-backend
+	// resolution). The `installed` argument is what the old code got wrong —
+	// it used to come from the empty frontend filesystem.
+	return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
 }
diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go
index 92ba76edc..6d87165b4 100644
--- a/core/services/nodes/reconciler.go
+++ b/core/services/nodes/reconciler.go
@@ -3,26 +3,57 @@ package nodes
 
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"time"
 
 	"github.com/mudler/LocalAI/core/services/advisorylock"
+	grpcclient "github.com/mudler/LocalAI/pkg/grpc"
 	"github.com/mudler/xlog"
 	"gorm.io/gorm"
 )
 
+// ModelProber checks whether a model's backend process is still reachable.
+// Defaulted to a gRPC health probe but overridable for tests so we don't
+// need to stand up a real server. A false return means the probe got no
+// healthy reply — unreachable, unhealthy, and timed-out all look the same.
+type ModelProber interface {
+	IsAlive(ctx context.Context, address string) bool
+}
+
+// grpcModelProber does a 1s HealthCheck on the model's stored gRPC address. 
+type grpcModelProber struct{ token string } + +func (g grpcModelProber) IsAlive(ctx context.Context, address string) bool { + client := grpcclient.NewClientWithToken(address, false, nil, false, g.token) + probeCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + ok, _ := client.HealthCheck(probeCtx) + return ok +} + // ReplicaReconciler periodically ensures model replica counts match their // scheduling configs. It scales up replicas when below MinReplicas or when // all replicas are busy (up to MaxReplicas), and scales down idle replicas // above MinReplicas. // +// Alongside replica scaling it runs two state-reconciliation passes — draining +// the pending_backend_ops queue and probing loaded models' gRPC addresses to +// orphan ghosts. Both passes are wrapped in the KeyStateReconciler advisory +// lock so N frontends don't stampede. +// // Only processes models with auto-scaling enabled (MinReplicas > 0 or MaxReplicas > 0). type ReplicaReconciler struct { registry *NodeRegistry scheduler ModelScheduler // interface for scheduling new models unloader NodeCommandSender + adapter *RemoteUnloaderAdapter // NATS sender for pending-op drain + prober ModelProber // health probe for model gRPC addrs db *gorm.DB interval time.Duration scaleDownDelay time.Duration + // probeStaleAfter: only probe node_models rows older than this so we + // don't hammer every worker every tick for models we just heard from. + probeStaleAfter time.Duration } // ModelScheduler abstracts the scheduling logic needed by the reconciler. @@ -35,12 +66,21 @@ type ModelScheduler interface { // ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler. 
type ReplicaReconcilerOptions struct { - Registry *NodeRegistry - Scheduler ModelScheduler - Unloader NodeCommandSender - DB *gorm.DB - Interval time.Duration // default 30s - ScaleDownDelay time.Duration // default 5m + Registry *NodeRegistry + Scheduler ModelScheduler + Unloader NodeCommandSender + // Adapter is the NATS sender used to retry pending backend ops. When nil, + // the state-reconciler pending-drain pass is a no-op (single-node mode). + Adapter *RemoteUnloaderAdapter + // RegistrationToken is used by the default gRPC prober when probing model + // addresses. Matches the worker's token so HealthCheck auth succeeds. + RegistrationToken string + // Prober overrides the default gRPC health probe (used by tests). + Prober ModelProber + DB *gorm.DB + Interval time.Duration // default 30s + ScaleDownDelay time.Duration // default 5m + ProbeStaleAfter time.Duration // default 2m } // NewReplicaReconciler creates a new ReplicaReconciler. @@ -53,13 +93,24 @@ func NewReplicaReconciler(opts ReplicaReconcilerOptions) *ReplicaReconciler { if scaleDownDelay == 0 { scaleDownDelay = 5 * time.Minute } + probeStaleAfter := opts.ProbeStaleAfter + if probeStaleAfter == 0 { + probeStaleAfter = 2 * time.Minute + } + prober := opts.Prober + if prober == nil { + prober = grpcModelProber{token: opts.RegistrationToken} + } return &ReplicaReconciler{ - registry: opts.Registry, - scheduler: opts.Scheduler, - unloader: opts.Unloader, - db: opts.DB, - interval: interval, - scaleDownDelay: scaleDownDelay, + registry: opts.Registry, + scheduler: opts.Scheduler, + unloader: opts.Unloader, + adapter: opts.Adapter, + prober: prober, + db: opts.DB, + interval: interval, + scaleDownDelay: scaleDownDelay, + probeStaleAfter: probeStaleAfter, } } @@ -78,17 +129,122 @@ func (rc *ReplicaReconciler) Run(ctx context.Context) { } } -// reconcileOnce performs a single reconciliation pass. -// Uses an advisory lock so only one frontend instance reconciles at a time. 
+// reconcileOnce performs a single reconciliation pass. Replica work and +// state-reconciliation work run under *different* advisory locks so multiple +// frontends can share load across passes, and one long-running pass doesn't +// block the other forever if a frontend wedges. func (rc *ReplicaReconciler) reconcileOnce(ctx context.Context) { if rc.db != nil { - lockKey := advisorylock.KeyFromString("replica-reconciler") - _ = advisorylock.WithLockCtx(ctx, rc.db, lockKey, func() error { + replicaKey := advisorylock.KeyFromString("replica-reconciler") + _ = advisorylock.WithLockCtx(ctx, rc.db, replicaKey, func() error { rc.reconcile(ctx) return nil }) + // Try, don't block: if another frontend is already running the state + // pass, this tick is a no-op. Matches the health monitor pattern. + _, _ = advisorylock.TryWithLockCtx(ctx, rc.db, advisorylock.KeyStateReconciler, func() error { + rc.reconcileState(ctx) + return nil + }) } else { rc.reconcile(ctx) + rc.reconcileState(ctx) + } +} + +// reconcileState runs the state-reconciliation passes: drain pending backend +// ops for freshly-healthy nodes, then probe model gRPC addresses to orphan +// ghosts. Both passes are best-effort: a failure on one node doesn't stop +// the rest. +func (rc *ReplicaReconciler) reconcileState(ctx context.Context) { + if rc.adapter != nil { + rc.drainPendingBackendOps(ctx) + } + rc.probeLoadedModels(ctx) +} + +// drainPendingBackendOps retries queued backend ops whose next_retry_at has +// passed on nodes that are currently healthy. On success the row is deleted; +// on failure attempts++ and next_retry_at moves out via exponential backoff. 
+func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { + ops, err := rc.registry.ListDuePendingBackendOps(ctx) + if err != nil { + xlog.Warn("Reconciler: failed to list pending backend ops", "error", err) + return + } + if len(ops) == 0 { + return + } + xlog.Debug("Reconciler: draining pending backend ops", "count", len(ops)) + + for _, op := range ops { + if err := ctx.Err(); err != nil { + return + } + var applyErr error + switch op.Op { + case OpBackendDelete: + _, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend) + case OpBackendInstall, OpBackendUpgrade: + reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries)) + if err != nil { + applyErr = err + } else if !reply.Success { + applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error) + } + default: + xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID) + continue + } + + if applyErr == nil { + if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil { + xlog.Warn("Reconciler: failed to delete drained op row", "id", op.ID, "error", err) + } else { + xlog.Info("Reconciler: pending backend op applied", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1) + } + continue + } + _ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error()) + xlog.Warn("Reconciler: pending backend op retry failed", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr) + } +} + +// probeLoadedModels gRPC-health-checks model addresses that the DB says are +// loaded. If a model's backend process is gone (OOM, crash, manual restart) +// we remove the row so ghosts don't linger. Only probes rows older than +// probeStaleAfter so we don't hammer every worker every tick for models we +// just heard from. 
+func (rc *ReplicaReconciler) probeLoadedModels(ctx context.Context) { + var stale []NodeModel + cutoff := time.Now().Add(-rc.probeStaleAfter) + err := rc.registry.db.WithContext(ctx). + Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id"). + Where("node_models.state = ? AND backend_nodes.status = ? AND node_models.updated_at < ? AND node_models.address != ''", + "loaded", StatusHealthy, cutoff). + Find(&stale).Error + if err != nil { + xlog.Warn("Reconciler: failed to list loaded models for probe", "error", err) + return + } + for _, m := range stale { + if err := ctx.Err(); err != nil { + return + } + if rc.prober.IsAlive(ctx, m.Address) { + // Bump updated_at so we don't probe this row again immediately. + _ = rc.registry.db.WithContext(ctx).Model(&NodeModel{}). + Where("id = ?", m.ID).Update("updated_at", time.Now()).Error + continue + } + if err := rc.registry.RemoveNodeModel(ctx, m.NodeID, m.ModelName); err != nil { + xlog.Warn("Reconciler: failed to remove unreachable model", "node", m.NodeID, "model", m.ModelName, "error", err) + continue + } + xlog.Warn("Reconciler: model unreachable, removed from registry", + "node", m.NodeID, "model", m.ModelName, "address", m.Address) } } diff --git a/core/services/nodes/reconciler_test.go b/core/services/nodes/reconciler_test.go index e95f8bcea..52a488a2a 100644 --- a/core/services/nodes/reconciler_test.go +++ b/core/services/nodes/reconciler_test.go @@ -239,3 +239,138 @@ var _ = Describe("ReplicaReconciler", func() { }) }) }) + +// fakeProber lets tests control whether a model's gRPC address "responds". 
+type fakeProber struct { + alive map[string]bool + calls int +} + +func (f *fakeProber) IsAlive(_ context.Context, address string) bool { + f.calls++ + if f.alive == nil { + return false + } + return f.alive[address] +} + +var _ = Describe("ReplicaReconciler — state reconciliation", func() { + var ( + db *gorm.DB + registry *NodeRegistry + ) + + BeforeEach(func() { + if runtime.GOOS == "darwin" { + Skip("testcontainers requires Docker, not available on macOS CI") + } + db = testutil.SetupTestDB() + var err error + registry, err = NewNodeRegistry(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("probeLoadedModels", func() { + It("removes loaded models whose gRPC address is unreachable", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + // Two loaded models — one stale (will probe), one fresh (skipped). + stale := &NodeModel{ + ID: "stale-1", + NodeID: node.ID, + ModelName: "stale-model", + Address: "10.0.0.1:12345", + State: "loaded", + UpdatedAt: time.Now().Add(-5 * time.Minute), + } + fresh := &NodeModel{ + ID: "fresh-1", + NodeID: node.ID, + ModelName: "fresh-model", + Address: "10.0.0.1:54321", + State: "loaded", + UpdatedAt: time.Now(), // within probeStaleAfter + } + Expect(db.Create(stale).Error).To(Succeed()) + Expect(db.Create(fresh).Error).To(Succeed()) + + prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": false}} + rc := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + DB: db, + Prober: prober, + ProbeStaleAfter: 2 * time.Minute, + }) + + rc.probeLoadedModels(context.Background()) + + // Stale was unreachable — row removed. + var after []NodeModel + Expect(db.Find(&after).Error).To(Succeed()) + Expect(after).To(HaveLen(1)) + Expect(after[0].ModelName).To(Equal("fresh-model")) + // Prober was only called once (the fresh row was filtered out). 
+ Expect(prober.calls).To(Equal(1)) + }) + + It("keeps reachable models and bumps their updated_at", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + stale := &NodeModel{ + ID: "stale-2", + NodeID: node.ID, + ModelName: "alive-model", + Address: "10.0.0.1:12345", + State: "loaded", + UpdatedAt: time.Now().Add(-5 * time.Minute), + } + Expect(db.Create(stale).Error).To(Succeed()) + + prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": true}} + rc := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + DB: db, + Prober: prober, + ProbeStaleAfter: 2 * time.Minute, + }) + + rc.probeLoadedModels(context.Background()) + + var after NodeModel + Expect(db.First(&after, "id = ?", "stale-2").Error).To(Succeed()) + Expect(after.UpdatedAt).To(BeTemporally("~", time.Now(), time.Second)) + }) + }) + + Describe("UpsertPendingBackendOp + RecordPendingBackendOpFailure", func() { + It("upserts on the composite key rather than duplicating rows", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + + Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + // Second call for the same (node, backend, op) should not create a + // new row — that's how re-issuing a delete works. 
+ Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + + var rows []PendingBackendOp + Expect(db.Find(&rows).Error).To(Succeed()) + Expect(rows).To(HaveLen(1)) + }) + + It("increments attempts and moves next_retry_at out on failure", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + + var row PendingBackendOp + Expect(db.First(&row).Error).To(Succeed()) + before := row.NextRetryAt + + Expect(registry.RecordPendingBackendOpFailure(context.Background(), row.ID, "boom")).To(Succeed()) + Expect(db.First(&row, row.ID).Error).To(Succeed()) + Expect(row.Attempts).To(Equal(1)) + Expect(row.LastError).To(Equal("boom")) + Expect(row.NextRetryAt).To(BeTemporally(">", before)) + }) + }) +}) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index f56fcedcc..3894b41c5 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -104,6 +104,36 @@ type NodeWithExtras struct { Labels map[string]string `json:"labels,omitempty"` } +// PendingBackendOp is a durable intent for a backend lifecycle operation +// (delete/install/upgrade) that needs to eventually apply on a specific node. +// +// Without this table, a backend delete against an offline node silently +// dropped: the frontend skipped the node, the node came back later with the +// backend still installed, and the operator saw a zombie. Now the intent is +// recorded regardless of node status; the state reconciler drains the queue +// whenever a node is healthy and removes the row on success. Reissuing the +// same operation while a row exists updates NextRetryAt instead of stacking +// duplicates (see the unique index). 
+type PendingBackendOp struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"` + Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"` + Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"` + Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries + Attempts int `gorm:"default:0" json:"attempts"` + LastError string `gorm:"type:text" json:"last_error,omitempty"` + CreatedAt time.Time `json:"created_at"` + NextRetryAt time.Time `gorm:"index" json:"next_retry_at"` +} + +// Op constants mirror the operation names used by DistributedBackendManager +// so callers don't repeat stringly-typed values. +const ( + OpBackendDelete = "delete" + OpBackendInstall = "install" + OpBackendUpgrade = "upgrade" +) + // NodeRegistry manages backend node registration and lookup in PostgreSQL. type NodeRegistry struct { db *gorm.DB @@ -114,7 +144,7 @@ type NodeRegistry struct { // when multiple instances (frontend + workers) start at the same time. func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) { if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { - return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}) + return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{}) }); err != nil { return nil, fmt.Errorf("migrating node tables: %w", err) } @@ -946,3 +976,114 @@ func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node _ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name) } } + +// UpsertPendingBackendOp records or refreshes a pending backend operation for +// a node. 
If a row already exists for (nodeID, backend, op) we keep its +// Attempts/LastError but reset NextRetryAt to now, so reissuing the same +// delete/upgrade nudges it to the front of the queue instead of stacking a +// duplicate intent. +func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error { + row := PendingBackendOp{ + NodeID: nodeID, + Backend: backend, + Op: op, + Galleries: galleries, + NextRetryAt: time.Now(), + } + return r.db.WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}}, + DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}), + }).Create(&row).Error +} + +// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed +// AND whose node is currently healthy. The reconciler drains this list; we +// filter by node status in the query so a tick doesn't hammer NATS for +// nodes that obviously can't answer. +func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) { + var ops []PendingBackendOp + err := r.db.WithContext(ctx). + Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id"). + Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy). + Order("pending_backend_ops.next_retry_at ASC"). + Find(&ops).Error + if err != nil { + return nil, fmt.Errorf("listing due pending backend ops: %w", err) + } + return ops, nil +} + +// ListPendingBackendOps returns every queued row (for the UI "pending on N +// nodes" chip and the pre-delete ConfirmDialog). 
+func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) { + var ops []PendingBackendOp + if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil { + return nil, fmt.Errorf("listing pending backend ops: %w", err) + } + return ops, nil +} + +// DeletePendingBackendOp removes a queue row — called after the op succeeds. +func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error { + if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil { + return fmt.Errorf("deleting pending backend op %d: %w", id, err) + } + return nil +} + +// RecordPendingBackendOpFailure bumps Attempts, captures the error, and +// pushes NextRetryAt out with exponential backoff capped at 15 minutes. +func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error { + return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var row PendingBackendOp + if err := tx.First(&row, id).Error; err != nil { + return err + } + row.Attempts++ + row.LastError = errMsg + row.NextRetryAt = time.Now().Add(backoffForAttempt(row.Attempts)) + return tx.Save(&row).Error + }) +} + +// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The +// reconciler tick is 30s so anything shorter would just re-fire immediately. +func backoffForAttempt(attempts int) time.Duration { + const cap = 15 * time.Minute + base := 30 * time.Second + shift := attempts - 1 + if shift < 0 { + shift = 0 + } + if shift > 10 { // 2^10 * 30s already exceeds the cap + shift = 10 + } + d := base << shift + if d > cap { + return cap + } + return d +} + +// CountPendingBackendOpsByBackend returns a map of backend name to the count +// of pending rows. Used to decorate Manage → Backends with a "pending on N +// nodes" chip without exposing the full queue. 
+func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) { + type row struct { + Backend string + Count int + } + var rows []row + err := r.db.WithContext(ctx).Model(&PendingBackendOp{}). + Select("backend, COUNT(*) as count"). + Group("backend"). + Scan(&rows).Error + if err != nil { + return nil, fmt.Errorf("counting pending backend ops: %w", err) + } + out := make(map[string]int, len(rows)) + for _, r := range rows { + out[r.Backend] = r.Count + } + return out, nil +}