mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-17 13:10:23 -04:00
* feat(messaging): add backend.upgrade NATS subject + payload types
Splits the slow force-reinstall path off backend.install so it can run on
its own subscription goroutine, eliminating head-of-line blocking between
routine model loads and full gallery upgrades.
Wire-level Force flag on BackendInstallRequest is kept for one release as
the rolling-update fallback target; doc note marks it deprecated.
Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed/worker): add per-backend mutex helper to backendSupervisor
Different backend names lock independently; same backend serializes. This
is the synchronization primitive used by the upcoming concurrent install
handler — without it, wrapping the NATS callback in a goroutine would
race the gallery directory when two requests target the same backend.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
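A minimal sketch of the per-backend locking idea described above, assuming a map of mutexes guarded by an outer mutex. The names `keyedLocks`, `lock`, and `maxConcurrent` are hypothetical and only illustrate the behavior (same name serializes, different names are independent); the real helper on backendSupervisor may be built differently.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedLocks hands out one mutex per key. Hypothetical stand-in for a
// per-backend lock helper; not the LocalAI implementation.
type keyedLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedLocks() *keyedLocks {
	return &keyedLocks{locks: make(map[string]*sync.Mutex)}
}

// lock blocks until the mutex for name is held and returns its unlock func.
// The outer mutex only protects the map, so it is released before waiting
// on the per-key mutex.
func (k *keyedLocks) lock(name string) func() {
	k.mu.Lock()
	m, ok := k.locks[name]
	if !ok {
		m = &sync.Mutex{}
		k.locks[name] = m
	}
	k.mu.Unlock()

	m.Lock()
	return m.Unlock
}

// maxConcurrent starts n goroutines that all lock the same key and reports
// the highest number of goroutines seen inside the critical section at once.
func maxConcurrent(k *keyedLocks, key string, n int) int {
	var (
		wg      sync.WaitGroup
		statsMu sync.Mutex
		inside  int
		peak    int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := k.lock(key)
			defer unlock()

			statsMu.Lock()
			inside++
			if inside > peak {
				peak = inside
			}
			statsMu.Unlock()

			// The gallery-directory work would happen here.

			statsMu.Lock()
			inside--
			statsMu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	k := newKeyedLocks()
	fmt.Println("peak concurrency for one backend:", maxConcurrent(k, "llama-cpp", 8))
}
```

Returning the unlock function keeps call sites to a single `defer`, which is one common way to make it hard to forget the release.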
* fix(distributed/worker): run backend.install handler in a goroutine
NATS subscriptions deliver messages serially on a single per-subscription
goroutine. With a synchronous install handler, a multi-minute gallery
download would head-of-line-block every other install request to the
same worker — manifesting upstream as a 5-minute "nats: timeout" on
unrelated routine model loads.
The body now runs in its own goroutine, with a per-backend mutex
(lockBackend) protecting the gallery directory from concurrent operations
on the same backend. Different backend names install in parallel.
Backward-compat: req.Force=true is still honored here, so an older master
that hasn't been updated to send on backend.upgrade keeps working.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
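The head-of-line blocking described above can be reproduced without NATS. The sketch below assumes a hypothetical `deliver` loop that calls the handler one message at a time (standing in for a subscription's single delivery goroutine) and compares a synchronous handler with one that runs its body in a goroutine. All names are illustrative.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// request is a hypothetical install request; work simulates how long the
// install takes on the worker.
type request struct {
	backend string
	work    time.Duration
}

// deliver invokes handler serially, one message at a time, the way a single
// delivery goroutine would.
func deliver(msgs []request, handler func(request)) {
	for _, m := range msgs {
		handler(m)
	}
}

// completionOrder returns the order in which requests finish. With
// async=false the handler body runs inline, so a slow request delays every
// request behind it. With async=true each body runs in its own goroutine.
func completionOrder(msgs []request, async bool) []string {
	var (
		mu   sync.Mutex
		done []string
		wg   sync.WaitGroup
	)
	process := func(r request) {
		time.Sleep(r.work)
		mu.Lock()
		done = append(done, r.backend)
		mu.Unlock()
	}
	deliver(msgs, func(r request) {
		if !async {
			process(r)
			return
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			process(r)
		}()
	})
	wg.Wait()
	return done
}

// demoMessages queues a slow request ahead of a fast one.
func demoMessages() []request {
	return []request{
		{backend: "slow-gallery-pull", work: 100 * time.Millisecond},
		{backend: "fast-model-load", work: 0},
	}
}

func main() {
	fmt.Println("sync: ", completionOrder(demoMessages(), false))
	fmt.Println("async:", completionOrder(demoMessages(), true))
}
```

In the asynchronous variant the fast request no longer waits for the slow one, which is why a per-backend lock becomes necessary once two requests for the same backend can overlap.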
* feat(distributed/worker): subscribe to backend.upgrade as a separate path
Slow force-reinstall now lives on its own NATS subscription, so a
multi-minute gallery pull cannot head-of-line-block the routine
backend.install handler on the same worker. Same per-backend mutex
guards both — concurrent install + upgrade for the same backend
serialize at the gallery directory; different backends are independent.
upgradeBackend stops every live process for the backend, force-installs
from gallery, and re-registers. It does not start a new process — the
next backend.install will spawn one with the freshly-pulled binary.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend
Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.
InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.
Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.
router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
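A minimal sketch of the rolling-update fallback described above. It assumes a hypothetical `sender` interface and uses a local sentinel, `errNoResponders`, in place of nats.ErrNoResponders so the example needs no external dependency; real code would match the NATS sentinel with errors.Is in the same way.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoResponders stands in for nats.ErrNoResponders (assumption made so
// the sketch is stdlib-only).
var errNoResponders = errors.New("no responders available for request")

// errGalleryFailed is an arbitrary non-fallback error used in the demo.
var errGalleryFailed = errors.New("gallery pull failed")

// sender is a hypothetical, trimmed-down view of the master-side command API.
type sender interface {
	upgrade(nodeID, backend string) error
	installForce(nodeID, backend string) error
}

// upgradeWithFallback tries the new upgrade path first. Only a
// "no responders" error (an old worker that never subscribed to the new
// subject) triggers the legacy force-install retry; any other error is
// returned unchanged.
func upgradeWithFallback(s sender, nodeID, backend string) error {
	err := s.upgrade(nodeID, backend)
	if err == nil {
		return nil
	}
	if errors.Is(err, errNoResponders) {
		return s.installForce(nodeID, backend)
	}
	return err
}

// fakeSender records fallback calls and returns a scripted upgrade error.
type fakeSender struct {
	upgradeErr error
	forced     []string
}

func (f *fakeSender) upgrade(nodeID, backend string) error { return f.upgradeErr }

func (f *fakeSender) installForce(nodeID, backend string) error {
	f.forced = append(f.forced, nodeID+"/"+backend)
	return nil
}

func main() {
	old := &fakeSender{upgradeErr: errNoResponders}
	fmt.Println(upgradeWithFallback(old, "node-1", "llama-cpp"), old.forced)

	broken := &fakeSender{upgradeErr: errGalleryFailed}
	fmt.Println(upgradeWithFallback(broken, "node-1", "llama-cpp"), broken.forced)
}
```

Matching on the sentinel with errors.Is rather than on the error string keeps the fallback working even when the transport layer wraps the error.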
* test(distributed): update fakeUnloader for new NodeCommandSender shape
InstallBackend lost its force bool param (Force is not part of the public
Go API anymore — only the internal upgrade-fallback path sets it on the
wire). UpgradeBackend gained a method. Fake records both call slices and
provides an installHook concurrency seam for upcoming singleflight tests.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(distributed): cover UpgradeBackend's new subject + rolling-update fallback
Task 3.1 changed the master to publish UpgradeBackend on the new
backend.upgrade subject; the existing UpgradeBackend tests scripted the
old install subject and so all 3 began failing as expected. Updates them
to script SubjectNodeBackendUpgrade with BackendUpgradeReply.
Adds two new specs for the rolling-update fallback:
- ErrNoResponders on backend.upgrade triggers a backend.install
Force=true retry on the same node.
- Non-NoResponders errors propagate to the caller unchanged.
scriptedMessagingClient gains scriptNoResponders (real nats sentinel) and
scriptReplyMatching (predicate-matched canned reply, used to assert that
the fallback path actually sets Force=true on the install retry).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix(distributed): coalesce concurrent identical backend.install via singleflight
Six simultaneous chat completions for the same not-yet-loaded model were
observed firing six independent NATS install requests, each serializing
through the worker's per-subscription goroutine and amplifying queue
depth. SmartRouter now wraps the NATS round-trip in a singleflight.Group
keyed by (nodeID, backend, modelID, replica): N concurrent identical
loads share one round-trip and one reply.
Distinct (modelID, replica) keys still fire independent calls, so
multi-replica scaling and multi-model fan-out are unaffected.
fakeUnloader gains a sync.Mutex around its recording slices to keep
concurrent test goroutines race-clean.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
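The coalescing behavior described above can be illustrated with a small stdlib-only stand-in for a singleflight group. The commit itself uses singleflight.Group; `flightGroup`, `loadKey`, and `coalescedCalls` below are hypothetical names, and the key format is an assumption based on the (nodeID, backend, modelID, replica) tuple named in the message.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call tracks one in-flight round-trip and its result.
type call struct {
	wg  sync.WaitGroup
	val string
	err error
}

// flightGroup is a simplified stand-in for a singleflight group: concurrent
// callers with the same key share one execution of fn.
type flightGroup struct {
	mu    sync.Mutex
	calls map[string]*call
}

func (g *flightGroup) do(key string, fn func() (string, error)) (string, error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	if c, ok := g.calls[key]; ok {
		g.mu.Unlock()
		c.wg.Wait() // a leader is already running; wait for its result
		return c.val, c.err
	}
	c := &call{}
	c.wg.Add(1)
	g.calls[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.calls, key)
	g.mu.Unlock()
	return c.val, c.err
}

// loadKey builds a coalescing key from the identifying tuple (format assumed).
func loadKey(nodeID, backend, modelID string, replica int) string {
	return fmt.Sprintf("%s|%s|%s|%d", nodeID, backend, modelID, replica)
}

// coalescedCalls fires n concurrent identical loads and returns how many
// times the simulated round-trip actually ran.
func coalescedCalls(n int) int {
	var (
		g       flightGroup
		calls   int32
		wg      sync.WaitGroup
		release = make(chan struct{})
	)
	key := loadKey("node-1", "llama-cpp", "model-a", 0)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.do(key, func() (string, error) {
				atomic.AddInt32(&calls, 1)
				<-release // simulate a slow install round-trip
				return "ok", nil
			})
		}()
	}
	time.Sleep(100 * time.Millisecond) // give every caller time to join
	close(release)
	wg.Wait()
	return int(atomic.LoadInt32(&calls))
}

func main() {
	fmt.Println("round-trips for 6 identical loads:", coalescedCalls(6))
}
```

Because the replica index is part of the key, loads for different replicas or models never share a flight, matching the note that multi-replica scaling is unaffected.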
* test(e2e/distributed): drop force arg from InstallBackend test calls
Two e2e test call sites still passed the trailing force bool that was
removed from RemoteUnloaderAdapter.InstallBackend in 9bde76d7. Caught
by golangci-lint typecheck on the upgrade-split branch (master CI was
already green because these tests don't run in the standard test path).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(distributed): extract worker business logic to core/services/worker
core/cli/worker.go grew to 1212 lines after the backend.upgrade split.
The CLI package was carrying backendSupervisor, NATS lifecycle handlers,
gallery install/upgrade orchestration, S3 file staging, and registration
helpers, all of it distributed-worker business logic that doesn't belong in
the CLI surface.
Move it to a new core/services/worker package, mirroring the existing
core/services/{nodes,messaging,galleryop} pattern. core/cli/worker.go
shrinks to ~19 lines: a kong-tagged shim that embeds worker.Config and
delegates Run.
No behavior change. All symbols stay unexported except Config and Run.
The three worker-specific tests (addr/replica/concurrency) move with
the code via git mv so history follows them.
Files split as:
  worker.go          - Run entry point
  config.go          - Config struct (kong tags retained, kong not imported)
  supervisor.go      - backendProcess, backendSupervisor, process lifecycle
  install.go         - installBackend, upgradeBackend, findBackend, lockBackend
  lifecycle.go       - subscribeLifecycleEvents (verbatim, decomposition is
                       a follow-up commit)
  file_staging.go    - subscribeFileStaging, isPathAllowed
  registration.go    - advertiseAddr, registrationBody, heartbeatBody, etc.
  reply.go           - replyJSON
  process_helpers.go - readLastLinesFromFile
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(distributed/worker): decompose subscribeLifecycleEvents into per-event handlers
The 226-line subscribeLifecycleEvents method packed eight NATS subscriptions
inline. Each grew context-shaped doc comments mixed with subscription
plumbing, making it hard to read any one handler without scrolling past the
others. Extract each handler into its own method on *backendSupervisor; the
subscriber becomes a thin 8-line dispatcher.
No behavior change: each method body is byte-equivalent to its corresponding
inline goroutine + handler. Doc comments that were attached to the inline
SubscribeReply calls migrate to the new method godocs.
Adding the next NATS subject is now a 2-line patch to the dispatcher plus
one new method, instead of grafting onto a monolith.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
436 lines
17 KiB
Go
package nodes

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/LocalAI/pkg/system"
	"github.com/mudler/xlog"
	"github.com/nats-io/nats.go"
)

// DistributedModelManager wraps a local ModelManager and adds NATS fan-out
// for model deletion so worker nodes clean up stale files.
type DistributedModelManager struct {
	local   galleryop.ModelManager
	adapter *RemoteUnloaderAdapter
}

// NewDistributedModelManager creates a DistributedModelManager.
// Backend auto-install is disabled because the frontend node delegates
// inference to workers and never runs backends locally.
func NewDistributedModelManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter) *DistributedModelManager {
	local := galleryop.NewLocalModelManager(appConfig, ml)
	local.SetAutoInstallBackend(false)
	return &DistributedModelManager{
		local:   local,
		adapter: adapter,
	}
}

func (d *DistributedModelManager) DeleteModel(name string) error {
	err := d.local.DeleteModel(name)
	// Best-effort: fan out model.delete to worker nodes
	if rcErr := d.adapter.DeleteModelFiles(name); rcErr != nil {
		xlog.Warn("Failed to propagate model file deletion to workers", "model", name, "error", rcErr)
	}
	return err
}

func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryModel, gallery.ModelConfig], progressCb galleryop.ProgressCallback) error {
	return d.local.InstallModel(ctx, op, progressCb)
}

// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
	local            galleryop.BackendManager
	adapter          *RemoteUnloaderAdapter
	registry         *NodeRegistry
	backendGalleries []config.Gallery
	systemState      *system.SystemState
}

// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
	return &DistributedBackendManager{
		local:            galleryop.NewLocalBackendManager(appConfig, ml),
		adapter:          adapter,
		registry:         registry,
		backendGalleries: appConfig.BackendGalleries,
		systemState:      appConfig.SystemState,
	}
}

// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
type NodeOpStatus struct {
	NodeID   string `json:"node_id"`
	NodeName string `json:"node_name"`
	Status   string `json:"status"` // "success" | "queued" | "error"
	Error    string `json:"error,omitempty"`
}

// BackendOpResult aggregates per-node outcomes.
type BackendOpResult struct {
	Nodes []NodeOpStatus `json:"nodes"`
}

// Err returns a non-nil error aggregating per-node hard failures
// (Status == "error"). Queued nodes (waiting for reconciler retry) are not
// failures; surfacing them as errors would mislead users about durable
// intent. Used by Install/Upgrade/Delete so reply.Success=false from
// workers reaches OpStatus.Error and the UI, instead of being silently
// dropped on the way up.
func (r BackendOpResult) Err() error {
	var failures []string
	for _, n := range r.Nodes {
		if n.Status == "error" {
			failures = append(failures, fmt.Sprintf("%s: %s", n.NodeName, n.Error))
		}
	}
	if len(failures) == 0 {
		return nil
	}
	return errors.New(strings.Join(failures, "; "))
}

// enqueueAndDrainBackendOp is the shared scaffolding for
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
// row (intent is durable even if the node is offline). Currently-healthy
// nodes get an immediate attempt; success deletes the row, failure records
// the error and leaves the row for the reconciler to retry.
//
// `apply` is the NATS round-trip for one node. Returning an error keeps the
// row in the queue and marks the per-node status as "error"; returning nil
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued": no attempt is made right now, reconciler will pick it up
// when the node returns.
//
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it; such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
	allNodes, err := d.registry.List(ctx)
	if err != nil {
		return BackendOpResult{}, err
	}

	result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
	for _, node := range allNodes {
		// Pending nodes haven't been approved yet, so there is no intent to apply.
		if node.Status == StatusPending {
			continue
		}
		// Backend lifecycle ops only make sense on backend-type workers.
		// Agent workers don't subscribe to backend.install/delete/list, so
		// enqueueing for them guarantees a forever-retrying row that the
		// reconciler can never drain. Silently skip; they aren't consumers.
		if node.NodeType != "" && node.NodeType != NodeTypeBackend {
			continue
		}
		if targetNodeIDs != nil && !targetNodeIDs[node.ID] {
			continue
		}
		if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
			xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
			result.Nodes = append(result.Nodes, NodeOpStatus{
				NodeID: node.ID, NodeName: node.Name, Status: "error",
				Error: fmt.Sprintf("enqueue failed: %v", err),
			})
			continue
		}

		if node.Status != StatusHealthy {
			// Intent is recorded; reconciler will retry when the node recovers.
			result.Nodes = append(result.Nodes, NodeOpStatus{
				NodeID: node.ID, NodeName: node.Name, Status: "queued",
				Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
			})
			continue
		}

		applyErr := apply(node)
		if applyErr == nil {
			// Find the row we just upserted and delete it; cheap but requires
			// a lookup since UpsertPendingBackendOp doesn't return the ID.
			if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
				xlog.Debug("Failed to clear pending backend op after success", "error", err)
			}
			result.Nodes = append(result.Nodes, NodeOpStatus{
				NodeID: node.ID, NodeName: node.Name, Status: "success",
			})
			continue
		}

		// Record failure for backoff. If it's an ErrNoResponders, the node has
		// gone AWOL; mark it unhealthy so the router stops picking it too.
		errMsg := applyErr.Error()
		if errors.Is(applyErr, nats.ErrNoResponders) {
			xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
			d.registry.MarkUnhealthy(ctx, node.ID)
		}
		if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
			_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
		}
		result.Nodes = append(result.Nodes, NodeOpStatus{
			NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
		})
	}
	return result, nil
}

// findPendingRow looks up the ID of a pending_backend_ops row by its
// composite key. Used to hand off to RecordPendingBackendOpFailure /
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
// composite key.
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
	var row PendingBackendOp
	if err := d.registry.db.WithContext(ctx).
		Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
		First(&row).Error; err != nil {
		return 0, err
	}
	return row.ID, nil
}

// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
	return d.registry.db.WithContext(ctx).
		Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
		Delete(&PendingBackendOp{}).Error
}

// DeleteBackend fans out backend deletion to every known node. The previous
// implementation silently skipped non-healthy nodes, which meant zombies
// reappeared once those nodes returned. Now the intent is durable (see
// enqueueAndDrainBackendOp) and the reconciler catches up later.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
	// Local delete first (frontend rarely has backends installed in
	// distributed mode, but the gallery operation still expects it; ignore
	// "not found" which is the common case).
	if err := d.local.DeleteBackend(name); err != nil {
		if !errors.Is(err, gallery.ErrBackendNotFound) {
			return err
		}
		xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
	}

	ctx := context.Background()
	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
		reply, err := d.adapter.DeleteBackend(node.ID, name)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("delete failed: %s", reply.Error)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return result.Err()
}

// DeleteBackendDetailed is the per-node-result variant called by the HTTP
// handler so the UI can render a per-node status drawer. DeleteBackend still
// returns error-only for callers that don't care about node breakdown.
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
	if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
		return BackendOpResult{}, err
	}
	return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
		reply, err := d.adapter.DeleteBackend(node.ID, name)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("delete failed: %s", reply.Error)
		}
		return nil
	})
}

// ListBackends aggregates installed backends from all worker nodes, preserving
// per-node attribution. Each SystemBackend.Nodes entry records which node has
// the backend and the version/digest it reports. The top-level Metadata is
// populated from the first node seen so single-node-minded callers still work.
//
// Pending/offline/draining nodes are skipped because they aren't expected to
// answer NATS requests; unhealthy nodes are still queried: ErrNoResponders
// then marks them unhealthy and the loop continues.
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
	result := make(gallery.SystemBackends)
	allNodes, err := d.registry.List(context.Background())
	if err != nil {
		return result, err
	}

	for _, node := range allNodes {
		if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining {
			continue
		}
		reply, err := d.adapter.ListBackends(node.ID)
		if err != nil {
			if errors.Is(err, nats.ErrNoResponders) {
				xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
				d.registry.MarkUnhealthy(context.Background(), node.ID)
				continue
			}
			xlog.Warn("Failed to list backends on worker", "node", node.Name, "error", err)
			continue
		}
		if reply.Error != "" {
			xlog.Warn("Worker returned error listing backends", "node", node.Name, "error", reply.Error)
			continue
		}
		for _, b := range reply.Backends {
			ref := gallery.NodeBackendRef{
				NodeID:      node.ID,
				NodeName:    node.Name,
				NodeStatus:  node.Status,
				Version:     b.Version,
				Digest:      b.Digest,
				URI:         b.URI,
				InstalledAt: b.InstalledAt,
			}
			entry, exists := result[b.Name]
			if !exists {
				entry = gallery.SystemBackend{
					Name:     b.Name,
					IsSystem: b.IsSystem,
					IsMeta:   b.IsMeta,
					Metadata: &gallery.BackendMetadata{
						Name:        b.Name,
						InstalledAt: b.InstalledAt,
						GalleryURL:  b.GalleryURL,
						Version:     b.Version,
						URI:         b.URI,
						Digest:      b.Digest,
					},
				}
			}
			entry.Nodes = append(entry.Nodes, ref)
			result[b.Name] = entry
		}
	}
	return result, nil
}

// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
	galleriesJSON, _ := json.Marshal(op.Galleries)
	backendName := op.GalleryElementName

	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error {
		// Admin-driven backend install: not tied to a specific replica slot.
		// Pass replica 0: the worker's processKey is "backend#0" when no
		// modelID is supplied, matching pre-PR4 behavior.
		reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("install failed: %s", reply.Error)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return result.Err()
}

// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
// force-reinstall path doesn't head-of-line-block routine model loads on
// the same worker. Only nodes that already report this backend as installed
// are targeted: fanning out to every node would ask workers to "upgrade"
// something they never had, which fails at the gallery (e.g. a darwin/arm64
// worker has no platform variant for a linux-only backend) and leaves a
// forever-retrying pending_backend_ops row.
//
// Rolling-update fallback: when a worker returns nats.ErrNoResponders on
// backend.upgrade, we try the legacy backend.install Force=true path so a
// new master + old worker still converges. Drop the fallback once every
// worker in the fleet is on 2026-05-08 or newer.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
	galleriesJSON, _ := json.Marshal(d.backendGalleries)

	installed, err := d.ListBackends()
	if err != nil {
		return fmt.Errorf("failed to list cluster backends: %w", err)
	}
	entry, ok := installed[name]
	if !ok || len(entry.Nodes) == 0 {
		return fmt.Errorf("backend %q is not installed on any node", name)
	}
	targetNodeIDs := make(map[string]bool, len(entry.Nodes))
	for _, n := range entry.Nodes {
		targetNodeIDs[n.NodeID] = true
	}

	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
		reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
		if err != nil {
			// Rolling-update fallback: an older worker doesn't know
			// backend.upgrade. Try the legacy install-with-force path.
			if errors.Is(err, nats.ErrNoResponders) {
				instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0)
				if instErr != nil {
					return instErr
				}
				if !instReply.Success {
					return fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
				}
				return nil
			}
			return err
		}
		if !reply.Success {
			return fmt.Errorf("upgrade failed: %s", reply.Error)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return result.Err()
}

// IsDistributed reports that installs from this manager fan out across the
// cluster. The HTTP layer reads this to gate hardware-specific installs on
// /api/backends/apply (which would otherwise silently land on every node).
func (d *DistributedBackendManager) IsDistributed() bool { return true }

// CheckUpgrades checks for available backend upgrades across the cluster.
//
// The previous implementation delegated to d.local, which called
// ListSystemBackends on the frontend, but in distributed mode the frontend
// has no backends installed locally, so the upgrade loop never ran and the UI
// never surfaced any upgrades. We now feed the cluster-wide aggregation
// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
// digest-based detection actually works and cluster drift is visible.
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
	installed, err := d.ListBackends()
	if err != nil {
		return nil, err
	}
	// systemState is used by AvailableBackends (gallery paths + meta-backend
	// resolution). The `installed` argument is what the old code got wrong:
	// it used to come from the empty frontend filesystem.
	return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
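
The enqueue-then-attempt flow used by enqueueAndDrainBackendOp in the listing above can be sketched in isolation. The version below is a simplified in-memory illustration, not the LocalAI code: `pendingOps` stands in for the pending_backend_ops table, and `node`, `enqueueAndDrain`, and `errApply` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a trimmed-down, hypothetical view of a registered worker.
type node struct {
	id      string
	healthy bool
}

// pendingOps maps nodeID to the last recorded error ("" means not yet
// attempted). It stands in for a durable queue table.
type pendingOps map[string]string

// errApply is an arbitrary failure used by the demo.
var errApply = errors.New("install failed")

// enqueueAndDrain records intent for every node first, then attempts the
// operation immediately on healthy nodes only. Success removes the row;
// failure keeps it with the error so a reconciler could retry later.
func enqueueAndDrain(q pendingOps, nodes []node, apply func(node) error) map[string]string {
	status := make(map[string]string, len(nodes))
	for _, n := range nodes {
		q[n.id] = "" // intent is recorded before any attempt
		if !n.healthy {
			status[n.id] = "queued"
			continue
		}
		if err := apply(n); err != nil {
			q[n.id] = err.Error()
			status[n.id] = "error"
			continue
		}
		delete(q, n.id)
		status[n.id] = "success"
	}
	return status
}

func main() {
	q := pendingOps{}
	nodes := []node{{"a", true}, {"b", true}, {"c", false}}
	status := enqueueAndDrain(q, nodes, func(n node) error {
		if n.id == "b" {
			return errApply
		}
		return nil
	})
	fmt.Println(status, len(q))
}
```

Writing the row before the attempt is what makes the intent survive a crash or an offline node: whatever is left in the queue afterwards is exactly the work still owed.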