Files
LocalAI/core/services/nodes/managers_distributed.go
LocalAI [bot] 447c186089 fix(distributed): make backend upgrade actually re-install on workers (#9708)
* fix(distributed): make backend upgrade actually re-install on workers

UpgradeBackend dispatched a vanilla backend.install NATS event to every
node hosting the backend. The worker's installBackend short-circuits on
"already running for this (model, replica) slot" and returns the
existing address — so the gallery install path was skipped, no artifact
was re-downloaded, no metadata was written. The frontend's drift
detection then re-flagged the same backends every cycle (installedDigest
stays empty → mismatch → "Backend upgrade available (new build)") even
as "Backend upgraded successfully" landed in the logs.
The user-visible symptom: clicking "Upgrade All" silently does nothing
and the same N backends sit on the upgrade list forever.
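Reduced to its decision, the pre-fix worker path looked roughly like this
(illustrative sketch only: the supervisor receiver, getAddr, processKey, and
installFromGalleryAndStart are assumed stand-ins for whatever the worker
actually names them):

    // Pre-fix "already running" branch (sketch, not the real code): the
    // upgrade intent never reaches this function, so a recorded address
    // means an early return with no gallery install, no artifact
    // download, and no metadata write.
    func (s *supervisor) installBackend(req BackendInstallRequest) (string, error) {
        if addr, ok := s.getAddr(processKey(req)); ok {
            return addr, nil // an upgrade silently becomes a no-op here
        }
        return s.installFromGalleryAndStart(req)
    }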

Two coupled fixes, one PR:

1. Force flag on backend.install. Add `Force bool` to
   BackendInstallRequest and thread it through NodeCommandSender ->
   RemoteUnloaderAdapter. UpgradeBackend (and the reconciler's pending-op
   drain when retrying an upgrade) sets force=true; routine load events
   and admin install endpoints keep force=false. On the worker, force=true
   stops every live process that uses this backend (resolveProcessKeys
   for peer replicas, plus the exact request processKey), skips the
   findBackend short-circuit, and passes force=true into
   gallery.InstallBackendFromGallery so the on-disk artifact is
   overwritten. After the gallery install completes, startBackend brings
   up a fresh process at the same processKey on a new port.

2. Liveness check on the fast path. installBackend's "already running"
   branch read getAddr without verifying the process was alive, so a
   gRPC backend that died without the supervisor noticing left a stale
   (key, addr) entry. The reconciler then dialed that address, got
   ECONNREFUSED, marked the replica failed, retried install — and the
   supervisor said "already running addr=…" again. Loop forever, exactly
   what we observed on a node whose llama-cpp process had died but whose
   supervisor record persisted. The fix verifies s.isRunning(processKey)
   before trusting getAddr; if the entry is stale, stopBackendExact cleans
   up and we fall through to a real install. Both fixes are sketched below.
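
A minimal runnable sketch of the post-fix decision logic, assuming stand-in
types (installReq, supervisor) and a galleryInstall stub in place of
gallery.InstallBackendFromGallery; only the Force/liveness branching mirrors
the commit:

    package main

    import (
        "fmt"
        "strings"
    )

    type installReq struct {
        Backend string
        Replica int
        Force   bool
    }

    type supervisor struct {
        addrs map[string]string // processKey -> gRPC address
        live  map[string]bool   // processKey -> process answers health checks
    }

    func (s *supervisor) stop(k string) { delete(s.addrs, k); delete(s.live, k) }

    func (s *supervisor) start(k string) (string, error) {
        addr := "127.0.0.1:0" // placeholder; a real supervisor picks a fresh port
        s.addrs[k], s.live[k] = addr, true
        return addr, nil
    }

    // galleryInstall stands in for gallery.InstallBackendFromGallery:
    // force=true overwrites an existing on-disk artifact instead of
    // short-circuiting on "already installed".
    func galleryInstall(backend string, force bool) error { return nil }

    func (s *supervisor) installBackend(r installReq) (string, error) {
        k := fmt.Sprintf("%s#%d", r.Backend, r.Replica)
        if r.Force {
            // Fix 1: stop every live process that uses this backend
            // (peer replicas included) so the artifact can be replaced.
            for pk := range s.addrs {
                if strings.HasPrefix(pk, r.Backend+"#") {
                    s.stop(pk)
                }
            }
        } else if addr, ok := s.addrs[k]; ok {
            // Fix 2: trust the fast path only if the process is alive;
            // otherwise clear the stale entry and fall through.
            if s.live[k] {
                return addr, nil
            }
            s.stop(k)
        }
        if err := galleryInstall(r.Backend, r.Force); err != nil {
            return "", err
        }
        return s.start(k) // fresh process at the same key, new port
    }

    func main() {
        s := &supervisor{
            addrs: map[string]string{"llama-cpp#0": "127.0.0.1:45731"},
            live:  map[string]bool{"llama-cpp#0": true},
        }
        addr, _ := s.installBackend(installReq{Backend: "llama-cpp", Force: true})
        fmt.Println("reinstalled at", addr)
    }

In the real worker the stop set comes from resolveProcessKeys plus the exact
request processKey, per the description above; the prefix match here is just
the simplest stand-in for that lookup.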

Backwards-compatible: the new Force field is omitempty, so older workers
ignore it (their default behavior matches force=false). The signature
change on NodeCommandSender.InstallBackend is internal-only.
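
On the wire that guarantee is just the JSON zero-value rule; assuming the
tag is spelled force (the other fields here are elided):

    type BackendInstallRequest struct {
        // ... existing fields unchanged ...

        // Force is omitted from the payload when false, so workers built
        // before this change decode the same bytes as before and keep
        // their old (force=false) behavior.
        Force bool `json:"force,omitempty"`
    }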

Verified: unit tests in core/services/nodes pass (108s suite). The
pre-existing core/backend build break (proto regen pending for
word-level timestamps) blocks core/cli and core/http/endpoints/localai
package tests but is unrelated to this change.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

* test(e2e/distributed): pass force=false to adapter.InstallBackend

NodeCommandSender.InstallBackend gained a final force bool in the
upgrade-force commit; the e2e distributed lifecycle tests still called
the old 8-arg signature, which broke compilation. These tests exercise the
routine install path (single replica, default behavior), so force=false
preserves their existing semantics.
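
The mechanical fix at each test call site, with placeholder arguments (the
real tests pass their own node IDs and gallery JSON):

    // old (8 args, no longer compiles):
    //   reply, err := adapter.InstallBackend(nodeID, name, "", galleries, "", "", "", 0)
    // new: the trailing force=false keeps routine-install semantics
    reply, err := adapter.InstallBackend(nodeID, name, "", galleries, "", "", "", 0, false)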

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-07 17:28:14 +02:00


package nodes

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/LocalAI/pkg/system"
	"github.com/mudler/xlog"
	"github.com/nats-io/nats.go"
)

// DistributedModelManager wraps a local ModelManager and adds NATS fan-out
// for model deletion so worker nodes clean up stale files.
type DistributedModelManager struct {
	local   galleryop.ModelManager
	adapter *RemoteUnloaderAdapter
}

// NewDistributedModelManager creates a DistributedModelManager.
// Backend auto-install is disabled because the frontend node delegates
// inference to workers and never runs backends locally.
func NewDistributedModelManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter) *DistributedModelManager {
	local := galleryop.NewLocalModelManager(appConfig, ml)
	local.SetAutoInstallBackend(false)
	return &DistributedModelManager{
		local:   local,
		adapter: adapter,
	}
}

func (d *DistributedModelManager) DeleteModel(name string) error {
	err := d.local.DeleteModel(name)
	// Best-effort: fan out model.delete to worker nodes
	if rcErr := d.adapter.DeleteModelFiles(name); rcErr != nil {
		xlog.Warn("Failed to propagate model file deletion to workers", "model", name, "error", rcErr)
	}
	return err
}

func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryModel, gallery.ModelConfig], progressCb galleryop.ProgressCallback) error {
	return d.local.InstallModel(ctx, op, progressCb)
}

// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
	local            galleryop.BackendManager
	adapter          *RemoteUnloaderAdapter
	registry         *NodeRegistry
	backendGalleries []config.Gallery
	systemState      *system.SystemState
}

// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
	return &DistributedBackendManager{
		local:            galleryop.NewLocalBackendManager(appConfig, ml),
		adapter:          adapter,
		registry:         registry,
		backendGalleries: appConfig.BackendGalleries,
		systemState:      appConfig.SystemState,
	}
}

// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
type NodeOpStatus struct {
	NodeID   string `json:"node_id"`
	NodeName string `json:"node_name"`
	Status   string `json:"status"` // "success" | "queued" | "error"
	Error    string `json:"error,omitempty"`
}

// BackendOpResult aggregates per-node outcomes.
type BackendOpResult struct {
	Nodes []NodeOpStatus `json:"nodes"`
}

// Err returns a non-nil error aggregating per-node hard failures
// (Status == "error"). Queued nodes (waiting for reconciler retry) are not
// failures — surfacing them as errors would mislead users about durable
// intent. Used by Install/Upgrade/Delete so reply.Success=false from
// workers reaches OpStatus.Error and the UI, instead of being silently
// dropped on the way up.
func (r BackendOpResult) Err() error {
	var failures []string
	for _, n := range r.Nodes {
		if n.Status == "error" {
			failures = append(failures, fmt.Sprintf("%s: %s", n.NodeName, n.Error))
		}
	}
	if len(failures) == 0 {
		return nil
	}
	return errors.New(strings.Join(failures, "; "))
}

// enqueueAndDrainBackendOp is the shared scaffolding for
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
// row (intent is durable even if the node is offline). Currently-healthy
// nodes get an immediate attempt; success deletes the row, failure records
// the error and leaves the row for the reconciler to retry.
//
// `apply` is the NATS round-trip for one node. Returning an error keeps the
// row in the queue and marks the per-node status as "error"; returning nil
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued" — no attempt is made right now, reconciler will pick it up
// when the node returns.
//
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it — such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
	allNodes, err := d.registry.List(ctx)
	if err != nil {
		return BackendOpResult{}, err
	}
	result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
	for _, node := range allNodes {
		// Pending nodes haven't been approved yet — no intent to apply.
		if node.Status == StatusPending {
			continue
		}
		// Backend lifecycle ops only make sense on backend-type workers.
		// Agent workers don't subscribe to backend.install/delete/list, so
		// enqueueing for them guarantees a forever-retrying row that the
		// reconciler can never drain. Silently skip — they aren't consumers.
		if node.NodeType != "" && node.NodeType != NodeTypeBackend {
			continue
		}
		if targetNodeIDs != nil && !targetNodeIDs[node.ID] {
			continue
		}
		if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
			xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
			result.Nodes = append(result.Nodes, NodeOpStatus{
				NodeID: node.ID, NodeName: node.Name, Status: "error",
				Error: fmt.Sprintf("enqueue failed: %v", err),
			})
			continue
		}
		if node.Status != StatusHealthy {
			// Intent is recorded; reconciler will retry when the node recovers.
			result.Nodes = append(result.Nodes, NodeOpStatus{
				NodeID: node.ID, NodeName: node.Name, Status: "queued",
				Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
			})
			continue
		}
		applyErr := apply(node)
		if applyErr == nil {
			// Find the row we just upserted and delete it; cheap but requires
			// a lookup since UpsertPendingBackendOp doesn't return the ID.
			if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
				xlog.Debug("Failed to clear pending backend op after success", "error", err)
			}
			result.Nodes = append(result.Nodes, NodeOpStatus{
				NodeID: node.ID, NodeName: node.Name, Status: "success",
			})
			continue
		}
		// Record failure for backoff. If it's an ErrNoResponders, the node's
		// gone AWOL — mark unhealthy so the router stops picking it too.
		errMsg := applyErr.Error()
		if errors.Is(applyErr, nats.ErrNoResponders) {
			xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
			d.registry.MarkUnhealthy(ctx, node.ID)
		}
		if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
			_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
		}
		result.Nodes = append(result.Nodes, NodeOpStatus{
			NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
		})
	}
	return result, nil
}

// findPendingRow looks up the ID of a pending_backend_ops row by its
// composite key. Used to hand off to RecordPendingBackendOpFailure /
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
// composite key.
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
	var row PendingBackendOp
	if err := d.registry.db.WithContext(ctx).
		Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
		First(&row).Error; err != nil {
		return 0, err
	}
	return row.ID, nil
}

// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
	return d.registry.db.WithContext(ctx).
		Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
		Delete(&PendingBackendOp{}).Error
}

// DeleteBackend fans out backend deletion to every known node. The previous
// implementation silently skipped non-healthy nodes, which meant zombies
// reappeared once those nodes returned. Now the intent is durable — see
// enqueueAndDrainBackendOp — and the reconciler catches up later.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
	// Local delete first (frontend rarely has backends installed in
	// distributed mode, but the gallery operation still expects it; ignore
	// "not found" which is the common case).
	if err := d.local.DeleteBackend(name); err != nil {
		if !errors.Is(err, gallery.ErrBackendNotFound) {
			return err
		}
		xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
	}
	ctx := context.Background()
	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
		reply, err := d.adapter.DeleteBackend(node.ID, name)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("delete failed: %s", reply.Error)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return result.Err()
}

// DeleteBackendDetailed is the per-node-result variant called by the HTTP
// handler so the UI can render a per-node status drawer. DeleteBackend still
// returns error-only for callers that don't care about node breakdown.
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
	if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
		return BackendOpResult{}, err
	}
	return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
		reply, err := d.adapter.DeleteBackend(node.ID, name)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("delete failed: %s", reply.Error)
		}
		return nil
	})
}

// ListBackends aggregates installed backends from all worker nodes, preserving
// per-node attribution. Each SystemBackend.Nodes entry records which node has
// the backend and the version/digest it reports. The top-level Metadata is
// populated from the first node seen so single-node-minded callers still work.
//
// Pending/offline/draining nodes are skipped because they aren't expected to
// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders
// then marks them unhealthy and the loop continues.
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
	result := make(gallery.SystemBackends)
	allNodes, err := d.registry.List(context.Background())
	if err != nil {
		return result, err
	}
	for _, node := range allNodes {
		if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining {
			continue
		}
		reply, err := d.adapter.ListBackends(node.ID)
		if err != nil {
			if errors.Is(err, nats.ErrNoResponders) {
				xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
				d.registry.MarkUnhealthy(context.Background(), node.ID)
				continue
			}
			xlog.Warn("Failed to list backends on worker", "node", node.Name, "error", err)
			continue
		}
		if reply.Error != "" {
			xlog.Warn("Worker returned error listing backends", "node", node.Name, "error", reply.Error)
			continue
		}
		for _, b := range reply.Backends {
			ref := gallery.NodeBackendRef{
				NodeID:      node.ID,
				NodeName:    node.Name,
				NodeStatus:  node.Status,
				Version:     b.Version,
				Digest:      b.Digest,
				URI:         b.URI,
				InstalledAt: b.InstalledAt,
			}
			entry, exists := result[b.Name]
			if !exists {
				entry = gallery.SystemBackend{
					Name:     b.Name,
					IsSystem: b.IsSystem,
					IsMeta:   b.IsMeta,
					Metadata: &gallery.BackendMetadata{
						Name:        b.Name,
						InstalledAt: b.InstalledAt,
						GalleryURL:  b.GalleryURL,
						Version:     b.Version,
						URI:         b.URI,
						Digest:      b.Digest,
					},
				}
			}
			entry.Nodes = append(entry.Nodes, ref)
			result[b.Name] = entry
		}
	}
	return result, nil
}

// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
	galleriesJSON, _ := json.Marshal(op.Galleries)
	backendName := op.GalleryElementName
	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error {
		// Admin-driven backend install: not tied to a specific replica slot.
		// Pass replica 0 — the worker's processKey is "backend#0" when no
		// modelID is supplied, matching pre-PR4 behavior.
		reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("install failed: %s", reply.Error)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return result.Err()
}

// UpgradeBackend reuses the install NATS subject (the worker re-downloads
// from the gallery). Unlike Install/Delete, upgrade only targets the nodes
// that already report this backend as installed — fanning out to every node
// would ask workers to "upgrade" something they never had, which fails at
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a
// linux-only backend) and leaves a forever-retrying pending_backend_ops row.
//
// force=true on the install call is what distinguishes upgrade from install:
// the worker stops the live process for this backend, overwrites the on-disk
// artifact, and restarts. Without it, the worker's "already running" fast
// path turns every backend.install into a no-op and the gallery's drift
// detection never converges.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
	galleriesJSON, _ := json.Marshal(d.backendGalleries)
	installed, err := d.ListBackends()
	if err != nil {
		return fmt.Errorf("failed to list cluster backends: %w", err)
	}
	entry, ok := installed[name]
	if !ok || len(entry.Nodes) == 0 {
		return fmt.Errorf("backend %q is not installed on any node", name)
	}
	targetNodeIDs := make(map[string]bool, len(entry.Nodes))
	for _, n := range entry.Nodes {
		targetNodeIDs[n.NodeID] = true
	}
	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
		reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true)
		if err != nil {
			return err
		}
		if !reply.Success {
			return fmt.Errorf("upgrade failed: %s", reply.Error)
		}
		return nil
	})
	if err != nil {
		return err
	}
	return result.Err()
}

// IsDistributed reports that installs from this manager fan out across the
// cluster. The HTTP layer reads this to gate hardware-specific installs on
// /api/backends/apply (which would otherwise silently land on every node).
func (d *DistributedBackendManager) IsDistributed() bool { return true }

// CheckUpgrades checks for available backend upgrades across the cluster.
//
// The previous implementation delegated to d.local, which called
// ListSystemBackends on the frontend — but in distributed mode the frontend
// has no backends installed locally, so the upgrade loop never ran and the UI
// never surfaced any upgrades. We now feed the cluster-wide aggregation
// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
// digest-based detection actually works and cluster drift is visible.
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
	installed, err := d.ListBackends()
	if err != nil {
		return nil, err
	}
	// systemState is used by AvailableBackends (gallery paths + meta-backend
	// resolution). The `installed` argument is what the old code got wrong —
	// it used to come from the empty frontend filesystem.
	return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}