mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-19 14:17:21 -04:00
Compare commits
8 Commits
feat/distr
...
distribute
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
44e7d9806b | ||
|
|
7a9d89fa54 | ||
|
|
ee34a52c5d | ||
|
|
92b9e22dc9 | ||
|
|
f0ab68e352 | ||
|
|
9373de9f9b | ||
|
|
1b3c951c85 | ||
|
|
1f43762655 |
@@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut
|
||||
DB: authDB,
|
||||
})
|
||||
|
||||
// Create ReplicaReconciler for auto-scaling model replicas
|
||||
// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
|
||||
// RegistrationToken feed the state-reconciliation passes: pending op
|
||||
// drain uses the adapter, and model health probes use the token to auth
|
||||
// against workers' gRPC HealthCheck.
|
||||
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
|
||||
Registry: registry,
|
||||
Scheduler: router,
|
||||
Unloader: remoteUnloader,
|
||||
DB: authDB,
|
||||
Interval: 30 * time.Second,
|
||||
ScaleDownDelay: 5 * time.Minute,
|
||||
Registry: registry,
|
||||
Scheduler: router,
|
||||
Unloader: remoteUnloader,
|
||||
Adapter: remoteUnloader,
|
||||
RegistrationToken: cfg.Distributed.RegistrationToken,
|
||||
DB: authDB,
|
||||
Interval: 30 * time.Second,
|
||||
ScaleDownDelay: 5 * time.Minute,
|
||||
ProbeStaleAfter: 2 * time.Minute,
|
||||
})
|
||||
|
||||
// Create ModelRouterAdapter to wire into ModelLoader
|
||||
|
||||
@@ -235,7 +235,12 @@ func New(opts ...config.AppOption) (*Application, error) {
|
||||
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
|
||||
// instance runs periodic checks (avoids duplicate upgrades across replicas).
|
||||
if len(options.BackendGalleries) > 0 {
|
||||
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB())
|
||||
// Pass a lazy getter for the backend manager so the checker always
|
||||
// uses the active one — DistributedBackendManager is swapped in above
|
||||
// and asks workers for their installed backends, which is what
|
||||
// upgrade detection needs in distributed mode.
|
||||
bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
|
||||
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
|
||||
application.upgradeChecker = uc
|
||||
go uc.Run(options.Context)
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/gallery"
|
||||
"github.com/mudler/LocalAI/core/services/advisorylock"
|
||||
"github.com/mudler/LocalAI/core/services/galleryop"
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
"github.com/mudler/xlog"
|
||||
@@ -26,6 +27,12 @@ type UpgradeChecker struct {
|
||||
galleries []config.Gallery
|
||||
systemState *system.SystemState
|
||||
db *gorm.DB // non-nil in distributed mode
|
||||
// backendManagerFn lazily returns the current backend manager (may be
|
||||
// swapped from Local to Distributed after startup). Pulled through each
|
||||
// check so the UpgradeChecker uses whichever is active. In distributed
|
||||
// mode this ensures CheckUpgrades asks workers instead of the (empty)
|
||||
// frontend filesystem — fixing the bug where upgrades never surfaced.
|
||||
backendManagerFn func() galleryop.BackendManager
|
||||
|
||||
checkInterval time.Duration
|
||||
stop chan struct{}
|
||||
@@ -40,18 +47,22 @@ type UpgradeChecker struct {
|
||||
// NewUpgradeChecker creates a new UpgradeChecker service.
|
||||
// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
|
||||
// (uses advisory locks so only one instance runs periodic checks).
|
||||
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker {
|
||||
// backendManagerFn is optional; when set, CheckUpgrades is routed through
|
||||
// the active backend manager — required in distributed mode so the check
|
||||
// aggregates from workers rather than the empty frontend filesystem.
|
||||
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker {
|
||||
return &UpgradeChecker{
|
||||
appConfig: appConfig,
|
||||
modelLoader: ml,
|
||||
galleries: appConfig.BackendGalleries,
|
||||
systemState: appConfig.SystemState,
|
||||
db: db,
|
||||
checkInterval: 6 * time.Hour,
|
||||
stop: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
triggerCh: make(chan struct{}, 1),
|
||||
lastUpgrades: make(map[string]gallery.UpgradeInfo),
|
||||
appConfig: appConfig,
|
||||
modelLoader: ml,
|
||||
galleries: appConfig.BackendGalleries,
|
||||
systemState: appConfig.SystemState,
|
||||
db: db,
|
||||
backendManagerFn: backendManagerFn,
|
||||
checkInterval: 6 * time.Hour,
|
||||
stop: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
triggerCh: make(chan struct{}, 1),
|
||||
lastUpgrades: make(map[string]gallery.UpgradeInfo),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,13 +75,16 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade
|
||||
func (uc *UpgradeChecker) Run(ctx context.Context) {
|
||||
defer close(uc.done)
|
||||
|
||||
// Initial delay: don't slow down startup
|
||||
// Initial delay: don't slow down startup. Short enough that operators
|
||||
// don't stare at an empty upgrade banner for long; long enough that
|
||||
// workers have registered and reported their installed backends.
|
||||
initialDelay := 10 * time.Second
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-uc.stop:
|
||||
return
|
||||
case <-time.After(30 * time.Second):
|
||||
case <-time.After(initialDelay):
|
||||
}
|
||||
|
||||
// First check always runs locally (to warm the cache on this instance)
|
||||
@@ -144,7 +158,18 @@ func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo
|
||||
}
|
||||
|
||||
func (uc *UpgradeChecker) runCheck(ctx context.Context) {
|
||||
upgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
|
||||
var (
|
||||
upgrades map[string]gallery.UpgradeInfo
|
||||
err error
|
||||
)
|
||||
if uc.backendManagerFn != nil {
|
||||
if bm := uc.backendManagerFn(); bm != nil {
|
||||
upgrades, err = bm.CheckUpgrades(ctx)
|
||||
}
|
||||
}
|
||||
if upgrades == nil && err == nil {
|
||||
upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
|
||||
}
|
||||
|
||||
uc.mu.Lock()
|
||||
uc.lastCheckTime = time.Now()
|
||||
|
||||
@@ -738,6 +738,9 @@ func (s *backendSupervisor) subscribeLifecycleEvents() {
|
||||
if b.Metadata != nil {
|
||||
info.InstalledAt = b.Metadata.InstalledAt
|
||||
info.GalleryURL = b.Metadata.GalleryURL
|
||||
info.Version = b.Metadata.Version
|
||||
info.URI = b.Metadata.URI
|
||||
info.Digest = b.Metadata.Digest
|
||||
}
|
||||
infos = append(infos, info)
|
||||
}
|
||||
|
||||
@@ -394,6 +394,23 @@ type SystemBackend struct {
|
||||
Metadata *BackendMetadata
|
||||
UpgradeAvailable bool `json:"upgrade_available,omitempty"`
|
||||
AvailableVersion string `json:"available_version,omitempty"`
|
||||
// Nodes holds per-node attribution in distributed mode. Empty in single-node.
|
||||
// Each entry describes a node that has this backend installed, with the
|
||||
// version/digest it reports. Lets the UI surface drift and per-node status.
|
||||
Nodes []NodeBackendRef `json:"nodes,omitempty"`
|
||||
}
|
||||
|
||||
// NodeBackendRef describes one node's view of an installed backend. Used both
|
||||
// for per-node attribution in the UI and for drift detection during upgrade
|
||||
// checks (a cluster with mismatched versions/digests is flagged upgradeable).
|
||||
type NodeBackendRef struct {
|
||||
NodeID string `json:"node_id"`
|
||||
NodeName string `json:"node_name"`
|
||||
NodeStatus string `json:"node_status"` // healthy | unhealthy | offline | draining | pending
|
||||
Version string `json:"version,omitempty"`
|
||||
Digest string `json:"digest,omitempty"`
|
||||
URI string `json:"uri,omitempty"`
|
||||
InstalledAt string `json:"installed_at,omitempty"`
|
||||
}
|
||||
|
||||
type SystemBackends map[string]SystemBackend
|
||||
|
||||
@@ -23,22 +23,45 @@ type UpgradeInfo struct {
|
||||
AvailableVersion string `json:"available_version"`
|
||||
InstalledDigest string `json:"installed_digest,omitempty"`
|
||||
AvailableDigest string `json:"available_digest,omitempty"`
|
||||
// NodeDrift lists nodes whose installed version or digest differs from
|
||||
// the cluster majority. Non-empty means the cluster has diverged and an
|
||||
// upgrade will realign it. Empty in single-node mode.
|
||||
NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"`
|
||||
}
|
||||
|
||||
// CheckBackendUpgrades compares installed backends against gallery entries
|
||||
// and returns a map of backend names to UpgradeInfo for those that have
|
||||
// newer versions or different OCI digests available.
|
||||
// NodeDriftInfo describes one node that disagrees with the cluster majority
|
||||
// on which version/digest of a backend is installed.
|
||||
type NodeDriftInfo struct {
|
||||
NodeID string `json:"node_id"`
|
||||
NodeName string `json:"node_name"`
|
||||
Version string `json:"version,omitempty"`
|
||||
Digest string `json:"digest,omitempty"`
|
||||
}
|
||||
|
||||
// CheckBackendUpgrades is the single-node entrypoint. Distributed callers use
|
||||
// CheckUpgradesAgainst directly with their aggregated SystemBackends.
|
||||
func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState) (map[string]UpgradeInfo, error) {
|
||||
installed, err := ListSystemBackends(systemState)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list installed backends: %w", err)
|
||||
}
|
||||
return CheckUpgradesAgainst(ctx, galleries, systemState, installed)
|
||||
}
|
||||
|
||||
// CheckUpgradesAgainst compares a caller-supplied SystemBackends set against
|
||||
// the gallery. Fixes the distributed-mode bug where the old code passed the
|
||||
// frontend's (empty) local filesystem through ListSystemBackends and so never
|
||||
// surfaced any upgrades.
|
||||
//
|
||||
// Cluster drift policy: if a backend's per-node versions/digests disagree, the
|
||||
// row is flagged upgradeable regardless of whether any node matches the gallery
|
||||
// — next Upgrade All realigns the cluster. NodeDrift lists the outliers.
|
||||
func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) {
|
||||
galleryBackends, err := AvailableBackends(galleries, systemState)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list available backends: %w", err)
|
||||
}
|
||||
|
||||
installedBackends, err := ListSystemBackends(systemState)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list installed backends: %w", err)
|
||||
}
|
||||
|
||||
result := make(map[string]UpgradeInfo)
|
||||
|
||||
for _, installed := range installedBackends {
|
||||
@@ -57,34 +80,48 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste
|
||||
}
|
||||
|
||||
installedVersion := installed.Metadata.Version
|
||||
installedDigest := installed.Metadata.Digest
|
||||
galleryVersion := galleryEntry.Version
|
||||
|
||||
// If both sides have versions, compare them
|
||||
// Detect cluster drift: does every node report the same version+digest?
|
||||
// In single-node mode this stays empty (Nodes is nil).
|
||||
majority, drift := summarizeNodeDrift(installed.Nodes)
|
||||
if majority.version != "" {
|
||||
installedVersion = majority.version
|
||||
}
|
||||
if majority.digest != "" {
|
||||
installedDigest = majority.digest
|
||||
}
|
||||
|
||||
makeInfo := func(info UpgradeInfo) UpgradeInfo {
|
||||
info.NodeDrift = drift
|
||||
return info
|
||||
}
|
||||
|
||||
// If versions are available on both sides, they're the source of truth.
|
||||
if galleryVersion != "" && installedVersion != "" {
|
||||
if galleryVersion != installedVersion {
|
||||
result[installed.Metadata.Name] = UpgradeInfo{
|
||||
if galleryVersion != installedVersion || len(drift) > 0 {
|
||||
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
|
||||
BackendName: installed.Metadata.Name,
|
||||
InstalledVersion: installedVersion,
|
||||
AvailableVersion: galleryVersion,
|
||||
}
|
||||
})
|
||||
}
|
||||
// Versions match — no upgrade needed
|
||||
continue
|
||||
}
|
||||
|
||||
// Gallery has a version but installed doesn't — this happens for backends
|
||||
// installed before version tracking was added. Flag as upgradeable so
|
||||
// users can re-install to pick up version metadata.
|
||||
// Gallery has a version but installed doesn't — backends installed before
|
||||
// version tracking was added. Flag as upgradeable to pick up metadata.
|
||||
if galleryVersion != "" && installedVersion == "" {
|
||||
result[installed.Metadata.Name] = UpgradeInfo{
|
||||
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
|
||||
BackendName: installed.Metadata.Name,
|
||||
InstalledVersion: "",
|
||||
AvailableVersion: galleryVersion,
|
||||
}
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Fall back to OCI digest comparison when versions are unavailable
|
||||
// Fall back to OCI digest comparison when versions are unavailable.
|
||||
if downloader.URI(galleryEntry.URI).LooksLikeOCI() {
|
||||
remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil)
|
||||
if err != nil {
|
||||
@@ -92,21 +129,68 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste
|
||||
continue
|
||||
}
|
||||
// If we have a stored digest, compare; otherwise any remote digest
|
||||
// means we can't confirm we're up to date — flag as upgradeable
|
||||
if installed.Metadata.Digest == "" || remoteDigest != installed.Metadata.Digest {
|
||||
result[installed.Metadata.Name] = UpgradeInfo{
|
||||
// means we can't confirm we're up to date — flag as upgradeable.
|
||||
if installedDigest == "" || remoteDigest != installedDigest || len(drift) > 0 {
|
||||
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
|
||||
BackendName: installed.Metadata.Name,
|
||||
InstalledDigest: installed.Metadata.Digest,
|
||||
InstalledDigest: installedDigest,
|
||||
AvailableDigest: remoteDigest,
|
||||
}
|
||||
})
|
||||
}
|
||||
} else if len(drift) > 0 {
|
||||
// No version/digest path but nodes disagree — still worth flagging.
|
||||
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
|
||||
BackendName: installed.Metadata.Name,
|
||||
InstalledVersion: installedVersion,
|
||||
InstalledDigest: installedDigest,
|
||||
})
|
||||
}
|
||||
// No version info and non-OCI URI — cannot determine, skip
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// summarizeNodeDrift collapses per-node version/digest tuples to a majority
|
||||
// pair and returns the outliers. In single-node mode (empty nodes slice) this
|
||||
// returns zero values and a nil drift list.
|
||||
func summarizeNodeDrift(nodes []NodeBackendRef) (majority struct{ version, digest string }, drift []NodeDriftInfo) {
|
||||
if len(nodes) == 0 {
|
||||
return majority, nil
|
||||
}
|
||||
|
||||
type key struct{ version, digest string }
|
||||
counts := map[key]int{}
|
||||
var topKey key
|
||||
var topCount int
|
||||
for _, n := range nodes {
|
||||
k := key{n.Version, n.Digest}
|
||||
counts[k]++
|
||||
if counts[k] > topCount {
|
||||
topCount = counts[k]
|
||||
topKey = k
|
||||
}
|
||||
}
|
||||
|
||||
majority.version = topKey.version
|
||||
majority.digest = topKey.digest
|
||||
|
||||
if len(counts) == 1 {
|
||||
return majority, nil // unanimous — no drift
|
||||
}
|
||||
for _, n := range nodes {
|
||||
if n.Version == majority.version && n.Digest == majority.digest {
|
||||
continue
|
||||
}
|
||||
drift = append(drift, NodeDriftInfo{
|
||||
NodeID: n.NodeID,
|
||||
NodeName: n.NodeName,
|
||||
Version: n.Version,
|
||||
Digest: n.Digest,
|
||||
})
|
||||
}
|
||||
return majority, drift
|
||||
}
|
||||
|
||||
// UpgradeBackend upgrades a single backend to the latest gallery version using
|
||||
// an atomic swap with backup-based rollback on failure.
|
||||
func UpgradeBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, galleries []config.Gallery, backendName string, downloadStatus func(string, string, string, float64)) error {
|
||||
|
||||
@@ -144,6 +144,97 @@ var _ = Describe("Upgrade Detection and Execution", func() {
|
||||
})
|
||||
})
|
||||
|
||||
// CheckUpgradesAgainst is the entry point used by DistributedBackendManager.
|
||||
// It takes installed backends directly — typically aggregated from workers —
|
||||
// instead of reading the frontend filesystem. These tests exercise drift
|
||||
// detection, which is the feature the distributed path relies on.
|
||||
Describe("CheckUpgradesAgainst (distributed)", func() {
|
||||
It("flags upgrade when cluster nodes disagree on version, even if gallery matches majority", func() {
|
||||
writeGalleryYAML([]GalleryBackend{
|
||||
{
|
||||
Metadata: Metadata{Name: "my-backend"},
|
||||
URI: filepath.Join(tempDir, "some-source"),
|
||||
Version: "2.0.0",
|
||||
},
|
||||
})
|
||||
|
||||
installed := SystemBackends{
|
||||
"my-backend": SystemBackend{
|
||||
Name: "my-backend",
|
||||
Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"},
|
||||
Nodes: []NodeBackendRef{
|
||||
{NodeID: "a", NodeName: "worker-1", Version: "2.0.0"},
|
||||
{NodeID: "b", NodeName: "worker-2", Version: "2.0.0"},
|
||||
{NodeID: "c", NodeName: "worker-3", Version: "1.0.0"}, // drift
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(upgrades).To(HaveKey("my-backend"))
|
||||
info := upgrades["my-backend"]
|
||||
Expect(info.AvailableVersion).To(Equal("2.0.0"))
|
||||
Expect(info.NodeDrift).To(HaveLen(1))
|
||||
Expect(info.NodeDrift[0].NodeName).To(Equal("worker-3"))
|
||||
Expect(info.NodeDrift[0].Version).To(Equal("1.0.0"))
|
||||
})
|
||||
|
||||
It("does not flag upgrade when all nodes agree and match gallery", func() {
|
||||
writeGalleryYAML([]GalleryBackend{
|
||||
{
|
||||
Metadata: Metadata{Name: "my-backend"},
|
||||
URI: filepath.Join(tempDir, "some-source"),
|
||||
Version: "2.0.0",
|
||||
},
|
||||
})
|
||||
|
||||
installed := SystemBackends{
|
||||
"my-backend": SystemBackend{
|
||||
Name: "my-backend",
|
||||
Metadata: &BackendMetadata{Name: "my-backend", Version: "2.0.0"},
|
||||
Nodes: []NodeBackendRef{
|
||||
{NodeID: "a", NodeName: "worker-1", Version: "2.0.0"},
|
||||
{NodeID: "b", NodeName: "worker-2", Version: "2.0.0"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(upgrades).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("surfaces empty-installed-version path the old distributed code silently missed", func() {
|
||||
// Simulates the real-world bug: worker has a backend, its version
|
||||
// is empty (pre-tracking or OCI-pinned-to-latest), gallery has a
|
||||
// version. Pre-fix CheckUpgrades returned nothing; now it surfaces.
|
||||
writeGalleryYAML([]GalleryBackend{
|
||||
{
|
||||
Metadata: Metadata{Name: "my-backend"},
|
||||
URI: filepath.Join(tempDir, "some-source"),
|
||||
Version: "2.0.0",
|
||||
},
|
||||
})
|
||||
|
||||
installed := SystemBackends{
|
||||
"my-backend": SystemBackend{
|
||||
Name: "my-backend",
|
||||
Metadata: &BackendMetadata{Name: "my-backend"},
|
||||
Nodes: []NodeBackendRef{
|
||||
{NodeID: "a", NodeName: "worker-1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
upgrades, err := CheckUpgradesAgainst(context.Background(), galleries, systemState, installed)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(upgrades).To(HaveKey("my-backend"))
|
||||
Expect(upgrades["my-backend"].InstalledVersion).To(BeEmpty())
|
||||
Expect(upgrades["my-backend"].AvailableVersion).To(Equal("2.0.0"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("UpgradeBackend", func() {
|
||||
It("should replace backend directory and update metadata", func() {
|
||||
// Install v1
|
||||
|
||||
@@ -1529,6 +1529,401 @@ select.input {
|
||||
background: var(--color-warning-light);
|
||||
color: var(--color-warning);
|
||||
}
|
||||
.badge-accent {
|
||||
background: var(--color-accent-light);
|
||||
color: var(--color-accent);
|
||||
}
|
||||
|
||||
/* Horizontal row of badges used inside table cells — consistent spacing so
|
||||
cells line up regardless of how many badges are present. */
|
||||
.badge-row {
|
||||
display: inline-flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 4px;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
/* Vertically stacked cell content (e.g. version + update chip + drift chip).
|
||||
Keeps rows readable at scale without inline style={{...}} everywhere. */
|
||||
.cell-stack {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 4px;
|
||||
align-items: flex-start;
|
||||
}
|
||||
|
||||
.cell-mono {
|
||||
font-family: 'JetBrains Mono', ui-monospace, monospace;
|
||||
font-size: var(--text-xs);
|
||||
color: var(--color-text-primary);
|
||||
}
|
||||
|
||||
.cell-muted {
|
||||
color: var(--color-text-muted);
|
||||
font-size: var(--text-xs);
|
||||
}
|
||||
|
||||
.cell-subtle {
|
||||
color: var(--color-text-muted);
|
||||
font-size: var(--text-xs);
|
||||
font-weight: 400;
|
||||
margin-left: 8px;
|
||||
}
|
||||
|
||||
.cell-name {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: var(--spacing-xs);
|
||||
font-weight: 500;
|
||||
}
|
||||
.cell-name > i {
|
||||
color: var(--color-accent);
|
||||
font-size: var(--text-xs);
|
||||
}
|
||||
|
||||
.row-actions {
|
||||
display: flex;
|
||||
gap: var(--spacing-xs);
|
||||
justify-content: flex-end;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
/* Softer delete button for dense tables — the destructive confirm dialog
|
||||
already owns the "are you sure" affordance, so the button itself doesn't
|
||||
need to scream. Keeps the delete red readable without dominating rows. */
|
||||
.btn.btn-danger-ghost {
|
||||
background: transparent;
|
||||
color: var(--color-error);
|
||||
border-color: transparent;
|
||||
}
|
||||
.btn.btn-danger-ghost:hover:not(:disabled) {
|
||||
background: var(--color-error-light);
|
||||
color: var(--color-error);
|
||||
border-color: var(--color-error-light);
|
||||
}
|
||||
|
||||
/* Small count pill used inside tabs ("(3) ↑ 2") so update counts are
|
||||
glanceable without extra rows of UI. */
|
||||
.tab-pill {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 3px;
|
||||
margin-left: 6px;
|
||||
padding: 1px 6px;
|
||||
border-radius: var(--radius-full);
|
||||
font-size: var(--text-xs);
|
||||
font-weight: 600;
|
||||
line-height: 1.4;
|
||||
}
|
||||
.tab-pill--warning {
|
||||
background: var(--color-warning-light);
|
||||
color: var(--color-warning);
|
||||
}
|
||||
|
||||
/* Stat cards — uniform-height cluster metrics for the Nodes dashboard.
|
||||
Left accent bar ties the color to the metric's semantic (success/warning/
|
||||
error/primary), icon chip sits top-right, value is left-aligned and
|
||||
prominent so you can scan a row of cards without reading labels. */
|
||||
.stat-grid {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(auto-fill, minmax(180px, 1fr));
|
||||
gap: var(--spacing-md);
|
||||
margin-bottom: var(--spacing-xl);
|
||||
}
|
||||
|
||||
.stat-card {
|
||||
position: relative;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
gap: var(--spacing-sm);
|
||||
padding: var(--spacing-md);
|
||||
min-height: 96px;
|
||||
background: var(--color-bg-raised, var(--color-bg-secondary));
|
||||
border: 1px solid var(--color-border-subtle);
|
||||
border-radius: var(--radius-lg);
|
||||
transition: transform var(--duration-fast) var(--ease-default),
|
||||
box-shadow var(--duration-fast) var(--ease-default),
|
||||
border-color var(--duration-fast) var(--ease-default);
|
||||
overflow: hidden;
|
||||
}
|
||||
.stat-card::before {
|
||||
content: '';
|
||||
position: absolute;
|
||||
left: 0; top: 0; bottom: 0;
|
||||
width: 3px;
|
||||
background: var(--stat-accent, var(--color-border-subtle));
|
||||
transition: background var(--duration-fast) var(--ease-default);
|
||||
}
|
||||
.stat-card:hover {
|
||||
transform: translateY(-1px);
|
||||
box-shadow: var(--shadow-sm);
|
||||
border-color: var(--color-border);
|
||||
}
|
||||
|
||||
.stat-card__body {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 6px;
|
||||
min-width: 0;
|
||||
}
|
||||
.stat-card__label {
|
||||
font-size: var(--text-xs);
|
||||
font-weight: 600;
|
||||
letter-spacing: 0.08em;
|
||||
text-transform: uppercase;
|
||||
color: var(--color-text-muted);
|
||||
white-space: normal;
|
||||
line-height: 1.2;
|
||||
}
|
||||
.stat-card__value {
|
||||
font-size: var(--text-2xl);
|
||||
font-weight: 600;
|
||||
font-family: 'JetBrains Mono', ui-monospace, monospace;
|
||||
line-height: 1;
|
||||
color: var(--color-text-primary);
|
||||
word-break: break-word;
|
||||
}
|
||||
.stat-card__icon {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
width: 36px;
|
||||
height: 36px;
|
||||
border-radius: var(--radius-md);
|
||||
background: color-mix(in srgb, var(--stat-accent, var(--color-text-muted)) 12%, transparent);
|
||||
color: var(--stat-accent, var(--color-text-muted));
|
||||
font-size: var(--text-lg);
|
||||
flex-shrink: 0;
|
||||
}
|
||||
|
||||
/* Subtle "Register a new worker" trigger replacing the broken-text chevron
|
||||
link. Still opens the same hint card — just reads like a button now. */
|
||||
.nodes-add-worker {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: var(--spacing-xs);
|
||||
padding: var(--spacing-xs) var(--spacing-sm);
|
||||
background: transparent;
|
||||
border: 1px dashed var(--color-border);
|
||||
border-radius: var(--radius-md);
|
||||
color: var(--color-text-secondary);
|
||||
font-size: var(--text-sm);
|
||||
font-family: inherit;
|
||||
font-weight: 500;
|
||||
cursor: pointer;
|
||||
margin-bottom: var(--spacing-md);
|
||||
transition: background var(--duration-fast) var(--ease-default),
|
||||
border-color var(--duration-fast) var(--ease-default),
|
||||
color var(--duration-fast) var(--ease-default);
|
||||
}
|
||||
.nodes-add-worker:hover {
|
||||
background: var(--color-bg-raised, var(--color-bg-secondary));
|
||||
border-color: var(--color-border-strong);
|
||||
color: var(--color-text-primary);
|
||||
}
|
||||
|
||||
/* Shared FilterBar layout — search strip + chip row + toggle strip. Lives
|
||||
outside the .filter-bar chip row so the padding and wrapping behavior is
|
||||
consistent between the Backends gallery and the System tabs. */
|
||||
.filter-bar-group {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: var(--spacing-sm);
|
||||
margin-bottom: var(--spacing-md);
|
||||
}
|
||||
.filter-bar-group__search {
|
||||
min-width: 200px;
|
||||
flex: 1;
|
||||
}
|
||||
.filter-bar-group__row {
|
||||
display: flex;
|
||||
gap: var(--spacing-md);
|
||||
align-items: center;
|
||||
flex-wrap: wrap;
|
||||
}
|
||||
.filter-bar-group__right {
|
||||
display: flex;
|
||||
gap: var(--spacing-md);
|
||||
align-items: center;
|
||||
flex-wrap: wrap;
|
||||
padding-left: var(--spacing-md);
|
||||
border-left: 1px solid var(--color-border-subtle);
|
||||
}
|
||||
.filter-bar-group__toggle {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: var(--spacing-xs);
|
||||
font-size: var(--text-xs);
|
||||
color: var(--color-text-secondary);
|
||||
cursor: pointer;
|
||||
user-select: none;
|
||||
white-space: nowrap;
|
||||
}
|
||||
.filter-btn__count {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
margin-left: 6px;
|
||||
min-width: 18px;
|
||||
padding: 0 5px;
|
||||
background: color-mix(in srgb, currentColor 18%, transparent);
|
||||
border-radius: var(--radius-full);
|
||||
font-size: 0.625rem;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
/* Popover — floating surface anchored to a trigger element. Uses the .card
|
||||
base so theming is free, adds z-index + fixed-position + scroll cap so it
|
||||
behaves on tables with many rows. Kept deliberately unstyled beyond that
|
||||
— content is expected to provide its own header/body structure. */
|
||||
.popover {
|
||||
position: fixed;
|
||||
z-index: 200;
|
||||
min-width: 260px;
|
||||
max-width: min(420px, 95vw);
|
||||
max-height: min(420px, 70vh);
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
padding: 0; /* sections provide their own padding */
|
||||
overflow: hidden;
|
||||
box-shadow: var(--shadow-lg);
|
||||
animation: popoverIn var(--duration-fast) var(--ease-default);
|
||||
}
|
||||
|
||||
@keyframes popoverIn {
|
||||
from { opacity: 0; transform: translateY(-4px) scale(0.98); }
|
||||
to { opacity: 1; transform: translateY(0) scale(1); }
|
||||
}
|
||||
|
||||
.popover__header {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: var(--spacing-sm);
|
||||
padding: var(--spacing-sm) var(--spacing-md);
|
||||
border-bottom: 1px solid var(--color-border-subtle);
|
||||
font-size: var(--text-sm);
|
||||
}
|
||||
|
||||
.popover__scroll {
|
||||
overflow: auto;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
.popover__table {
|
||||
margin: 0;
|
||||
width: 100%;
|
||||
}
|
||||
.popover__table th {
|
||||
position: sticky;
|
||||
top: 0;
|
||||
background: var(--color-bg-raised, var(--color-bg-secondary));
|
||||
z-index: 1;
|
||||
}
|
||||
|
||||
/* Inline-table chip trigger — looks like a badge but is a button (cursor,
|
||||
focus ring inherited from global :focus-visible). */
|
||||
.chip-trigger {
|
||||
border: none;
|
||||
cursor: pointer;
|
||||
font-family: inherit;
|
||||
}
|
||||
.chip-trigger:hover {
|
||||
filter: brightness(1.08);
|
||||
}
|
||||
|
||||
/* Truncate + ellipsize a long cell (e.g. OCI digest) without breaking the
|
||||
table layout. Tooltip preserves the full value. */
|
||||
.cell-truncate {
|
||||
max-width: 160px;
|
||||
overflow: hidden;
|
||||
text-overflow: ellipsis;
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
/* Compact empty-state used inside expanded drawer sections (e.g. "No
|
||||
models loaded on this node"). Dimmer than the page-level .empty-state
|
||||
because it lives inside another container and shouldn't compete with
|
||||
the row's primary content. */
|
||||
.drawer-empty {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: var(--spacing-sm);
|
||||
padding: var(--spacing-sm) var(--spacing-md);
|
||||
background: var(--color-bg-tertiary);
|
||||
border: 1px dashed var(--color-border-subtle);
|
||||
border-radius: var(--radius-md);
|
||||
color: var(--color-text-muted);
|
||||
font-size: var(--text-sm);
|
||||
}
|
||||
.drawer-empty > i {
|
||||
font-size: var(--text-sm);
|
||||
color: var(--color-text-muted);
|
||||
opacity: 0.8;
|
||||
}
|
||||
|
||||
/* Node-status indicator — replaces the tiny bullet with a proper LED-style
|
||||
dot next to a bold status label. Colors are applied inline from statusConfig
|
||||
so one primitive handles healthy/unhealthy/draining/pending in one shape. */
|
||||
.node-status {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 8px;
|
||||
font-size: var(--text-sm);
|
||||
font-weight: 600;
|
||||
}
|
||||
.node-status__dot {
|
||||
width: 8px;
|
||||
height: 8px;
|
||||
border-radius: 50%;
|
||||
box-shadow: 0 0 0 3px color-mix(in srgb, currentColor 15%, transparent);
|
||||
flex-shrink: 0;
|
||||
}
|
||||
|
||||
/* Row-chevron cell — small 20px toggle used in table rows that expand.
|
||||
The row itself is still clickable; the chevron provides the visible
|
||||
affordance users were missing. */
|
||||
.row-chevron {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
width: 20px;
|
||||
height: 20px;
|
||||
font-size: var(--text-xs);
|
||||
color: var(--color-text-muted);
|
||||
transition: transform var(--duration-fast) var(--ease-default);
|
||||
}
|
||||
.row-chevron.is-expanded {
|
||||
transform: rotate(90deg);
|
||||
color: var(--color-text-primary);
|
||||
}
|
||||
|
||||
/* Upgrade banner — the yellow strip operators see when updates are available.
|
||||
Mirrors the gallery so both pages speak the same visual language. */
|
||||
.upgrade-banner {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
gap: var(--spacing-md);
|
||||
padding: var(--spacing-sm) var(--spacing-md);
|
||||
margin-bottom: var(--spacing-md);
|
||||
background: var(--color-warning-light);
|
||||
border: 1px solid var(--color-warning);
|
||||
border-radius: var(--radius-md);
|
||||
color: var(--color-warning);
|
||||
}
|
||||
.upgrade-banner__text {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: var(--spacing-sm);
|
||||
font-weight: 500;
|
||||
font-size: var(--text-sm);
|
||||
}
|
||||
.upgrade-banner__actions {
|
||||
display: inline-flex;
|
||||
gap: var(--spacing-xs);
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
/* Tabs */
|
||||
.tabs {
|
||||
|
||||
87
core/http/react-ui/src/components/FilterBar.jsx
Normal file
87
core/http/react-ui/src/components/FilterBar.jsx
Normal file
@@ -0,0 +1,87 @@
|
||||
import Toggle from './Toggle'
|
||||
|
||||
// FilterBar is the shared search + chip filter + toggles control strip that
|
||||
// the Backends gallery pioneered. Pulled into its own component so the System
|
||||
// page's two tabs stop looking like a different app — matching visual
|
||||
// grammar + matching keyboard behavior.
|
||||
//
|
||||
// Props:
|
||||
// search: controlled value for the search input.
|
||||
// onSearchChange: (value) => void; null disables the search input entirely.
|
||||
// searchPlaceholder: placeholder for the search input.
|
||||
// filters: [{ key, label, icon }]; activeFilter is compared by key.
|
||||
// Omit to hide the chip row.
|
||||
// activeFilter: currently-selected filter key (use '' for "all" if
|
||||
// that's the first entry in `filters`).
|
||||
// onFilterChange: (key) => void.
|
||||
// toggles: [{ key, label, icon?, checked, onChange }]; optional
|
||||
// right-side toggle group (e.g. "Show all", "Development").
|
||||
// rightSlot: arbitrary element rendered after the toggles — use for
|
||||
// sort controls or extra buttons.
|
||||
export default function FilterBar({
|
||||
search,
|
||||
onSearchChange,
|
||||
searchPlaceholder = 'Search...',
|
||||
filters,
|
||||
activeFilter,
|
||||
onFilterChange,
|
||||
toggles,
|
||||
rightSlot,
|
||||
}) {
|
||||
const hasFilters = Array.isArray(filters) && filters.length > 0
|
||||
const hasToggles = Array.isArray(toggles) && toggles.length > 0
|
||||
|
||||
return (
|
||||
<div className="filter-bar-group">
|
||||
{onSearchChange && (
|
||||
<div className="search-bar filter-bar-group__search">
|
||||
<i className="fas fa-search search-icon" />
|
||||
<input
|
||||
className="input"
|
||||
placeholder={searchPlaceholder}
|
||||
value={search ?? ''}
|
||||
onChange={e => onSearchChange(e.target.value)}
|
||||
aria-label={searchPlaceholder}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{(hasFilters || hasToggles || rightSlot) && (
|
||||
<div className="filter-bar-group__row">
|
||||
{hasFilters && (
|
||||
<div className="filter-bar" role="tablist" aria-label="Filter">
|
||||
{filters.map(f => (
|
||||
<button
|
||||
key={f.key}
|
||||
role="tab"
|
||||
aria-selected={activeFilter === f.key}
|
||||
className={`filter-btn ${activeFilter === f.key ? 'active' : ''}`}
|
||||
onClick={() => onFilterChange(f.key)}
|
||||
>
|
||||
{f.icon && <i className={`fas ${f.icon}`} style={{ marginRight: 4 }} />}
|
||||
{f.label}
|
||||
{typeof f.count === 'number' && (
|
||||
<span className="filter-btn__count">{f.count}</span>
|
||||
)}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
|
||||
{(hasToggles || rightSlot) && (
|
||||
<div className="filter-bar-group__right">
|
||||
{hasToggles && toggles.map(t => (
|
||||
<label key={t.key} className="filter-bar-group__toggle">
|
||||
<Toggle checked={t.checked} onChange={t.onChange} />
|
||||
{t.icon && <i className={`fas ${t.icon}`} />}
|
||||
{t.label}
|
||||
</label>
|
||||
))}
|
||||
{rightSlot}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
168
core/http/react-ui/src/components/NodeDistributionChip.jsx
Normal file
168
core/http/react-ui/src/components/NodeDistributionChip.jsx
Normal file
@@ -0,0 +1,168 @@
|
||||
import { useRef, useState } from 'react'
|
||||
import Popover from './Popover'
|
||||
|
||||
// NodeDistributionChip shows where something is installed/loaded across a
|
||||
// cluster. Used by both Manage → Backends (per-row Nodes column, data =
|
||||
// gallery NodeBackendRef with version/digest) and by the Models tab (data =
|
||||
// LoadedOn with state/status). Supports arbitrary cluster size — small
|
||||
// clusters render node-name chips inline, larger clusters collapse to a
|
||||
// summary chip and reveal the full per-node table in a popover on click.
|
||||
//
|
||||
// Field names are intentionally forgiving: both {node_name, node_status} and
|
||||
// {NodeName, NodeStatus} are supported so the component works whether it's
|
||||
// reading directly off the JSON or off a hydrated class.
|
||||
//
|
||||
// Props:
|
||||
// nodes: array of node refs (see shape below).
|
||||
// compactThreshold: max nodes to render inline before collapsing (default 3).
|
||||
// context: 'backends' (default) shows version/digest; 'models'
|
||||
// shows state.
|
||||
// emptyLabel: what to render when nodes is empty (default "—").
|
||||
export default function NodeDistributionChip({
|
||||
nodes,
|
||||
compactThreshold = 3,
|
||||
context = 'backends',
|
||||
emptyLabel = '—',
|
||||
}) {
|
||||
const triggerRef = useRef(null)
|
||||
const [open, setOpen] = useState(false)
|
||||
|
||||
const list = Array.isArray(nodes) ? nodes : []
|
||||
if (list.length === 0) {
|
||||
return <span className="cell-muted">{emptyLabel}</span>
|
||||
}
|
||||
|
||||
const getName = n => n.node_name ?? n.NodeName ?? ''
|
||||
const getStatus = n => n.node_status ?? n.NodeStatus ?? ''
|
||||
const getState = n => n.state ?? n.State ?? ''
|
||||
const getVersion = n => n.version ?? n.Version ?? ''
|
||||
const getDigest = n => n.digest ?? n.Digest ?? ''
|
||||
|
||||
// Inline mode: render every node as its own chip. Good for small clusters
|
||||
// where seeing the names directly is more useful than a summary.
|
||||
if (list.length <= compactThreshold) {
|
||||
return (
|
||||
<div className="badge-row">
|
||||
{list.map(n => {
|
||||
const status = getStatus(n)
|
||||
const variant = status === 'healthy' ? 'badge-success'
|
||||
: status === 'draining' ? 'badge-info'
|
||||
: 'badge-warning'
|
||||
const title = context === 'models'
|
||||
? `${getName(n)} — ${getState(n)} (${status})`
|
||||
: `${getName(n)} — ${status}${getVersion(n) ? ` · v${getVersion(n)}` : ''}`
|
||||
return (
|
||||
<span key={n.node_id ?? n.NodeID ?? getName(n)} className={`badge ${variant}`} title={title}>
|
||||
<i className="fas fa-server" /> {getName(n)}
|
||||
</span>
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
// Summary mode for anything bigger. Count unhealthy/offline explicitly so
|
||||
// the chip tells an operator at-a-glance whether to click in. "Drift" for
|
||||
// backends = more than one (version, digest) tuple across healthy nodes.
|
||||
const total = list.length
|
||||
const offline = list.filter(n => {
|
||||
const s = getStatus(n)
|
||||
return s !== 'healthy' && s !== 'draining'
|
||||
}).length
|
||||
const drift = context === 'backends' ? countDrift(list) : 0
|
||||
const severity = offline > 0 || drift > 0 ? 'badge-warning' : 'badge-info'
|
||||
|
||||
return (
|
||||
<>
|
||||
<button
|
||||
ref={triggerRef}
|
||||
type="button"
|
||||
className={`badge ${severity} chip-trigger`}
|
||||
aria-expanded={open}
|
||||
aria-haspopup="dialog"
|
||||
onClick={e => { e.stopPropagation(); setOpen(v => !v) }}
|
||||
>
|
||||
<i className="fas fa-server" />
|
||||
{' '}on {total} node{total === 1 ? '' : 's'}
|
||||
{offline > 0 ? ` · ${offline} offline` : ''}
|
||||
{drift > 0 ? ` · ${drift} drift` : ''}
|
||||
</button>
|
||||
<Popover
|
||||
anchor={triggerRef}
|
||||
open={open}
|
||||
onClose={() => setOpen(false)}
|
||||
ariaLabel={context === 'models' ? 'Model distribution' : 'Backend distribution'}
|
||||
>
|
||||
<div className="popover__header">
|
||||
<strong>Installed on {total} node{total === 1 ? '' : 's'}</strong>
|
||||
{offline > 0 && <span className="badge badge-warning">{offline} offline</span>}
|
||||
{drift > 0 && <span className="badge badge-warning">{drift} drift</span>}
|
||||
</div>
|
||||
<div className="popover__scroll">
|
||||
<table className="table popover__table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Node</th>
|
||||
<th>Status</th>
|
||||
{context === 'models' ? <th>State</th> : <>
|
||||
<th>Version</th>
|
||||
<th>Digest</th>
|
||||
</>}
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{list.map(n => (
|
||||
<tr key={n.node_id ?? n.NodeID ?? getName(n)}>
|
||||
<td className="cell-mono">{getName(n)}</td>
|
||||
<td>
|
||||
<span className={`badge ${getStatus(n) === 'healthy' ? 'badge-success' : 'badge-warning'}`}>
|
||||
{getStatus(n)}
|
||||
</span>
|
||||
</td>
|
||||
{context === 'models' ? (
|
||||
<td className="cell-mono">{getState(n) || '—'}</td>
|
||||
) : (
|
||||
<>
|
||||
<td className="cell-mono">{getVersion(n) ? `v${getVersion(n)}` : '—'}</td>
|
||||
<td className="cell-mono cell-truncate" title={getDigest(n)}>
|
||||
{getDigest(n) ? shortenDigest(getDigest(n)) : '—'}
|
||||
</td>
|
||||
</>
|
||||
)}
|
||||
</tr>
|
||||
))}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</Popover>
|
||||
</>
|
||||
)
|
||||
}
|
||||
|
||||
// countDrift counts nodes whose (version, digest) disagrees with the cluster
|
||||
// majority. Mirrors the backend summarizeNodeDrift logic so the UI number
|
||||
// matches what CheckUpgradesAgainst emits in UpgradeInfo.NodeDrift.
|
||||
function countDrift(nodes) {
|
||||
if (nodes.length <= 1) return 0
|
||||
const counts = new Map()
|
||||
for (const n of nodes) {
|
||||
const key = `${n.version ?? n.Version ?? ''}|${n.digest ?? n.Digest ?? ''}`
|
||||
counts.set(key, (counts.get(key) || 0) + 1)
|
||||
}
|
||||
if (counts.size === 1) return 0 // unanimous
|
||||
let topKey = ''
|
||||
let topCount = 0
|
||||
for (const [k, v] of counts.entries()) {
|
||||
if (v > topCount) { topKey = k; topCount = v }
|
||||
}
|
||||
return nodes.length - topCount
|
||||
}
|
||||
|
||||
// shortenDigest trims a full OCI digest to the common 12-char form used in
|
||||
// docker/oci tooling. Falls back to the raw value if it doesn't match.
|
||||
function shortenDigest(digest) {
|
||||
const m = /^(sha\d+:)?([a-f0-9]+)$/i.exec(digest)
|
||||
if (!m) return digest
|
||||
const hex = m[2]
|
||||
return (m[1] ?? '') + hex.slice(0, 12)
|
||||
}
|
||||
86
core/http/react-ui/src/components/Popover.jsx
Normal file
86
core/http/react-ui/src/components/Popover.jsx
Normal file
@@ -0,0 +1,86 @@
|
||||
import { useEffect, useRef, useState, useCallback } from 'react'
|
||||
|
||||
// Minimal popover: positions itself below-right of the trigger's bounding box,
|
||||
// flips above when there isn't room below, closes on outside click or Escape,
|
||||
// returns focus to the trigger. Uses the existing .card surface so it picks
|
||||
// up theme/border/shadow automatically — no new theming work.
|
||||
//
|
||||
// Props:
|
||||
// anchor: ref to the trigger DOMElement (required)
|
||||
// open: boolean
|
||||
// onClose: () => void
|
||||
// children: popover body
|
||||
// ariaLabel: accessible label for the dialog
|
||||
export default function Popover({ anchor, open, onClose, children, ariaLabel }) {
|
||||
const popoverRef = useRef(null)
|
||||
const [pos, setPos] = useState({ top: 0, left: 0, flipped: false })
|
||||
|
||||
// Compute position from the anchor's bounding box whenever we open or the
|
||||
// viewport changes. 240px is the minimum width we'll reserve; bigger content
|
||||
// grows naturally.
|
||||
const reposition = useCallback(() => {
|
||||
if (!anchor?.current) return
|
||||
const rect = anchor.current.getBoundingClientRect()
|
||||
const popoverHeight = popoverRef.current?.offsetHeight ?? 0
|
||||
const spaceBelow = window.innerHeight - rect.bottom
|
||||
const flipped = popoverHeight > spaceBelow - 16 && rect.top > popoverHeight
|
||||
const top = flipped ? rect.top - popoverHeight - 8 : rect.bottom + 8
|
||||
// Prefer left-aligned; clamp so we don't go off-screen right.
|
||||
const left = Math.min(rect.left, window.innerWidth - 320)
|
||||
setPos({ top, left: Math.max(8, left), flipped })
|
||||
}, [anchor])
|
||||
|
||||
useEffect(() => {
|
||||
if (!open) return
|
||||
reposition()
|
||||
window.addEventListener('resize', reposition)
|
||||
window.addEventListener('scroll', reposition, true)
|
||||
return () => {
|
||||
window.removeEventListener('resize', reposition)
|
||||
window.removeEventListener('scroll', reposition, true)
|
||||
}
|
||||
}, [open, reposition])
|
||||
|
||||
// Close on outside click or Escape. Mousedown (not click) so the close
|
||||
// happens before a parent handler could re-trigger us.
|
||||
useEffect(() => {
|
||||
if (!open) return
|
||||
const onMouseDown = (e) => {
|
||||
if (popoverRef.current && !popoverRef.current.contains(e.target) && !anchor?.current?.contains(e.target)) {
|
||||
onClose()
|
||||
}
|
||||
}
|
||||
const onKey = (e) => { if (e.key === 'Escape') onClose() }
|
||||
document.addEventListener('mousedown', onMouseDown)
|
||||
document.addEventListener('keydown', onKey)
|
||||
return () => {
|
||||
document.removeEventListener('mousedown', onMouseDown)
|
||||
document.removeEventListener('keydown', onKey)
|
||||
}
|
||||
}, [open, onClose, anchor])
|
||||
|
||||
// Return focus to the trigger when the popover closes — keyboard users
|
||||
// shouldn't have to tab back through the whole page to find their spot.
|
||||
useEffect(() => {
|
||||
if (!open && anchor?.current) {
|
||||
// requestAnimationFrame so the close is painted before focus jumps;
|
||||
// otherwise screen readers announce the trigger mid-transition.
|
||||
const raf = requestAnimationFrame(() => anchor.current?.focus?.())
|
||||
return () => cancelAnimationFrame(raf)
|
||||
}
|
||||
}, [open, anchor])
|
||||
|
||||
if (!open) return null
|
||||
|
||||
return (
|
||||
<div
|
||||
ref={popoverRef}
|
||||
role="dialog"
|
||||
aria-label={ariaLabel}
|
||||
className="popover card"
|
||||
style={{ top: pos.top, left: pos.left }}
|
||||
>
|
||||
{children}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
@@ -3,6 +3,8 @@ import { useNavigate, useOutletContext, useSearchParams } from 'react-router-dom
|
||||
import ResourceMonitor from '../components/ResourceMonitor'
|
||||
import ConfirmDialog from '../components/ConfirmDialog'
|
||||
import Toggle from '../components/Toggle'
|
||||
import NodeDistributionChip from '../components/NodeDistributionChip'
|
||||
import FilterBar from '../components/FilterBar'
|
||||
import { useModels } from '../hooks/useModels'
|
||||
import { backendControlApi, modelsApi, backendsApi, systemApi, nodesApi } from '../utils/api'
|
||||
|
||||
@@ -11,6 +13,22 @@ const TABS = [
|
||||
{ key: 'backends', label: 'Backends', icon: 'fa-server' },
|
||||
]
|
||||
|
||||
// formatInstalledAt renders an installed_at timestamp as a short relative/abs
|
||||
// string suitable for dense tables. Returns the raw value if parsing fails so
|
||||
// we never display "Invalid Date".
|
||||
function formatInstalledAt(value) {
|
||||
if (!value) return '—'
|
||||
const d = new Date(value)
|
||||
if (isNaN(d.getTime())) return value
|
||||
const now = Date.now()
|
||||
const diffMin = Math.floor((now - d.getTime()) / 60000)
|
||||
if (diffMin < 1) return 'just now'
|
||||
if (diffMin < 60) return `${diffMin}m ago`
|
||||
if (diffMin < 60 * 24) return `${Math.floor(diffMin / 60)}h ago`
|
||||
if (diffMin < 60 * 24 * 30) return `${Math.floor(diffMin / (60 * 24))}d ago`
|
||||
return d.toISOString().slice(0, 10)
|
||||
}
|
||||
|
||||
export default function Manage() {
|
||||
const { addToast } = useOutletContext()
|
||||
const navigate = useNavigate()
|
||||
@@ -28,6 +46,24 @@ export default function Manage() {
|
||||
const [distributedMode, setDistributedMode] = useState(false)
|
||||
const [togglingModels, setTogglingModels] = useState(new Set())
|
||||
const [pinningModels, setPinningModels] = useState(new Set())
|
||||
// Filter state per tab. Persisted in the URL query so switching tabs
|
||||
// doesn't lose the filter the operator just set.
|
||||
const [modelsSearch, setModelsSearch] = useState(() => searchParams.get('mq') || '')
|
||||
const [modelsFilter, setModelsFilter] = useState(() => searchParams.get('mf') || 'all')
|
||||
const [backendsSearch, setBackendsSearch] = useState(() => searchParams.get('bq') || '')
|
||||
const [backendsFilter, setBackendsFilter] = useState(() => searchParams.get('bf') || 'all')
|
||||
|
||||
// Sync filter state into the URL so deep-links + tab switches survive.
|
||||
useEffect(() => {
|
||||
const p = new URLSearchParams(searchParams)
|
||||
const setOrDelete = (k, v) => { if (v && v !== 'all') p.set(k, v); else p.delete(k) }
|
||||
setOrDelete('mq', modelsSearch)
|
||||
setOrDelete('mf', modelsFilter)
|
||||
setOrDelete('bq', backendsSearch)
|
||||
setOrDelete('bf', backendsFilter)
|
||||
setSearchParams(p, { replace: true })
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [modelsSearch, modelsFilter, backendsSearch, backendsFilter])
|
||||
|
||||
const handleTabChange = (tab) => {
|
||||
setActiveTab(tab)
|
||||
@@ -64,6 +100,35 @@ export default function Manage() {
|
||||
nodesApi.list().then(() => setDistributedMode(true)).catch(() => {})
|
||||
}, [fetchLoadedModels, fetchBackends])
|
||||
|
||||
// Auto-refresh the Models tab every 10s in distributed mode so ghost models
|
||||
// (loaded on a worker but absent from this frontend's in-memory cache)
|
||||
// clear on their own without the user clicking Update.
|
||||
const [lastSyncedAt, setLastSyncedAt] = useState(() => Date.now())
|
||||
const [nowTick, setNowTick] = useState(() => Date.now())
|
||||
useEffect(() => {
|
||||
if (!distributedMode || activeTab !== 'models') return
|
||||
const interval = setInterval(() => {
|
||||
refetchModels()
|
||||
fetchLoadedModels()
|
||||
setLastSyncedAt(Date.now())
|
||||
}, 10000)
|
||||
return () => clearInterval(interval)
|
||||
}, [distributedMode, activeTab, refetchModels, fetchLoadedModels])
|
||||
|
||||
// Drive the "last synced Ns ago" label without over-rendering the table.
|
||||
useEffect(() => {
|
||||
if (!distributedMode) return
|
||||
const interval = setInterval(() => setNowTick(Date.now()), 1000)
|
||||
return () => clearInterval(interval)
|
||||
}, [distributedMode])
|
||||
const lastSyncedAgo = (() => {
|
||||
const s = Math.max(0, Math.floor((nowTick - lastSyncedAt) / 1000))
|
||||
if (s < 5) return 'just now'
|
||||
if (s < 60) return `${s}s ago`
|
||||
const m = Math.floor(s / 60)
|
||||
return `${m}m ago`
|
||||
})()
|
||||
|
||||
// Fetch available backend upgrades
|
||||
useEffect(() => {
|
||||
if (activeTab === 'backends') {
|
||||
@@ -196,6 +261,29 @@ export default function Manage() {
|
||||
}
|
||||
}
|
||||
|
||||
const [upgradingAll, setUpgradingAll] = useState(false)
|
||||
const [showOnlyUpgradable, setShowOnlyUpgradable] = useState(false)
|
||||
const handleUpgradeAll = async () => {
|
||||
const names = Object.keys(upgrades)
|
||||
if (names.length === 0) return
|
||||
setUpgradingAll(true)
|
||||
try {
|
||||
// Serial upgrade — matches the gallery's Upgrade All behavior.
|
||||
// Each backend upgrade is itself a cluster-wide fan-out, so parallel
|
||||
// calls would multiply load on every worker.
|
||||
for (const name of names) {
|
||||
try {
|
||||
await backendsApi.upgrade(name)
|
||||
} catch (err) {
|
||||
addToast(`Upgrade failed for ${name}: ${err.message}`, 'error')
|
||||
}
|
||||
}
|
||||
addToast(`Upgrade started for ${names.length} backend${names.length === 1 ? '' : 's'}`, 'info')
|
||||
} finally {
|
||||
setUpgradingAll(false)
|
||||
}
|
||||
}
|
||||
|
||||
const handleDeleteBackend = (name) => {
|
||||
setConfirmDialog({
|
||||
title: 'Delete Backend',
|
||||
@@ -227,29 +315,74 @@ export default function Manage() {
|
||||
|
||||
{/* Tabs */}
|
||||
<div className="tabs" style={{ marginTop: 'var(--spacing-lg)', marginBottom: 'var(--spacing-md)' }}>
|
||||
{TABS.map(t => (
|
||||
<button
|
||||
key={t.key}
|
||||
className={`tab ${activeTab === t.key ? 'tab-active' : ''}`}
|
||||
onClick={() => handleTabChange(t.key)}
|
||||
>
|
||||
<i className={`fas ${t.icon}`} style={{ marginRight: 6 }} />
|
||||
{t.label}
|
||||
{t.key === 'models' && !modelsLoading && ` (${models.length})`}
|
||||
{t.key === 'backends' && !backendsLoading && ` (${backends.length})`}
|
||||
</button>
|
||||
))}
|
||||
{TABS.map(t => {
|
||||
const upgradeCount = t.key === 'backends' ? Object.keys(upgrades).length : 0
|
||||
return (
|
||||
<button
|
||||
key={t.key}
|
||||
className={`tab ${activeTab === t.key ? 'tab-active' : ''}`}
|
||||
onClick={() => handleTabChange(t.key)}
|
||||
>
|
||||
<i className={`fas ${t.icon}`} style={{ marginRight: 6 }} />
|
||||
{t.label}
|
||||
{t.key === 'models' && !modelsLoading && ` (${models.length})`}
|
||||
{t.key === 'backends' && !backendsLoading && ` (${backends.length})`}
|
||||
{upgradeCount > 0 && (
|
||||
<span className="tab-pill tab-pill--warning" title={`${upgradeCount} update${upgradeCount === 1 ? '' : 's'} available`}>
|
||||
<i className="fas fa-arrow-up" /> {upgradeCount}
|
||||
</span>
|
||||
)}
|
||||
</button>
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
|
||||
{/* Models Tab */}
|
||||
{activeTab === 'models' && (
|
||||
{activeTab === 'models' && (() => {
|
||||
// Computed filters — done here so the result is available both to
|
||||
// the FilterBar counts and to the table body.
|
||||
const MODEL_FILTERS = [
|
||||
{ key: 'all', label: 'All', icon: 'fa-layer-group' },
|
||||
{ key: 'running', label: 'Running', icon: 'fa-circle-play' },
|
||||
{ key: 'idle', label: 'Idle', icon: 'fa-pause' },
|
||||
{ key: 'disabled', label: 'Disabled', icon: 'fa-ban' },
|
||||
{ key: 'pinned', label: 'Pinned', icon: 'fa-thumbtack' },
|
||||
...(distributedMode ? [{ key: 'distributed', label: 'Distributed', icon: 'fa-server' }] : []),
|
||||
]
|
||||
const passesFilter = (m) => {
|
||||
if (modelsFilter === 'running') return !m.disabled && (loadedModelIds.has(m.id) || (m.loaded_on && m.loaded_on.length > 0))
|
||||
if (modelsFilter === 'idle') return !m.disabled && !loadedModelIds.has(m.id) && !(m.loaded_on && m.loaded_on.length > 0)
|
||||
if (modelsFilter === 'disabled') return !!m.disabled
|
||||
if (modelsFilter === 'pinned') return !!m.pinned
|
||||
if (modelsFilter === 'distributed') return Array.isArray(m.loaded_on) && m.loaded_on.length > 0
|
||||
return true
|
||||
}
|
||||
const q = modelsSearch.trim().toLowerCase()
|
||||
const passesSearch = (m) => !q || (m.id || '').toLowerCase().includes(q) || (m.backend || '').toLowerCase().includes(q)
|
||||
const visibleModels = models.filter(m => passesFilter(m) && passesSearch(m))
|
||||
return (
|
||||
<div>
|
||||
<div style={{ display: 'flex', alignItems: 'center', justifyContent: 'flex-end', marginBottom: 'var(--spacing-md)' }}>
|
||||
<button className="btn btn-secondary btn-sm" onClick={handleReload} disabled={reloading}>
|
||||
<i className={`fas ${reloading ? 'fa-spinner fa-spin' : 'fa-rotate'}`} />
|
||||
{reloading ? 'Updating...' : 'Update'}
|
||||
</button>
|
||||
</div>
|
||||
<FilterBar
|
||||
search={modelsSearch}
|
||||
onSearchChange={setModelsSearch}
|
||||
searchPlaceholder="Search models by name or backend..."
|
||||
filters={MODEL_FILTERS}
|
||||
activeFilter={modelsFilter}
|
||||
onFilterChange={setModelsFilter}
|
||||
rightSlot={(
|
||||
<>
|
||||
{distributedMode && (
|
||||
<span className="cell-muted" title="Auto-refreshes every 10s in distributed mode so ghost models clear promptly">
|
||||
<i className="fas fa-rotate" /> Last synced {lastSyncedAgo}
|
||||
</span>
|
||||
)}
|
||||
<button className="btn btn-secondary btn-sm" onClick={handleReload} disabled={reloading}>
|
||||
<i className={`fas ${reloading ? 'fa-spinner fa-spin' : 'fa-rotate'}`} />
|
||||
{reloading ? ' Updating...' : ' Update'}
|
||||
</button>
|
||||
</>
|
||||
)}
|
||||
/>
|
||||
|
||||
{modelsLoading ? (
|
||||
<div className="card" style={{ padding: 'var(--spacing-xl)', textAlign: 'center', color: 'var(--color-text-muted)' }}>
|
||||
@@ -274,6 +407,12 @@ export default function Manage() {
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
) : visibleModels.length === 0 ? (
|
||||
<div className="empty-state">
|
||||
<i className="fas fa-filter" />
|
||||
<p>No models match the current filter.</p>
|
||||
<button className="btn btn-ghost btn-sm" onClick={() => { setModelsSearch(''); setModelsFilter('all') }}>Clear filters</button>
|
||||
</div>
|
||||
) : (
|
||||
<div className="table-container">
|
||||
<table className="table">
|
||||
@@ -288,7 +427,7 @@ export default function Manage() {
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{models.map(model => (
|
||||
{visibleModels.map(model => (
|
||||
<tr key={model.id} style={{ opacity: model.disabled ? 0.55 : 1, transition: 'opacity 0.2s' }}>
|
||||
{/* Enable/Disable toggle */}
|
||||
<td>
|
||||
@@ -329,21 +468,33 @@ export default function Manage() {
|
||||
</div>
|
||||
</div>
|
||||
</td>
|
||||
{/* Status */}
|
||||
{/* Status / Distribution */}
|
||||
<td>
|
||||
{model.disabled ? (
|
||||
<span className="badge" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
|
||||
<i className="fas fa-ban" style={{ fontSize: '6px' }} /> Disabled
|
||||
</span>
|
||||
) : loadedModelIds.has(model.id) ? (
|
||||
<span className="badge badge-success">
|
||||
<i className="fas fa-circle" style={{ fontSize: '6px' }} /> Running
|
||||
</span>
|
||||
) : (
|
||||
<span className="badge" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
|
||||
<i className="fas fa-circle" style={{ fontSize: '6px' }} /> Idle
|
||||
</span>
|
||||
)}
|
||||
<div className="cell-stack">
|
||||
{model.disabled ? (
|
||||
<span className="badge" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
|
||||
<i className="fas fa-ban" /> Disabled
|
||||
</span>
|
||||
) : model.loaded_on && model.loaded_on.length > 0 ? (
|
||||
// Distributed mode: surface where the model is
|
||||
// actually loaded. Shared chip scales to any cluster
|
||||
// size (inline for <=3, popover for larger).
|
||||
<NodeDistributionChip nodes={model.loaded_on} context="models" />
|
||||
) : loadedModelIds.has(model.id) ? (
|
||||
<span className="badge badge-success">
|
||||
<i className="fas fa-circle" style={{ fontSize: '6px' }} /> Running
|
||||
</span>
|
||||
) : (
|
||||
<span className="badge" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
|
||||
<i className="fas fa-circle" style={{ fontSize: '6px' }} /> Idle
|
||||
</span>
|
||||
)}
|
||||
{model.source === 'registry-only' && (
|
||||
<span className="badge badge-warning" title="Discovered on a worker but not configured locally. Persist the config to make it permanent.">
|
||||
<i className="fas fa-ghost" /> Adopted
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
</td>
|
||||
{/* Backend */}
|
||||
<td>
|
||||
@@ -394,11 +545,34 @@ export default function Manage() {
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
)
|
||||
})()}
|
||||
|
||||
{/* Backends Tab */}
|
||||
{activeTab === 'backends' && (
|
||||
<div>
|
||||
{/* Upgrade banner — mirrors the gallery so operators can't miss updates */}
|
||||
{!backendsLoading && Object.keys(upgrades).length > 0 && (
|
||||
<div className="upgrade-banner">
|
||||
<div className="upgrade-banner__text">
|
||||
<i className="fas fa-arrow-up" />
|
||||
<span>
|
||||
{Object.keys(upgrades).length} backend{Object.keys(upgrades).length === 1 ? ' has' : 's have'} updates available
|
||||
</span>
|
||||
</div>
|
||||
<div className="upgrade-banner__actions">
|
||||
<button
|
||||
className="btn btn-primary btn-sm"
|
||||
onClick={handleUpgradeAll}
|
||||
disabled={upgradingAll}
|
||||
>
|
||||
<i className={`fas ${upgradingAll ? 'fa-spinner fa-spin' : 'fa-arrow-up'}`} />
|
||||
{upgradingAll ? ' Upgrading...' : ' Upgrade all'}
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{backendsLoading ? (
|
||||
<div style={{ textAlign: 'center', padding: 'var(--spacing-md)', color: 'var(--color-text-muted)', fontSize: '0.875rem' }}>
|
||||
Loading backends...
|
||||
@@ -419,109 +593,217 @@ export default function Manage() {
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
) : (
|
||||
<div className="table-container">
|
||||
) : (() => {
|
||||
// Count chip badges: show N in the filter buttons so operators can
|
||||
// see at a glance how their chips bucket the list.
|
||||
const upgradableCount = backends.filter(b => upgrades[b.Name]).length
|
||||
const userCount = backends.filter(b => !b.IsSystem).length
|
||||
const systemCount = backends.filter(b => b.IsSystem).length
|
||||
const metaCount = backends.filter(b => b.IsMeta).length
|
||||
const offlineCount = backends.filter(b => {
|
||||
const n = b.Nodes || b.nodes || []
|
||||
return n.some(x => {
|
||||
const s = x.node_status || x.NodeStatus
|
||||
return s && s !== 'healthy' && s !== 'draining'
|
||||
})
|
||||
}).length
|
||||
|
||||
const BACKEND_FILTERS = [
|
||||
{ key: 'all', label: 'All', icon: 'fa-layer-group', count: backends.length },
|
||||
{ key: 'user', label: 'User', icon: 'fa-download', count: userCount },
|
||||
{ key: 'system', label: 'System', icon: 'fa-shield-alt', count: systemCount },
|
||||
{ key: 'meta', label: 'Meta', icon: 'fa-layer-group', count: metaCount },
|
||||
...(upgradableCount > 0 ? [{ key: 'upgradable', label: 'Updates', icon: 'fa-arrow-up', count: upgradableCount }] : []),
|
||||
...(distributedMode && offlineCount > 0 ? [{ key: 'offline', label: 'Offline nodes', icon: 'fa-exclamation-circle', count: offlineCount }] : []),
|
||||
]
|
||||
const q = backendsSearch.trim().toLowerCase()
|
||||
const passesSearch = (b) => !q
|
||||
|| (b.Name || '').toLowerCase().includes(q)
|
||||
|| (b.Metadata?.alias || '').toLowerCase().includes(q)
|
||||
|| (b.Metadata?.meta_backend_for || '').toLowerCase().includes(q)
|
||||
const passesFilter = (b) => {
|
||||
switch (backendsFilter) {
|
||||
case 'user': return !b.IsSystem
|
||||
case 'system': return !!b.IsSystem
|
||||
case 'meta': return !!b.IsMeta
|
||||
case 'upgradable': return !!upgrades[b.Name]
|
||||
case 'offline': {
|
||||
const n = b.Nodes || b.nodes || []
|
||||
return n.some(x => {
|
||||
const s = x.node_status || x.NodeStatus
|
||||
return s && s !== 'healthy' && s !== 'draining'
|
||||
})
|
||||
}
|
||||
default: return true
|
||||
}
|
||||
}
|
||||
// Legacy "showOnlyUpgradable" toggle is now the 'upgradable' chip —
|
||||
// keep backward-compat by mapping it onto the new filter.
|
||||
if (showOnlyUpgradable && backendsFilter !== 'upgradable') {
|
||||
// One-shot reconciliation — the old state becomes the new chip.
|
||||
setBackendsFilter('upgradable')
|
||||
setShowOnlyUpgradable(false)
|
||||
}
|
||||
const visibleBackends = backends.filter(b => passesFilter(b) && passesSearch(b))
|
||||
if (visibleBackends.length === 0) {
|
||||
return (
|
||||
<>
|
||||
<FilterBar
|
||||
search={backendsSearch}
|
||||
onSearchChange={setBackendsSearch}
|
||||
searchPlaceholder="Search backends by name or alias..."
|
||||
filters={BACKEND_FILTERS}
|
||||
activeFilter={backendsFilter}
|
||||
onFilterChange={setBackendsFilter}
|
||||
/>
|
||||
<div className="empty-state">
|
||||
<i className="fas fa-filter" />
|
||||
<p>No backends match the current filter.</p>
|
||||
<button className="btn btn-ghost btn-sm" onClick={() => { setBackendsSearch(''); setBackendsFilter('all') }}>Clear filters</button>
|
||||
</div>
|
||||
</>
|
||||
)
|
||||
}
|
||||
return (
|
||||
<>
|
||||
<FilterBar
|
||||
search={backendsSearch}
|
||||
onSearchChange={setBackendsSearch}
|
||||
searchPlaceholder="Search backends by name or alias..."
|
||||
filters={BACKEND_FILTERS}
|
||||
activeFilter={backendsFilter}
|
||||
onFilterChange={setBackendsFilter}
|
||||
/>
|
||||
<div className="table-container">
|
||||
<table className="table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Name</th>
|
||||
<th>Type</th>
|
||||
<th>Metadata</th>
|
||||
<th>Version</th>
|
||||
{distributedMode && <th>Nodes</th>}
|
||||
<th>Installed</th>
|
||||
<th style={{ textAlign: 'right' }}>Actions</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{backends.map((backend, i) => (
|
||||
{visibleBackends.map((backend, i) => {
|
||||
const upgradeInfo = upgrades[backend.Name]
|
||||
const hasDrift = upgradeInfo?.node_drift?.length > 0
|
||||
const nodes = backend.Nodes || backend.nodes || []
|
||||
return (
|
||||
<tr key={backend.Name || i}>
|
||||
<td>
|
||||
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)' }}>
|
||||
<i className="fas fa-cog" style={{ color: 'var(--color-accent)', fontSize: '0.75rem' }} />
|
||||
<span style={{ fontWeight: 500 }}>{backend.Name}</span>
|
||||
<div className="cell-name">
|
||||
<i className="fas fa-cog" />
|
||||
<span>{backend.Name}</span>
|
||||
{backend.Metadata?.alias && (
|
||||
<span className="cell-subtle">alias: {backend.Metadata.alias}</span>
|
||||
)}
|
||||
{backend.Metadata?.meta_backend_for && (
|
||||
<span className="cell-subtle">for: {backend.Metadata.meta_backend_for}</span>
|
||||
)}
|
||||
</div>
|
||||
</td>
|
||||
<td>
|
||||
<div style={{ display: 'flex', gap: '4px', flexWrap: 'wrap' }}>
|
||||
<div className="badge-row">
|
||||
{backend.IsSystem ? (
|
||||
<span className="badge badge-info" style={{ fontSize: '0.625rem' }}>
|
||||
<i className="fas fa-shield-alt" style={{ fontSize: '0.5rem', marginRight: 2 }} />System
|
||||
<span className="badge badge-info">
|
||||
<i className="fas fa-shield-alt" /> System
|
||||
</span>
|
||||
) : (
|
||||
<span className="badge badge-success" style={{ fontSize: '0.625rem' }}>
|
||||
<i className="fas fa-download" style={{ fontSize: '0.5rem', marginRight: 2 }} />User
|
||||
<span className="badge badge-success">
|
||||
<i className="fas fa-download" /> User
|
||||
</span>
|
||||
)}
|
||||
{backend.IsMeta && (
|
||||
<span className="badge" style={{ background: 'var(--color-accent-light)', color: 'var(--color-accent)', fontSize: '0.625rem' }}>
|
||||
<i className="fas fa-layer-group" style={{ fontSize: '0.5rem', marginRight: 2 }} />Meta
|
||||
<span className="badge badge-accent">
|
||||
<i className="fas fa-layer-group" /> Meta
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
</td>
|
||||
<td>
|
||||
<div style={{ display: 'flex', flexDirection: 'column', gap: 2, fontSize: '0.75rem', color: 'var(--color-text-secondary)' }}>
|
||||
{backend.Metadata?.alias && (
|
||||
<span>
|
||||
<i className="fas fa-tag" style={{ fontSize: '0.5rem', marginRight: 4 }} />
|
||||
Alias: <span style={{ color: 'var(--color-text-primary)' }}>{backend.Metadata.alias}</span>
|
||||
<div className="cell-stack">
|
||||
{backend.Metadata?.version ? (
|
||||
<span className="cell-mono">v{backend.Metadata.version}</span>
|
||||
) : (
|
||||
<span className="cell-muted">—</span>
|
||||
)}
|
||||
{upgradeInfo && (
|
||||
<span className="badge badge-warning" title={upgradeInfo.available_version ? `Upgrade to v${upgradeInfo.available_version}` : 'Update available'}>
|
||||
<i className="fas fa-arrow-up" />
|
||||
{upgradeInfo.available_version ? ` v${upgradeInfo.available_version}` : ' Update available'}
|
||||
</span>
|
||||
)}
|
||||
{backend.Metadata?.meta_backend_for && (
|
||||
<span>
|
||||
<i className="fas fa-link" style={{ fontSize: '0.5rem', marginRight: 4 }} />
|
||||
For: <span style={{ color: 'var(--color-accent)' }}>{backend.Metadata.meta_backend_for}</span>
|
||||
{hasDrift && (
|
||||
<span
|
||||
className="badge badge-warning"
|
||||
title={`Drift: ${upgradeInfo.node_drift.map(d => `${d.node_name}${d.version ? ' v' + d.version : ''}`).join(', ')}`}
|
||||
>
|
||||
<i className="fas fa-code-branch" />
|
||||
{' '}Drift: {upgradeInfo.node_drift.length} node{upgradeInfo.node_drift.length === 1 ? '' : 's'}
|
||||
</span>
|
||||
)}
|
||||
{backend.Metadata?.version && (
|
||||
<span>
|
||||
<i className="fas fa-code-branch" style={{ fontSize: '0.5rem', marginRight: 4 }} />
|
||||
Version: <span style={{ color: 'var(--color-text-primary)' }}>v{backend.Metadata.version}</span>
|
||||
{upgrades[backend.Name] && (
|
||||
<span style={{ color: '#856404', marginLeft: 4 }}>
|
||||
→ v{upgrades[backend.Name].available_version}
|
||||
</span>
|
||||
)}
|
||||
</span>
|
||||
)}
|
||||
{backend.Metadata?.installed_at && (
|
||||
<span>
|
||||
<i className="fas fa-calendar" style={{ fontSize: '0.5rem', marginRight: 4 }} />
|
||||
{backend.Metadata.installed_at}
|
||||
</span>
|
||||
)}
|
||||
{!backend.Metadata?.alias && !backend.Metadata?.meta_backend_for && !backend.Metadata?.installed_at && '—'}
|
||||
</div>
|
||||
</td>
|
||||
{distributedMode && (
|
||||
<td>
|
||||
<NodeDistributionChip nodes={nodes} context="backends" />
|
||||
</td>
|
||||
)}
|
||||
<td>
|
||||
<div style={{ display: 'flex', gap: 'var(--spacing-xs)', justifyContent: 'flex-end' }}>
|
||||
{!backend.IsSystem ? (
|
||||
<span className="cell-muted cell-mono">
|
||||
{backend.Metadata?.installed_at ? formatInstalledAt(backend.Metadata.installed_at) : '—'}
|
||||
</span>
|
||||
</td>
|
||||
<td>
|
||||
<div className="row-actions">
|
||||
{backend.IsSystem ? (
|
||||
<span className="badge" title="System backends are managed outside the gallery">
|
||||
<i className="fas fa-lock" /> Protected
|
||||
</span>
|
||||
) : (
|
||||
<>
|
||||
{upgradeInfo ? (
|
||||
<button
|
||||
className="btn btn-primary btn-sm"
|
||||
onClick={() => handleUpgradeBackend(backend.Name)}
|
||||
disabled={reinstallingBackends.has(backend.Name)}
|
||||
>
|
||||
<i className={`fas ${reinstallingBackends.has(backend.Name) ? 'fa-spinner fa-spin' : 'fa-arrow-up'}`} />
|
||||
{' '}Upgrade{upgradeInfo.available_version ? ` to v${upgradeInfo.available_version}` : ''}
|
||||
</button>
|
||||
) : (
|
||||
<button
|
||||
className="btn btn-secondary btn-sm"
|
||||
onClick={() => handleReinstallBackend(backend.Name)}
|
||||
disabled={reinstallingBackends.has(backend.Name)}
|
||||
>
|
||||
<i className={`fas ${reinstallingBackends.has(backend.Name) ? 'fa-spinner fa-spin' : 'fa-rotate'}`} />
|
||||
{' '}Reinstall
|
||||
</button>
|
||||
)}
|
||||
<button
|
||||
className={`btn ${upgrades[backend.Name] ? 'btn-primary' : 'btn-secondary'} btn-sm`}
|
||||
onClick={() => upgrades[backend.Name] ? handleUpgradeBackend(backend.Name) : handleReinstallBackend(backend.Name)}
|
||||
disabled={reinstallingBackends.has(backend.Name)}
|
||||
title={upgrades[backend.Name] ? `Upgrade to v${upgrades[backend.Name]?.available_version || 'latest'}` : 'Reinstall'}
|
||||
>
|
||||
<i className={`fas ${reinstallingBackends.has(backend.Name) ? 'fa-spinner fa-spin' : upgrades[backend.Name] ? 'fa-arrow-up' : 'fa-rotate'}`} />
|
||||
</button>
|
||||
<button
|
||||
className="btn btn-danger btn-sm"
|
||||
className="btn btn-danger-ghost btn-sm"
|
||||
onClick={() => handleDeleteBackend(backend.Name)}
|
||||
title="Delete"
|
||||
title="Delete backend (removes from all nodes)"
|
||||
>
|
||||
<i className="fas fa-trash" />
|
||||
</button>
|
||||
</>
|
||||
) : (
|
||||
<span style={{ fontSize: '0.75rem', color: 'var(--color-text-muted)' }}>—</span>
|
||||
)}
|
||||
</div>
|
||||
</td>
|
||||
</tr>
|
||||
))}
|
||||
)
|
||||
})}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</>
|
||||
)
|
||||
})()}
|
||||
</div>
|
||||
)}
|
||||
|
||||
|
||||
@@ -51,15 +51,22 @@ const modelStateConfig = {
|
||||
idle: { bg: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)', border: 'var(--color-border-subtle)' },
|
||||
}
|
||||
|
||||
function StatCard({ icon, label, value, color }) {
|
||||
function StatCard({ icon, label, value, color, accentVar }) {
|
||||
// accentVar: optional CSS variable for the left edge + icon chip, e.g.
|
||||
// "--color-success". When unset the card reads neutral — used for simple
|
||||
// counts so they don't compete with the semantic cards for attention.
|
||||
const accent = color || (accentVar ? `var(${accentVar})` : 'var(--color-text-primary)')
|
||||
return (
|
||||
<div className="card" style={{ padding: 'var(--spacing-sm) var(--spacing-md)', flex: '1 1 0', minWidth: 120 }}>
|
||||
<div style={{ display: 'flex', alignItems: 'center', gap: 6, marginBottom: 2 }}>
|
||||
<i className={icon} style={{ color: 'var(--color-text-muted)', fontSize: '0.75rem' }} />
|
||||
<span style={{ fontSize: '0.6875rem', color: 'var(--color-text-muted)', fontWeight: 500, textTransform: 'uppercase', letterSpacing: '0.03em' }}>{label}</span>
|
||||
<div
|
||||
className="stat-card"
|
||||
style={accentVar ? { ['--stat-accent']: `var(${accentVar})` } : undefined}
|
||||
>
|
||||
<div className="stat-card__body">
|
||||
<div className="stat-card__label">{label}</div>
|
||||
<div className="stat-card__value" style={{ color: accent }}>{value}</div>
|
||||
</div>
|
||||
<div style={{ fontSize: '1.375rem', fontWeight: 700, fontFamily: 'JetBrains Mono, monospace', color: color || 'var(--color-text-primary)' }}>
|
||||
{value}
|
||||
<div className="stat-card__icon" style={accentVar ? { color: accent } : undefined}>
|
||||
<i className={icon} />
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
@@ -543,45 +550,24 @@ export default function Nodes() {
|
||||
</div>
|
||||
|
||||
{/* Tabs */}
|
||||
<div style={{ display: 'flex', gap: 'var(--spacing-xs)', marginBottom: 'var(--spacing-lg)', borderBottom: '2px solid var(--color-border)' }}>
|
||||
<div className="tabs" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
<button
|
||||
onClick={() => setActiveTab('backend')}
|
||||
style={{
|
||||
padding: 'var(--spacing-sm) var(--spacing-lg)',
|
||||
border: 'none', cursor: 'pointer', fontWeight: 600, fontSize: '0.875rem',
|
||||
background: 'none',
|
||||
color: activeTab === 'backend' ? 'var(--color-primary)' : 'var(--color-text-muted)',
|
||||
borderBottom: activeTab === 'backend' ? '2px solid var(--color-primary)' : '2px solid transparent',
|
||||
marginBottom: '-2px',
|
||||
}}
|
||||
className={`tab ${activeTab === 'backend' ? 'tab-active' : ''}`}
|
||||
>
|
||||
<i className="fas fa-server" style={{ marginRight: 6 }} />
|
||||
Backend Workers ({backendNodes.length})
|
||||
</button>
|
||||
<button
|
||||
onClick={() => setActiveTab('agent')}
|
||||
style={{
|
||||
padding: 'var(--spacing-sm) var(--spacing-lg)',
|
||||
border: 'none', cursor: 'pointer', fontWeight: 600, fontSize: '0.875rem',
|
||||
background: 'none',
|
||||
color: activeTab === 'agent' ? 'var(--color-primary)' : 'var(--color-text-muted)',
|
||||
borderBottom: activeTab === 'agent' ? '2px solid var(--color-primary)' : '2px solid transparent',
|
||||
marginBottom: '-2px',
|
||||
}}
|
||||
className={`tab ${activeTab === 'agent' ? 'tab-active' : ''}`}
|
||||
>
|
||||
<i className="fas fa-robot" style={{ marginRight: 6 }} />
|
||||
Agent Workers ({agentNodes.length})
|
||||
</button>
|
||||
<button
|
||||
onClick={() => setActiveTab('scheduling')}
|
||||
style={{
|
||||
padding: 'var(--spacing-sm) var(--spacing-lg)',
|
||||
border: 'none', cursor: 'pointer', fontWeight: 600, fontSize: '0.875rem',
|
||||
background: 'none',
|
||||
color: activeTab === 'scheduling' ? 'var(--color-primary)' : 'var(--color-text-muted)',
|
||||
borderBottom: activeTab === 'scheduling' ? '2px solid var(--color-primary)' : '2px solid transparent',
|
||||
marginBottom: '-2px',
|
||||
}}
|
||||
className={`tab ${activeTab === 'scheduling' ? 'tab-active' : ''}`}
|
||||
>
|
||||
<i className="fas fa-calendar-alt" style={{ marginRight: 6 }} />
|
||||
Scheduling ({schedulingConfigs.length})
|
||||
@@ -590,13 +576,17 @@ export default function Nodes() {
|
||||
|
||||
{activeTab !== 'scheduling' && <>
|
||||
{/* Stat cards */}
|
||||
<div style={{ display: 'flex', gap: 'var(--spacing-md)', marginBottom: 'var(--spacing-xl)', flexWrap: 'wrap' }}>
|
||||
<StatCard icon={activeTab === 'agent' ? 'fas fa-robot' : 'fas fa-server'} label={`Total ${activeTab === 'agent' ? 'Agent' : 'Backend'} Workers`} value={total} />
|
||||
<StatCard icon="fas fa-check-circle" label="Healthy" value={healthy} color="var(--color-success)" />
|
||||
<StatCard icon="fas fa-exclamation-circle" label="Unhealthy" value={unhealthy} color={unhealthy > 0 ? 'var(--color-error)' : undefined} />
|
||||
<StatCard icon="fas fa-hourglass-half" label="Draining" value={draining} color={draining > 0 ? 'var(--color-warning)' : undefined} />
|
||||
<div className="stat-grid">
|
||||
<StatCard icon={activeTab === 'agent' ? 'fas fa-robot' : 'fas fa-server'}
|
||||
label={`Total ${activeTab === 'agent' ? 'Agent' : 'Backend'} Workers`} value={total} />
|
||||
<StatCard icon="fas fa-check-circle" label="Healthy" value={healthy}
|
||||
accentVar={healthy > 0 ? '--color-success' : undefined} />
|
||||
<StatCard icon="fas fa-exclamation-circle" label="Unhealthy" value={unhealthy}
|
||||
accentVar={unhealthy > 0 ? '--color-error' : undefined} />
|
||||
<StatCard icon="fas fa-hourglass-half" label="Draining" value={draining}
|
||||
accentVar={draining > 0 ? '--color-warning' : undefined} />
|
||||
{pending > 0 && (
|
||||
<StatCard icon="fas fa-clock" label="Pending" value={pending} color="var(--color-warning)" />
|
||||
<StatCard icon="fas fa-clock" label="Pending" value={pending} accentVar="--color-warning" />
|
||||
)}
|
||||
{activeTab === 'backend' && (() => {
|
||||
const clusterTotalVRAM = backendNodes.reduce((sum, n) => sum + (n.total_vram || 0), 0)
|
||||
@@ -614,7 +604,7 @@ export default function Nodes() {
|
||||
)}
|
||||
<StatCard icon="fas fa-cube" label="Models Loaded" value={totalModelsLoaded} />
|
||||
<StatCard icon="fas fa-exchange-alt" label="In-Flight Requests" value={totalInFlight}
|
||||
color={totalInFlight > 0 ? 'var(--color-primary)' : undefined} />
|
||||
accentVar={totalInFlight > 0 ? '--color-primary' : undefined} />
|
||||
</>
|
||||
)
|
||||
})()}
|
||||
@@ -627,15 +617,11 @@ export default function Nodes() {
|
||||
<>
|
||||
<button
|
||||
onClick={() => setShowTips(t => !t)}
|
||||
style={{
|
||||
background: 'none', border: 'none', cursor: 'pointer',
|
||||
color: 'var(--color-primary)', fontSize: '0.8125rem', fontWeight: 500,
|
||||
display: 'flex', alignItems: 'center', gap: 6,
|
||||
padding: 0, marginBottom: 'var(--spacing-md)',
|
||||
}}
|
||||
className="nodes-add-worker"
|
||||
aria-expanded={showTips}
|
||||
>
|
||||
<i className={`fas fa-chevron-${showTips ? 'down' : 'right'}`} style={{ fontSize: '0.625rem' }} />
|
||||
Add more workers
|
||||
<i className={`fas ${showTips ? 'fa-chevron-down' : 'fa-plus'}`} />
|
||||
{showTips ? 'Hide instructions' : 'Register a new worker'}
|
||||
</button>
|
||||
{showTips && <WorkerHintCard addToast={addToast} activeTab={activeTab} hasWorkers />}
|
||||
</>
|
||||
@@ -685,23 +671,28 @@ export default function Nodes() {
|
||||
>
|
||||
<td>
|
||||
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)' }}>
|
||||
<i className="fas fa-server" style={{ color: 'var(--color-text-muted)', fontSize: '0.875rem' }} />
|
||||
{canExpand && (
|
||||
<span className={`row-chevron${isExpanded ? ' is-expanded' : ''}`} aria-hidden="true">
|
||||
<i className="fas fa-chevron-right" />
|
||||
</span>
|
||||
)}
|
||||
<i className="fas fa-server" style={{ color: 'var(--color-text-muted)', fontSize: 'var(--text-sm)' }} />
|
||||
<div>
|
||||
<div style={{ fontWeight: 600, fontSize: '0.875rem' }}>{node.name}</div>
|
||||
<div style={{ fontSize: '0.75rem', fontFamily: "'JetBrains Mono', monospace", color: 'var(--color-text-muted)' }}>
|
||||
<div style={{ fontWeight: 600, fontSize: 'var(--text-sm)' }}>{node.name}</div>
|
||||
<div className="cell-mono cell-muted">
|
||||
{node.address}
|
||||
</div>
|
||||
{node.labels && Object.keys(node.labels).length > 0 && (
|
||||
<div style={{ display: 'flex', flexWrap: 'wrap', gap: 3, marginTop: 3 }}>
|
||||
{Object.entries(node.labels).slice(0, 5).map(([k, v]) => (
|
||||
<span key={k} style={{
|
||||
fontSize: '0.625rem', padding: '1px 5px', borderRadius: 3,
|
||||
background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)',
|
||||
fontFamily: "'JetBrains Mono', monospace", border: '1px solid var(--color-border-subtle)',
|
||||
<span key={k} className="cell-mono" style={{
|
||||
padding: '1px 5px', borderRadius: 3,
|
||||
background: 'var(--color-bg-tertiary)',
|
||||
border: '1px solid var(--color-border-subtle)',
|
||||
}}>{k}={v}</span>
|
||||
))}
|
||||
{Object.keys(node.labels).length > 5 && (
|
||||
<span style={{ fontSize: '0.625rem', color: 'var(--color-text-muted)' }}>
|
||||
<span className="cell-muted">
|
||||
+{Object.keys(node.labels).length - 5} more
|
||||
</span>
|
||||
)}
|
||||
@@ -711,12 +702,10 @@ export default function Nodes() {
|
||||
</div>
|
||||
</td>
|
||||
<td>
|
||||
<div style={{ display: 'flex', alignItems: 'center', gap: 6 }}>
|
||||
<i className="fas fa-circle" style={{ fontSize: '0.5rem', color: status.color }} />
|
||||
<span style={{ fontSize: '0.8125rem', color: status.color, fontWeight: 500 }}>
|
||||
{status.label}
|
||||
</span>
|
||||
</div>
|
||||
<span className="node-status" style={{ color: status.color }}>
|
||||
<span className="node-status__dot" style={{ background: status.color }} />
|
||||
{status.label}
|
||||
</span>
|
||||
</td>
|
||||
<td>
|
||||
{hasGPU && totalVRAMStr ? (
|
||||
@@ -745,38 +734,37 @@ export default function Nodes() {
|
||||
</span>
|
||||
</td>
|
||||
<td style={{ textAlign: 'right' }}>
|
||||
<div style={{ display: 'flex', gap: 'var(--spacing-xs)', justifyContent: 'flex-end' }} onClick={e => e.stopPropagation()}>
|
||||
<div className="row-actions" onClick={e => e.stopPropagation()}>
|
||||
{node.status === 'pending' && (
|
||||
<button
|
||||
className="btn btn-primary btn-sm"
|
||||
onClick={() => handleApprove(node.id)}
|
||||
title="Approve node"
|
||||
>
|
||||
<i className="fas fa-check" />
|
||||
<i className="fas fa-check" /> Approve
|
||||
</button>
|
||||
)}
|
||||
{node.status === 'draining' && (
|
||||
<button
|
||||
className="btn btn-secondary btn-sm"
|
||||
onClick={() => handleResume(node.id)}
|
||||
title="Resume node"
|
||||
title="Resume accepting requests"
|
||||
>
|
||||
<i className="fas fa-play" />
|
||||
<i className="fas fa-play" /> Resume
|
||||
</button>
|
||||
)}
|
||||
{node.status !== 'draining' && node.status !== 'pending' && (
|
||||
<button
|
||||
className="btn btn-secondary btn-sm"
|
||||
onClick={() => handleDrain(node.id)}
|
||||
title="Drain node"
|
||||
title="Stop sending new requests to this node"
|
||||
>
|
||||
<i className="fas fa-pause" />
|
||||
<i className="fas fa-pause" /> Drain
|
||||
</button>
|
||||
)}
|
||||
<button
|
||||
className="btn btn-danger btn-sm"
|
||||
className="btn btn-danger-ghost btn-sm"
|
||||
onClick={() => setConfirmDelete(node)}
|
||||
title="Remove node"
|
||||
title="Remove node from cluster"
|
||||
>
|
||||
<i className="fas fa-trash" />
|
||||
</button>
|
||||
@@ -794,7 +782,10 @@ export default function Nodes() {
|
||||
{!models ? (
|
||||
<LoadingSpinner size="sm" />
|
||||
) : models.length === 0 ? (
|
||||
<p style={{ fontSize: '0.8125rem', color: 'var(--color-text-muted)' }}>No models loaded on this node</p>
|
||||
<div className="drawer-empty">
|
||||
<i className="fas fa-cube" />
|
||||
<span>No models loaded on this node yet.</span>
|
||||
</div>
|
||||
) : (
|
||||
<table className="table" style={{ margin: 0 }}>
|
||||
<thead>
|
||||
@@ -870,7 +861,10 @@ export default function Nodes() {
|
||||
{!backends ? (
|
||||
<LoadingSpinner size="sm" />
|
||||
) : backends.length === 0 ? (
|
||||
<p style={{ fontSize: '0.8125rem', color: 'var(--color-text-muted)' }}>No backends installed on this node</p>
|
||||
<div className="drawer-empty">
|
||||
<i className="fas fa-cogs" />
|
||||
<span>No backends installed on this node. Install one from the gallery to schedule models here.</span>
|
||||
</div>
|
||||
) : (
|
||||
<table className="table" style={{ margin: 0 }}>
|
||||
<thead>
|
||||
|
||||
@@ -510,28 +510,89 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
|
||||
modelConfigs := cl.GetAllModelsConfigs()
|
||||
modelsWithoutConfig, _ := galleryop.ListModels(cl, ml, config.NoFilterFn, galleryop.LOOSE_ONLY)
|
||||
|
||||
type loadedOn struct {
|
||||
NodeID string `json:"node_id"`
|
||||
NodeName string `json:"node_name"`
|
||||
State string `json:"state"`
|
||||
NodeStatus string `json:"node_status"`
|
||||
}
|
||||
type modelCapability struct {
|
||||
ID string `json:"id"`
|
||||
Capabilities []string `json:"capabilities"`
|
||||
Backend string `json:"backend"`
|
||||
Disabled bool `json:"disabled"`
|
||||
Pinned bool `json:"pinned"`
|
||||
ID string `json:"id"`
|
||||
Capabilities []string `json:"capabilities"`
|
||||
Backend string `json:"backend"`
|
||||
Disabled bool `json:"disabled"`
|
||||
Pinned bool `json:"pinned"`
|
||||
// LoadedOn is populated only when the node registry is active
|
||||
// (distributed mode). Lets the UI show "loaded on worker-1" without
|
||||
// the operator having to expand every node manually. An empty slice
|
||||
// with nil reports "no loaded replicas" vs. nil reports "not in
|
||||
// cluster mode" — the frontend treats both as "no distribution info".
|
||||
LoadedOn []loadedOn `json:"loaded_on,omitempty"`
|
||||
// Source="registry-only" marks models adopted from the cluster that
|
||||
// have no local config yet (ghosts that the reconciler discovered).
|
||||
Source string `json:"source,omitempty"`
|
||||
}
|
||||
|
||||
// Join with the node registry when we have one (distributed mode). A
|
||||
// single registry fetch + map join beats per-model queries for the
|
||||
// 100-model case.
|
||||
var loadedByModel map[string][]loadedOn
|
||||
if ds := applicationInstance.Distributed(); ds != nil && ds.Registry != nil {
|
||||
nodeModels, err := ds.Registry.ListAllLoadedModels(c.Request().Context())
|
||||
if err == nil {
|
||||
allNodes, _ := ds.Registry.List(c.Request().Context())
|
||||
nameByID := make(map[string]string, len(allNodes))
|
||||
statusByID := make(map[string]string, len(allNodes))
|
||||
for _, n := range allNodes {
|
||||
nameByID[n.ID] = n.Name
|
||||
statusByID[n.ID] = n.Status
|
||||
}
|
||||
loadedByModel = make(map[string][]loadedOn)
|
||||
for _, nm := range nodeModels {
|
||||
loadedByModel[nm.ModelName] = append(loadedByModel[nm.ModelName], loadedOn{
|
||||
NodeID: nm.NodeID,
|
||||
NodeName: nameByID[nm.NodeID],
|
||||
State: nm.State,
|
||||
NodeStatus: statusByID[nm.NodeID],
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]modelCapability, 0, len(modelConfigs)+len(modelsWithoutConfig))
|
||||
seen := make(map[string]bool, len(modelConfigs)+len(modelsWithoutConfig))
|
||||
for _, cfg := range modelConfigs {
|
||||
seen[cfg.Name] = true
|
||||
result = append(result, modelCapability{
|
||||
ID: cfg.Name,
|
||||
Capabilities: cfg.KnownUsecaseStrings,
|
||||
Backend: cfg.Backend,
|
||||
Disabled: cfg.IsDisabled(),
|
||||
Pinned: cfg.IsPinned(),
|
||||
LoadedOn: loadedByModel[cfg.Name],
|
||||
})
|
||||
}
|
||||
for _, name := range modelsWithoutConfig {
|
||||
seen[name] = true
|
||||
result = append(result, modelCapability{
|
||||
ID: name,
|
||||
Capabilities: []string{},
|
||||
LoadedOn: loadedByModel[name],
|
||||
})
|
||||
}
|
||||
// Emit entries for cluster models that have no local config — these
|
||||
// are the actual ghosts. Without this the operator would have no way
|
||||
// to see a model the cluster is running if its config file wasn't
|
||||
// synced to this frontend's filesystem.
|
||||
for name, loc := range loadedByModel {
|
||||
if seen[name] {
|
||||
continue
|
||||
}
|
||||
result = append(result, modelCapability{
|
||||
ID: name,
|
||||
Capabilities: []string{},
|
||||
LoadedOn: loc,
|
||||
Source: "registry-only",
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -11,4 +11,5 @@ const (
|
||||
KeyHealthCheck int64 = 104
|
||||
KeySchemaMigrate int64 = 105
|
||||
KeyBackendUpgradeCheck int64 = 106
|
||||
KeyStateReconciler int64 = 107
|
||||
)
|
||||
|
||||
@@ -57,6 +57,16 @@ func (g *GalleryService) SetBackendManager(b BackendManager) {
|
||||
g.backendManager = b
|
||||
}
|
||||
|
||||
// BackendManager returns the current backend manager. Callers like the
|
||||
// periodic upgrade checker need this so they run CheckUpgrades through the
|
||||
// distributed implementation (which asks workers) instead of the frontend's
|
||||
// local filesystem — the latter is always empty in distributed deployments.
|
||||
func (g *GalleryService) BackendManager() BackendManager {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
return g.backendManager
|
||||
}
|
||||
|
||||
// SetNATSClient sets the NATS client for distributed progress publishing.
|
||||
func (g *GalleryService) SetNATSClient(nc messaging.Publisher) {
|
||||
g.Lock()
|
||||
|
||||
@@ -157,6 +157,12 @@ type NodeBackendInfo struct {
|
||||
IsMeta bool `json:"is_meta"`
|
||||
InstalledAt string `json:"installed_at,omitempty"`
|
||||
GalleryURL string `json:"gallery_url,omitempty"`
|
||||
// Version, URI and Digest enable cluster-wide upgrade detection —
|
||||
// without them, the frontend cannot tell whether the installed OCI
|
||||
// image matches the gallery entry, and upgrades silently never surface.
|
||||
Version string `json:"version,omitempty"`
|
||||
URI string `json:"uri,omitempty"`
|
||||
Digest string `json:"digest,omitempty"`
|
||||
}
|
||||
|
||||
// SubjectNodeBackendStop tells a worker node to stop its gRPC backend process.
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/mudler/LocalAI/core/gallery"
|
||||
"github.com/mudler/LocalAI/core/services/galleryop"
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
"github.com/mudler/xlog"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
@@ -53,6 +54,7 @@ type DistributedBackendManager struct {
|
||||
adapter *RemoteUnloaderAdapter
|
||||
registry *NodeRegistry
|
||||
backendGalleries []config.Gallery
|
||||
systemState *system.SystemState
|
||||
}
|
||||
|
||||
// NewDistributedBackendManager creates a DistributedBackendManager.
|
||||
@@ -62,46 +64,168 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model
|
||||
adapter: adapter,
|
||||
registry: registry,
|
||||
backendGalleries: appConfig.BackendGalleries,
|
||||
systemState: appConfig.SystemState,
|
||||
}
|
||||
}
|
||||
|
||||
// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
|
||||
// Returned as part of BackendOpResult so the frontend can surface exactly
|
||||
// what happened on each worker instead of a single joined error string.
|
||||
type NodeOpStatus struct {
|
||||
NodeID string `json:"node_id"`
|
||||
NodeName string `json:"node_name"`
|
||||
Status string `json:"status"` // "success" | "queued" | "error"
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// BackendOpResult aggregates per-node outcomes.
|
||||
type BackendOpResult struct {
|
||||
Nodes []NodeOpStatus `json:"nodes"`
|
||||
}
|
||||
|
||||
// enqueueAndDrainBackendOp is the shared scaffolding for
|
||||
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
|
||||
// row (intent is durable even if the node is offline). Currently-healthy
|
||||
// nodes get an immediate attempt; success deletes the row, failure records
|
||||
// the error and leaves the row for the reconciler to retry.
|
||||
//
|
||||
// `apply` is the NATS round-trip for one node. Returning an error keeps the
|
||||
// row in the queue and marks the per-node status as "error"; returning nil
|
||||
// deletes the row and reports "success". For non-healthy nodes the status
|
||||
// is "queued" — no attempt is made right now, reconciler will pick it up
|
||||
// when the node returns.
|
||||
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) {
|
||||
allNodes, err := d.registry.List(ctx)
|
||||
if err != nil {
|
||||
return BackendOpResult{}, err
|
||||
}
|
||||
|
||||
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
|
||||
for _, node := range allNodes {
|
||||
// Pending nodes haven't been approved yet — no intent to apply.
|
||||
if node.Status == StatusPending {
|
||||
continue
|
||||
}
|
||||
// Backend lifecycle ops only make sense on backend-type workers.
|
||||
// Agent workers don't subscribe to backend.install/delete/list, so
|
||||
// enqueueing for them guarantees a forever-retrying row that the
|
||||
// reconciler can never drain. Silently skip — they aren't consumers.
|
||||
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
|
||||
continue
|
||||
}
|
||||
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
|
||||
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
|
||||
result.Nodes = append(result.Nodes, NodeOpStatus{
|
||||
NodeID: node.ID, NodeName: node.Name, Status: "error",
|
||||
Error: fmt.Sprintf("enqueue failed: %v", err),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if node.Status != StatusHealthy {
|
||||
// Intent is recorded; reconciler will retry when the node recovers.
|
||||
result.Nodes = append(result.Nodes, NodeOpStatus{
|
||||
NodeID: node.ID, NodeName: node.Name, Status: "queued",
|
||||
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
applyErr := apply(node)
|
||||
if applyErr == nil {
|
||||
// Find the row we just upserted and delete it; cheap but requires
|
||||
// a lookup since UpsertPendingBackendOp doesn't return the ID.
|
||||
if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
|
||||
xlog.Debug("Failed to clear pending backend op after success", "error", err)
|
||||
}
|
||||
result.Nodes = append(result.Nodes, NodeOpStatus{
|
||||
NodeID: node.ID, NodeName: node.Name, Status: "success",
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Record failure for backoff. If it's an ErrNoResponders, the node's
|
||||
// gone AWOL — mark unhealthy so the router stops picking it too.
|
||||
errMsg := applyErr.Error()
|
||||
if errors.Is(applyErr, nats.ErrNoResponders) {
|
||||
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
|
||||
d.registry.MarkUnhealthy(ctx, node.ID)
|
||||
}
|
||||
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
|
||||
_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
|
||||
}
|
||||
result.Nodes = append(result.Nodes, NodeOpStatus{
|
||||
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// findPendingRow looks up the ID of a pending_backend_ops row by its
|
||||
// composite key. Used to hand off to RecordPendingBackendOpFailure /
|
||||
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
|
||||
// composite key.
|
||||
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
|
||||
var row PendingBackendOp
|
||||
if err := d.registry.db.WithContext(ctx).
|
||||
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
|
||||
First(&row).Error; err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return row.ID, nil
|
||||
}
|
||||
|
||||
// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
|
||||
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
|
||||
return d.registry.db.WithContext(ctx).
|
||||
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
|
||||
Delete(&PendingBackendOp{}).Error
|
||||
}
|
||||
|
||||
// DeleteBackend fans out backend deletion to every known node. The previous
|
||||
// implementation silently skipped non-healthy nodes, which meant zombies
|
||||
// reappeared once those nodes returned. Now the intent is durable — see
|
||||
// enqueueAndDrainBackendOp — and the reconciler catches up later.
|
||||
func (d *DistributedBackendManager) DeleteBackend(name string) error {
|
||||
// Try local deletion but ignore "not found" errors — in distributed mode
|
||||
// the frontend node typically doesn't have backends installed locally;
|
||||
// they only exist on worker nodes.
|
||||
// Local delete first (frontend rarely has backends installed in
|
||||
// distributed mode, but the gallery operation still expects it; ignore
|
||||
// "not found" which is the common case).
|
||||
if err := d.local.DeleteBackend(name); err != nil {
|
||||
if !errors.Is(err, gallery.ErrBackendNotFound) {
|
||||
return err
|
||||
}
|
||||
xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
|
||||
}
|
||||
// Fan out backend.delete to all healthy nodes
|
||||
allNodes, listErr := d.registry.List(context.Background())
|
||||
if listErr != nil {
|
||||
xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr)
|
||||
return listErr
|
||||
}
|
||||
var errs []error
|
||||
for _, node := range allNodes {
|
||||
if node.Status != StatusHealthy {
|
||||
continue
|
||||
}
|
||||
if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil {
|
||||
if errors.Is(delErr, nats.ErrNoResponders) {
|
||||
// Node's NATS subscription is gone — likely restarted with a new ID.
|
||||
// Mark it unhealthy so future fan-outs skip it.
|
||||
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
|
||||
d.registry.MarkUnhealthy(context.Background(), node.ID)
|
||||
continue
|
||||
}
|
||||
xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr)
|
||||
errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr))
|
||||
}
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
|
||||
ctx := context.Background()
|
||||
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
|
||||
_, err := d.adapter.DeleteBackend(node.ID, name)
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// ListBackends aggregates installed backends from all healthy worker nodes.
|
||||
// DeleteBackendDetailed is the per-node-result variant called by the HTTP
|
||||
// handler so the UI can render a per-node status drawer. DeleteBackend still
|
||||
// returns error-only for callers that don't care about node breakdown.
|
||||
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
|
||||
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
|
||||
return BackendOpResult{}, err
|
||||
}
|
||||
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
|
||||
_, err := d.adapter.DeleteBackend(node.ID, name)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// ListBackends aggregates installed backends from all worker nodes, preserving
|
||||
// per-node attribution. Each SystemBackend.Nodes entry records which node has
|
||||
// the backend and the version/digest it reports. The top-level Metadata is
|
||||
// populated from the first node seen so single-node-minded callers still work.
|
||||
//
|
||||
// Pending/offline/draining nodes are skipped because they aren't expected to
|
||||
// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders
|
||||
// then marks them unhealthy and the loop continues.
|
||||
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
|
||||
result := make(gallery.SystemBackends)
|
||||
allNodes, err := d.registry.List(context.Background())
|
||||
@@ -110,7 +234,7 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
|
||||
}
|
||||
|
||||
for _, node := range allNodes {
|
||||
if node.Status != StatusHealthy {
|
||||
if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining {
|
||||
continue
|
||||
}
|
||||
reply, err := d.adapter.ListBackends(node.ID)
|
||||
@@ -128,89 +252,92 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
|
||||
continue
|
||||
}
|
||||
for _, b := range reply.Backends {
|
||||
if _, exists := result[b.Name]; !exists {
|
||||
result[b.Name] = gallery.SystemBackend{
|
||||
ref := gallery.NodeBackendRef{
|
||||
NodeID: node.ID,
|
||||
NodeName: node.Name,
|
||||
NodeStatus: node.Status,
|
||||
Version: b.Version,
|
||||
Digest: b.Digest,
|
||||
URI: b.URI,
|
||||
InstalledAt: b.InstalledAt,
|
||||
}
|
||||
entry, exists := result[b.Name]
|
||||
if !exists {
|
||||
entry = gallery.SystemBackend{
|
||||
Name: b.Name,
|
||||
IsSystem: b.IsSystem,
|
||||
IsMeta: b.IsMeta,
|
||||
Metadata: &gallery.BackendMetadata{
|
||||
Name: b.Name,
|
||||
InstalledAt: b.InstalledAt,
|
||||
GalleryURL: b.GalleryURL,
|
||||
Version: b.Version,
|
||||
URI: b.URI,
|
||||
Digest: b.Digest,
|
||||
},
|
||||
}
|
||||
}
|
||||
entry.Nodes = append(entry.Nodes, ref)
|
||||
result[b.Name] = entry
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// InstallBackend fans out backend installation to all healthy worker nodes.
|
||||
// InstallBackend fans out installation through the pending-ops queue so
|
||||
// non-healthy nodes get retried when they come back instead of being silently
|
||||
// skipped. Reply success from the NATS round-trip deletes the queue row;
|
||||
// reply.Success==false is treated as an error so the row stays for retry.
|
||||
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
|
||||
allNodes, err := d.registry.List(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
galleriesJSON, _ := json.Marshal(op.Galleries)
|
||||
backendName := op.GalleryElementName
|
||||
|
||||
for _, node := range allNodes {
|
||||
if node.Status != StatusHealthy {
|
||||
continue
|
||||
}
|
||||
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error {
|
||||
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON))
|
||||
if err != nil {
|
||||
if errors.Is(err, nats.ErrNoResponders) {
|
||||
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
|
||||
d.registry.MarkUnhealthy(context.Background(), node.ID)
|
||||
continue
|
||||
}
|
||||
xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err)
|
||||
continue
|
||||
return err
|
||||
}
|
||||
if !reply.Success {
|
||||
xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error)
|
||||
return fmt.Errorf("install failed: %s", reply.Error)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// UpgradeBackend fans out a backend upgrade to all healthy worker nodes.
|
||||
// TODO: Add dedicated NATS subject for upgrade (currently reuses install with force flag)
|
||||
// UpgradeBackend reuses the install NATS subject (the worker re-downloads
|
||||
// from the gallery). Same queue semantics as Install/Delete.
|
||||
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
|
||||
allNodes, err := d.registry.List(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
galleriesJSON, _ := json.Marshal(d.backendGalleries)
|
||||
var errs []error
|
||||
|
||||
for _, node := range allNodes {
|
||||
if node.Status != StatusHealthy {
|
||||
continue
|
||||
}
|
||||
// Reuse install endpoint which will re-download the backend (force mode)
|
||||
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error {
|
||||
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON))
|
||||
if err != nil {
|
||||
if errors.Is(err, nats.ErrNoResponders) {
|
||||
xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID)
|
||||
d.registry.MarkUnhealthy(context.Background(), node.ID)
|
||||
continue
|
||||
}
|
||||
errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err))
|
||||
continue
|
||||
return err
|
||||
}
|
||||
if !reply.Success {
|
||||
errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error))
|
||||
return fmt.Errorf("upgrade failed: %s", reply.Error)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Join(errs...)
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// CheckUpgrades checks for available backend upgrades.
|
||||
// Gallery comparison is global (not per-node), so we delegate to the local manager.
|
||||
// CheckUpgrades checks for available backend upgrades across the cluster.
|
||||
//
|
||||
// The previous implementation delegated to d.local, which called
|
||||
// ListSystemBackends on the frontend — but in distributed mode the frontend
|
||||
// has no backends installed locally, so the upgrade loop never ran and the UI
|
||||
// never surfaced any upgrades. We now feed the cluster-wide aggregation
|
||||
// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
|
||||
// digest-based detection actually works and cluster drift is visible.
|
||||
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
|
||||
return d.local.CheckUpgrades(ctx)
|
||||
installed, err := d.ListBackends()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// systemState is used by AvailableBackends (gallery paths + meta-backend
|
||||
// resolution). The `installed` argument is what the old code got wrong —
|
||||
// it used to come from the empty frontend filesystem.
|
||||
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
|
||||
}
|
||||
|
||||
@@ -3,26 +3,59 @@ package nodes
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/advisorylock"
|
||||
grpcclient "github.com/mudler/LocalAI/pkg/grpc"
|
||||
"github.com/mudler/xlog"
|
||||
"github.com/nats-io/nats.go"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// ModelProber checks whether a model's backend process is still reachable.
|
||||
// Defaulted to a gRPC health probe but overridable for tests so we don't
|
||||
// need to stand up a real server. Returning false without an error means the
|
||||
// process is reachable but unhealthy (same as a timeout for our purposes).
|
||||
type ModelProber interface {
|
||||
IsAlive(ctx context.Context, address string) bool
|
||||
}
|
||||
|
||||
// grpcModelProber does a 1s HealthCheck on the model's stored gRPC address.
|
||||
type grpcModelProber struct{ token string }
|
||||
|
||||
func (g grpcModelProber) IsAlive(ctx context.Context, address string) bool {
|
||||
client := grpcclient.NewClientWithToken(address, false, nil, false, g.token)
|
||||
probeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
|
||||
defer cancel()
|
||||
ok, _ := client.HealthCheck(probeCtx)
|
||||
return ok
|
||||
}
|
||||
|
||||
// ReplicaReconciler periodically ensures model replica counts match their
|
||||
// scheduling configs. It scales up replicas when below MinReplicas or when
|
||||
// all replicas are busy (up to MaxReplicas), and scales down idle replicas
|
||||
// above MinReplicas.
|
||||
//
|
||||
// Alongside replica scaling it runs two state-reconciliation passes — draining
|
||||
// the pending_backend_ops queue and probing loaded models' gRPC addresses to
|
||||
// orphan ghosts. Both passes are wrapped in the KeyStateReconciler advisory
|
||||
// lock so N frontends don't stampede.
|
||||
//
|
||||
// Only processes models with auto-scaling enabled (MinReplicas > 0 or MaxReplicas > 0).
|
||||
type ReplicaReconciler struct {
|
||||
registry *NodeRegistry
|
||||
scheduler ModelScheduler // interface for scheduling new models
|
||||
unloader NodeCommandSender
|
||||
adapter *RemoteUnloaderAdapter // NATS sender for pending-op drain
|
||||
prober ModelProber // health probe for model gRPC addrs
|
||||
db *gorm.DB
|
||||
interval time.Duration
|
||||
scaleDownDelay time.Duration
|
||||
// probeStaleAfter: only probe node_models rows older than this so we
|
||||
// don't hammer every worker every tick for models we just heard from.
|
||||
probeStaleAfter time.Duration
|
||||
}
|
||||
|
||||
// ModelScheduler abstracts the scheduling logic needed by the reconciler.
|
||||
@@ -35,12 +68,21 @@ type ModelScheduler interface {
|
||||
|
||||
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
|
||||
type ReplicaReconcilerOptions struct {
|
||||
Registry *NodeRegistry
|
||||
Scheduler ModelScheduler
|
||||
Unloader NodeCommandSender
|
||||
DB *gorm.DB
|
||||
Interval time.Duration // default 30s
|
||||
ScaleDownDelay time.Duration // default 5m
|
||||
Registry *NodeRegistry
|
||||
Scheduler ModelScheduler
|
||||
Unloader NodeCommandSender
|
||||
// Adapter is the NATS sender used to retry pending backend ops. When nil,
|
||||
// the state-reconciler pending-drain pass is a no-op (single-node mode).
|
||||
Adapter *RemoteUnloaderAdapter
|
||||
// RegistrationToken is used by the default gRPC prober when probing model
|
||||
// addresses. Matches the worker's token so HealthCheck auth succeeds.
|
||||
RegistrationToken string
|
||||
// Prober overrides the default gRPC health probe (used by tests).
|
||||
Prober ModelProber
|
||||
DB *gorm.DB
|
||||
Interval time.Duration // default 30s
|
||||
ScaleDownDelay time.Duration // default 5m
|
||||
ProbeStaleAfter time.Duration // default 2m
|
||||
}
|
||||
|
||||
// NewReplicaReconciler creates a new ReplicaReconciler.
|
||||
@@ -53,13 +95,24 @@ func NewReplicaReconciler(opts ReplicaReconcilerOptions) *ReplicaReconciler {
|
||||
if scaleDownDelay == 0 {
|
||||
scaleDownDelay = 5 * time.Minute
|
||||
}
|
||||
probeStaleAfter := opts.ProbeStaleAfter
|
||||
if probeStaleAfter == 0 {
|
||||
probeStaleAfter = 2 * time.Minute
|
||||
}
|
||||
prober := opts.Prober
|
||||
if prober == nil {
|
||||
prober = grpcModelProber{token: opts.RegistrationToken}
|
||||
}
|
||||
return &ReplicaReconciler{
|
||||
registry: opts.Registry,
|
||||
scheduler: opts.Scheduler,
|
||||
unloader: opts.Unloader,
|
||||
db: opts.DB,
|
||||
interval: interval,
|
||||
scaleDownDelay: scaleDownDelay,
|
||||
registry: opts.Registry,
|
||||
scheduler: opts.Scheduler,
|
||||
unloader: opts.Unloader,
|
||||
adapter: opts.Adapter,
|
||||
prober: prober,
|
||||
db: opts.DB,
|
||||
interval: interval,
|
||||
scaleDownDelay: scaleDownDelay,
|
||||
probeStaleAfter: probeStaleAfter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,17 +131,157 @@ func (rc *ReplicaReconciler) Run(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileOnce performs a single reconciliation pass.
|
||||
// Uses an advisory lock so only one frontend instance reconciles at a time.
|
||||
// reconcileOnce performs a single reconciliation pass. Replica work and
|
||||
// state-reconciliation work run under *different* advisory locks so multiple
|
||||
// frontends can share load across passes, and one long-running pass doesn't
|
||||
// block the other forever if a frontend wedges.
|
||||
func (rc *ReplicaReconciler) reconcileOnce(ctx context.Context) {
|
||||
if rc.db != nil {
|
||||
lockKey := advisorylock.KeyFromString("replica-reconciler")
|
||||
_ = advisorylock.WithLockCtx(ctx, rc.db, lockKey, func() error {
|
||||
replicaKey := advisorylock.KeyFromString("replica-reconciler")
|
||||
_ = advisorylock.WithLockCtx(ctx, rc.db, replicaKey, func() error {
|
||||
rc.reconcile(ctx)
|
||||
return nil
|
||||
})
|
||||
// Try, don't block: if another frontend is already running the state
|
||||
// pass, this tick is a no-op. Matches the health monitor pattern.
|
||||
_, _ = advisorylock.TryWithLockCtx(ctx, rc.db, advisorylock.KeyStateReconciler, func() error {
|
||||
rc.reconcileState(ctx)
|
||||
return nil
|
||||
})
|
||||
} else {
|
||||
rc.reconcile(ctx)
|
||||
rc.reconcileState(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileState runs the state-reconciliation passes: drain pending backend
|
||||
// ops for freshly-healthy nodes, then probe model gRPC addresses to orphan
|
||||
// ghosts. Both passes are best-effort: a failure on one node doesn't stop
|
||||
// the rest.
|
||||
func (rc *ReplicaReconciler) reconcileState(ctx context.Context) {
|
||||
if rc.adapter != nil {
|
||||
rc.drainPendingBackendOps(ctx)
|
||||
}
|
||||
rc.probeLoadedModels(ctx)
|
||||
}
|
||||
|
||||
// drainPendingBackendOps retries queued backend ops whose next_retry_at has
|
||||
// passed on nodes that are currently healthy. On success the row is deleted;
|
||||
// on failure attempts++ and next_retry_at moves out via exponential backoff.
|
||||
func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
|
||||
ops, err := rc.registry.ListDuePendingBackendOps(ctx)
|
||||
if err != nil {
|
||||
xlog.Warn("Reconciler: failed to list pending backend ops", "error", err)
|
||||
return
|
||||
}
|
||||
if len(ops) == 0 {
|
||||
return
|
||||
}
|
||||
xlog.Debug("Reconciler: draining pending backend ops", "count", len(ops))
|
||||
|
||||
for _, op := range ops {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
var applyErr error
|
||||
switch op.Op {
|
||||
case OpBackendDelete:
|
||||
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
|
||||
case OpBackendInstall, OpBackendUpgrade:
|
||||
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries))
|
||||
if err != nil {
|
||||
applyErr = err
|
||||
} else if !reply.Success {
|
||||
applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error)
|
||||
}
|
||||
default:
|
||||
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
if applyErr == nil {
|
||||
if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil {
|
||||
xlog.Warn("Reconciler: failed to delete drained op row", "id", op.ID, "error", err)
|
||||
} else {
|
||||
xlog.Info("Reconciler: pending backend op applied",
|
||||
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// ErrNoResponders means the node has no active NATS subscription for
|
||||
// this subject. Either its connection dropped, or it's the wrong
|
||||
// node type entirely. Mark unhealthy so the health monitor's
|
||||
// heartbeat-only pass doesn't immediately flip it back — and so
|
||||
// ListDuePendingBackendOps (which filters by status=healthy) stops
|
||||
// picking the row until the node genuinely recovers.
|
||||
if errors.Is(applyErr, nats.ErrNoResponders) {
|
||||
xlog.Warn("Reconciler: no NATS responders — marking node unhealthy",
|
||||
"op", op.Op, "backend", op.Backend, "node", op.NodeID)
|
||||
_ = rc.registry.MarkUnhealthy(ctx, op.NodeID)
|
||||
}
|
||||
|
||||
// Dead-letter cap: after maxAttempts the row is the reconciler
|
||||
// equivalent of a poison message. Delete it loudly so the queue
|
||||
// doesn't churn NATS every tick forever — operators can re-issue
|
||||
// the op from the UI if they still want it applied.
|
||||
if op.Attempts+1 >= maxPendingBackendOpAttempts {
|
||||
xlog.Error("Reconciler: abandoning pending backend op after max attempts",
|
||||
"op", op.Op, "backend", op.Backend, "node", op.NodeID,
|
||||
"attempts", op.Attempts+1, "last_error", applyErr)
|
||||
if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil {
|
||||
xlog.Warn("Reconciler: failed to delete abandoned op row", "id", op.ID, "error", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
_ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error())
|
||||
xlog.Warn("Reconciler: pending backend op retry failed",
|
||||
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr)
|
||||
}
|
||||
}
|
||||
|
||||
// maxPendingBackendOpAttempts caps how many times the reconciler retries a
|
||||
// failing row before dead-lettering it. Ten attempts at exponential backoff
|
||||
// (30s → 15m cap) is >1h of wall-clock patience — well past any transient
|
||||
// worker restart or network blip. Poisoned rows beyond that are almost
|
||||
// certainly structural (wrong node type, non-existent gallery entry) and no
|
||||
// amount of further retrying will help.
|
||||
const maxPendingBackendOpAttempts = 10
|
||||
|
||||
// probeLoadedModels gRPC-health-checks model addresses that the DB says are
|
||||
// loaded. If a model's backend process is gone (OOM, crash, manual restart)
|
||||
// we remove the row so ghosts don't linger. Only probes rows older than
|
||||
// probeStaleAfter so we don't hammer every worker every tick for models we
|
||||
// just heard from.
|
||||
func (rc *ReplicaReconciler) probeLoadedModels(ctx context.Context) {
|
||||
var stale []NodeModel
|
||||
cutoff := time.Now().Add(-rc.probeStaleAfter)
|
||||
err := rc.registry.db.WithContext(ctx).
|
||||
Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
|
||||
Where("node_models.state = ? AND backend_nodes.status = ? AND node_models.updated_at < ? AND node_models.address != ''",
|
||||
"loaded", StatusHealthy, cutoff).
|
||||
Find(&stale).Error
|
||||
if err != nil {
|
||||
xlog.Warn("Reconciler: failed to list loaded models for probe", "error", err)
|
||||
return
|
||||
}
|
||||
for _, m := range stale {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
if rc.prober.IsAlive(ctx, m.Address) {
|
||||
// Bump updated_at so we don't probe this row again immediately.
|
||||
_ = rc.registry.db.WithContext(ctx).Model(&NodeModel{}).
|
||||
Where("id = ?", m.ID).Update("updated_at", time.Now()).Error
|
||||
continue
|
||||
}
|
||||
if err := rc.registry.RemoveNodeModel(ctx, m.NodeID, m.ModelName); err != nil {
|
||||
xlog.Warn("Reconciler: failed to remove unreachable model", "node", m.NodeID, "model", m.ModelName, "error", err)
|
||||
continue
|
||||
}
|
||||
xlog.Warn("Reconciler: model unreachable, removed from registry",
|
||||
"node", m.NodeID, "model", m.ModelName, "address", m.Address)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -239,3 +239,164 @@ var _ = Describe("ReplicaReconciler", func() {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// fakeProber lets tests control whether a model's gRPC address "responds".
|
||||
type fakeProber struct {
|
||||
alive map[string]bool
|
||||
calls int
|
||||
}
|
||||
|
||||
func (f *fakeProber) IsAlive(_ context.Context, address string) bool {
|
||||
f.calls++
|
||||
if f.alive == nil {
|
||||
return false
|
||||
}
|
||||
return f.alive[address]
|
||||
}
|
||||
|
||||
var _ = Describe("ReplicaReconciler — state reconciliation", func() {
|
||||
var (
|
||||
db *gorm.DB
|
||||
registry *NodeRegistry
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
if runtime.GOOS == "darwin" {
|
||||
Skip("testcontainers requires Docker, not available on macOS CI")
|
||||
}
|
||||
db = testutil.SetupTestDB()
|
||||
var err error
|
||||
registry, err = NewNodeRegistry(db)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
Describe("probeLoadedModels", func() {
|
||||
It("removes loaded models whose gRPC address is unreachable", func() {
|
||||
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
|
||||
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
|
||||
// Two loaded models — one stale (will probe), one fresh (skipped).
|
||||
stale := &NodeModel{
|
||||
ID: "stale-1",
|
||||
NodeID: node.ID,
|
||||
ModelName: "stale-model",
|
||||
Address: "10.0.0.1:12345",
|
||||
State: "loaded",
|
||||
UpdatedAt: time.Now().Add(-5 * time.Minute),
|
||||
}
|
||||
fresh := &NodeModel{
|
||||
ID: "fresh-1",
|
||||
NodeID: node.ID,
|
||||
ModelName: "fresh-model",
|
||||
Address: "10.0.0.1:54321",
|
||||
State: "loaded",
|
||||
UpdatedAt: time.Now(), // within probeStaleAfter
|
||||
}
|
||||
Expect(db.Create(stale).Error).To(Succeed())
|
||||
Expect(db.Create(fresh).Error).To(Succeed())
|
||||
|
||||
prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": false}}
|
||||
rc := NewReplicaReconciler(ReplicaReconcilerOptions{
|
||||
Registry: registry,
|
||||
DB: db,
|
||||
Prober: prober,
|
||||
ProbeStaleAfter: 2 * time.Minute,
|
||||
})
|
||||
|
||||
rc.probeLoadedModels(context.Background())
|
||||
|
||||
// Stale was unreachable — row removed.
|
||||
var after []NodeModel
|
||||
Expect(db.Find(&after).Error).To(Succeed())
|
||||
Expect(after).To(HaveLen(1))
|
||||
Expect(after[0].ModelName).To(Equal("fresh-model"))
|
||||
// Prober was only called once (the fresh row was filtered out).
|
||||
Expect(prober.calls).To(Equal(1))
|
||||
})
|
||||
|
||||
It("keeps reachable models and bumps their updated_at", func() {
|
||||
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
|
||||
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
|
||||
stale := &NodeModel{
|
||||
ID: "stale-2",
|
||||
NodeID: node.ID,
|
||||
ModelName: "alive-model",
|
||||
Address: "10.0.0.1:12345",
|
||||
State: "loaded",
|
||||
UpdatedAt: time.Now().Add(-5 * time.Minute),
|
||||
}
|
||||
Expect(db.Create(stale).Error).To(Succeed())
|
||||
|
||||
prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": true}}
|
||||
rc := NewReplicaReconciler(ReplicaReconcilerOptions{
|
||||
Registry: registry,
|
||||
DB: db,
|
||||
Prober: prober,
|
||||
ProbeStaleAfter: 2 * time.Minute,
|
||||
})
|
||||
|
||||
rc.probeLoadedModels(context.Background())
|
||||
|
||||
var after NodeModel
|
||||
Expect(db.First(&after, "id = ?", "stale-2").Error).To(Succeed())
|
||||
Expect(after.UpdatedAt).To(BeTemporally("~", time.Now(), time.Second))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("UpsertPendingBackendOp + RecordPendingBackendOpFailure", func() {
|
||||
It("upserts on the composite key rather than duplicating rows", func() {
|
||||
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
|
||||
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
|
||||
|
||||
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
|
||||
// Second call for the same (node, backend, op) should not create a
|
||||
// new row — that's how re-issuing a delete works.
|
||||
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
|
||||
|
||||
var rows []PendingBackendOp
|
||||
Expect(db.Find(&rows).Error).To(Succeed())
|
||||
Expect(rows).To(HaveLen(1))
|
||||
})
|
||||
|
||||
It("increments attempts and moves next_retry_at out on failure", func() {
|
||||
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
|
||||
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
|
||||
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
|
||||
|
||||
var row PendingBackendOp
|
||||
Expect(db.First(&row).Error).To(Succeed())
|
||||
before := row.NextRetryAt
|
||||
|
||||
Expect(registry.RecordPendingBackendOpFailure(context.Background(), row.ID, "boom")).To(Succeed())
|
||||
Expect(db.First(&row, row.ID).Error).To(Succeed())
|
||||
Expect(row.Attempts).To(Equal(1))
|
||||
Expect(row.LastError).To(Equal("boom"))
|
||||
Expect(row.NextRetryAt).To(BeTemporally(">", before))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("NewNodeRegistry malformed-row pruning", func() {
|
||||
It("drops queue rows for agent nodes and non-existent nodes on startup", func() {
|
||||
agent := &BackendNode{Name: "agent-1", NodeType: NodeTypeAgent, Address: "x"}
|
||||
Expect(registry.Register(context.Background(), agent, true)).To(Succeed())
|
||||
backend := &BackendNode{Name: "backend-1", NodeType: NodeTypeBackend, Address: "y"}
|
||||
Expect(registry.Register(context.Background(), backend, true)).To(Succeed())
|
||||
|
||||
// Three rows: one for a valid backend node (should survive),
|
||||
// one for an agent node (pruned), one for an empty backend name
|
||||
// on the valid node (pruned).
|
||||
Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "foo", OpBackendInstall, nil)).To(Succeed())
|
||||
Expect(registry.UpsertPendingBackendOp(context.Background(), agent.ID, "foo", OpBackendInstall, nil)).To(Succeed())
|
||||
Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "", OpBackendInstall, nil)).To(Succeed())
|
||||
|
||||
// Re-instantiating the registry runs the cleanup migration.
|
||||
_, err := NewNodeRegistry(db)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
var rows []PendingBackendOp
|
||||
Expect(db.Find(&rows).Error).To(Succeed())
|
||||
Expect(rows).To(HaveLen(1))
|
||||
Expect(rows[0].NodeID).To(Equal(backend.ID))
|
||||
Expect(rows[0].Backend).To(Equal("foo"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -104,6 +104,36 @@ type NodeWithExtras struct {
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
}
|
||||
|
||||
// PendingBackendOp is a durable intent for a backend lifecycle operation
|
||||
// (delete/install/upgrade) that needs to eventually apply on a specific node.
|
||||
//
|
||||
// Without this table, a backend delete against an offline node silently
|
||||
// dropped: the frontend skipped the node, the node came back later with the
|
||||
// backend still installed, and the operator saw a zombie. Now the intent is
|
||||
// recorded regardless of node status; the state reconciler drains the queue
|
||||
// whenever a node is healthy and removes the row on success. Reissuing the
|
||||
// same operation while a row exists updates NextRetryAt instead of stacking
|
||||
// duplicates (see the unique index).
|
||||
type PendingBackendOp struct {
|
||||
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
|
||||
NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"`
|
||||
Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"`
|
||||
Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"`
|
||||
Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries
|
||||
Attempts int `gorm:"default:0" json:"attempts"`
|
||||
LastError string `gorm:"type:text" json:"last_error,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
NextRetryAt time.Time `gorm:"index" json:"next_retry_at"`
|
||||
}
|
||||
|
||||
// Op constants mirror the operation names used by DistributedBackendManager
|
||||
// so callers don't repeat stringly-typed values.
|
||||
const (
|
||||
OpBackendDelete = "delete"
|
||||
OpBackendInstall = "install"
|
||||
OpBackendUpgrade = "upgrade"
|
||||
)
|
||||
|
||||
// NodeRegistry manages backend node registration and lookup in PostgreSQL.
|
||||
type NodeRegistry struct {
|
||||
db *gorm.DB
|
||||
@@ -114,10 +144,34 @@ type NodeRegistry struct {
|
||||
// when multiple instances (frontend + workers) start at the same time.
|
||||
func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) {
|
||||
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
|
||||
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{})
|
||||
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{})
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("migrating node tables: %w", err)
|
||||
}
|
||||
|
||||
// One-shot cleanup of queue rows that can never drain: ops targeted at
|
||||
// agent workers (wrong subscription set), at non-existent nodes, or with
|
||||
// an empty backend name. The guard in enqueueAndDrainBackendOp prevents
|
||||
// new ones from being written, but rows persisted by earlier versions
|
||||
// keep the reconciler busy retrying a permanently-failing NATS request
|
||||
// every 30s. Guarded by the same migration advisory lock so only one
|
||||
// frontend runs it.
|
||||
_ = advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
|
||||
res := db.Exec(`
|
||||
DELETE FROM pending_backend_ops
|
||||
WHERE backend = ''
|
||||
OR node_id NOT IN (SELECT id FROM backend_nodes WHERE node_type = ? OR node_type = '')
|
||||
`, NodeTypeBackend)
|
||||
if res.Error != nil {
|
||||
xlog.Warn("Failed to prune malformed pending_backend_ops rows", "error", res.Error)
|
||||
return res.Error
|
||||
}
|
||||
if res.RowsAffected > 0 {
|
||||
xlog.Info("Pruned pending_backend_ops rows (wrong node type or empty backend)", "count", res.RowsAffected)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return &NodeRegistry{db: db}, nil
|
||||
}
|
||||
|
||||
@@ -946,3 +1000,114 @@ func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node
|
||||
_ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// UpsertPendingBackendOp records or refreshes a pending backend operation for
|
||||
// a node. If a row already exists for (nodeID, backend, op) we keep its
|
||||
// Attempts/LastError but reset NextRetryAt to now, so reissuing the same
|
||||
// delete/upgrade nudges it to the front of the queue instead of stacking a
|
||||
// duplicate intent.
|
||||
func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error {
|
||||
row := PendingBackendOp{
|
||||
NodeID: nodeID,
|
||||
Backend: backend,
|
||||
Op: op,
|
||||
Galleries: galleries,
|
||||
NextRetryAt: time.Now(),
|
||||
}
|
||||
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
|
||||
}).Create(&row).Error
|
||||
}
|
||||
|
||||
// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed
|
||||
// AND whose node is currently healthy. The reconciler drains this list; we
|
||||
// filter by node status in the query so a tick doesn't hammer NATS for
|
||||
// nodes that obviously can't answer.
|
||||
func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
|
||||
var ops []PendingBackendOp
|
||||
err := r.db.WithContext(ctx).
|
||||
Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id").
|
||||
Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy).
|
||||
Order("pending_backend_ops.next_retry_at ASC").
|
||||
Find(&ops).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing due pending backend ops: %w", err)
|
||||
}
|
||||
return ops, nil
|
||||
}
|
||||
|
||||
// ListPendingBackendOps returns every queued row (for the UI "pending on N
|
||||
// nodes" chip and the pre-delete ConfirmDialog).
|
||||
func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
|
||||
var ops []PendingBackendOp
|
||||
if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil {
|
||||
return nil, fmt.Errorf("listing pending backend ops: %w", err)
|
||||
}
|
||||
return ops, nil
|
||||
}
|
||||
|
||||
// DeletePendingBackendOp removes a queue row — called after the op succeeds.
|
||||
func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error {
|
||||
if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil {
|
||||
return fmt.Errorf("deleting pending backend op %d: %w", id, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RecordPendingBackendOpFailure bumps Attempts, captures the error, and
|
||||
// pushes NextRetryAt out with exponential backoff capped at 15 minutes.
|
||||
func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {
|
||||
return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
var row PendingBackendOp
|
||||
if err := tx.First(&row, id).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
row.Attempts++
|
||||
row.LastError = errMsg
|
||||
row.NextRetryAt = time.Now().Add(backoffForAttempt(row.Attempts))
|
||||
return tx.Save(&row).Error
|
||||
})
|
||||
}
|
||||
|
||||
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
|
||||
// reconciler tick is 30s so anything shorter would just re-fire immediately.
|
||||
func backoffForAttempt(attempts int) time.Duration {
|
||||
const cap = 15 * time.Minute
|
||||
base := 30 * time.Second
|
||||
shift := attempts - 1
|
||||
if shift < 0 {
|
||||
shift = 0
|
||||
}
|
||||
if shift > 10 { // 2^10 * 30s already exceeds the cap
|
||||
shift = 10
|
||||
}
|
||||
d := base << shift
|
||||
if d > cap {
|
||||
return cap
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// CountPendingBackendOpsByBackend returns a map of backend name to the count
|
||||
// of pending rows. Used to decorate Manage → Backends with a "pending on N
|
||||
// nodes" chip without exposing the full queue.
|
||||
func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) {
|
||||
type row struct {
|
||||
Backend string
|
||||
Count int
|
||||
}
|
||||
var rows []row
|
||||
err := r.db.WithContext(ctx).Model(&PendingBackendOp{}).
|
||||
Select("backend, COUNT(*) as count").
|
||||
Group("backend").
|
||||
Scan(&rows).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("counting pending backend ops: %w", err)
|
||||
}
|
||||
out := make(map[string]int, len(rows))
|
||||
for _, r := range rows {
|
||||
out[r.Backend] = r.Count
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user