From 447c186089f533acb5c57dbeab5381ac84ca6c2e Mon Sep 17 00:00:00 2001
From: "LocalAI [bot]" <139863280+localai-bot@users.noreply.github.com>
Date: Thu, 7 May 2026 17:28:14 +0200
Subject: [PATCH] fix(distributed): make backend upgrade actually re-install on
 workers (#9708)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix(distributed): make backend upgrade actually re-install on workers

UpgradeBackend dispatched a vanilla backend.install NATS event to every
node hosting the backend. The worker's installBackend short-circuits on
"already running for this (model, replica) slot" and returns the
existing address — so the gallery install path was skipped, no artifact
was re-downloaded, and no metadata was written. The frontend's drift
detection then re-flagged the same backends every cycle
(installedDigest stays empty → mismatch → "Backend upgrade available
(new build)") while "Backend upgraded successfully" landed in the logs
at the same time. The user-visible symptom: clicking "Upgrade All"
silently does nothing and the same N backends sit on the upgrade list
forever.

Two coupled fixes, one PR:

1. Force flag on backend.install. Add `Force bool` to
   BackendInstallRequest and thread it through NodeCommandSender ->
   RemoteUnloaderAdapter. UpgradeBackend (and the reconciler's
   pending-op drain when retrying an upgrade) sets force=true; routine
   load events and admin install endpoints keep force=false. On the
   worker, force=true stops every live process that uses this backend
   (resolveProcessKeys for peer replicas, plus the exact request
   processKey), skips the findBackend short-circuit, and passes
   force=true into gallery.InstallBackendFromGallery so the on-disk
   artifact is overwritten. After the gallery install completes,
   startBackend brings up a fresh process at the same processKey on a
   new port.

2. Liveness check on the fast path. installBackend's "already running"
   branch read getAddr without verifying the process was alive, so a
   gRPC backend that died without the supervisor noticing left a stale
   (key, addr) entry. The reconciler then dialed that address, got
   ECONNREFUSED, marked the replica failed, and retried the install —
   and the supervisor said "already running addr=…" again. The loop
   repeats forever, which is exactly what we observed on a node whose
   llama-cpp process had died but whose supervisor record persisted.
   The fix: verify s.isRunning(processKey) before trusting getAddr; if
   the entry is stale, stopBackendExact cleans up and we fall through
   to a real install.

Backwards-compatible: the new Force field is omitempty, and older
workers ignore it (their default behavior matches force=false). The
signature change on NodeCommandSender.InstallBackend is internal-only.

Verified: unit tests in core/services/nodes pass (108s suite). The
pre-existing core/backend build break (proto regen pending for
word-level timestamps) blocks the core/cli and
core/http/endpoints/localai package tests but is unrelated to this
change.

Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

* test(e2e/distributed): pass force=false to adapter.InstallBackend

NodeCommandSender.InstallBackend gained a final force bool in the
upgrade-force commit; the e2e distributed lifecycle tests still called
the old 8-arg signature and broke compilation. These tests exercise the
routine install path (single replica, default behavior), so force=false
preserves their existing semantics.
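
In condensed form, the worker-side decision flow after the first
commit. This is a sketch under simplified assumptions, not the shipped
code: miniSupervisor, proc, and install are illustrative stand-ins for
backendSupervisor and its installBackend, and the gallery install plus
process start are reduced to a stub. The worker.go hunk below has the
real thing.

    package main

    import "fmt"

    // proc models one supervised backend process (illustrative only).
    type proc struct {
        addr  string
        alive bool
    }

    // miniSupervisor stands in for backendSupervisor: a map from
    // processKey to process record is enough to show the branches.
    type miniSupervisor struct {
        procs map[string]*proc
    }

    func (s *miniSupervisor) stop(key string) { delete(s.procs, key) }

    // install mirrors installBackend after this PR: force=false takes
    // the liveness-checked fast path; force=true always stops the old
    // process and reinstalls.
    func (s *miniSupervisor) install(key string, force bool) string {
        if !force {
            if p, ok := s.procs[key]; ok {
                if p.alive {
                    return p.addr // fast path: reuse the running process
                }
                s.stop(key) // stale entry: process died, clean up first
            }
        } else {
            s.stop(key) // upgrade path: bring the old binary down
        }
        // Real code: gallery install (force overwrites the artifact),
        // then a fresh gRPC process on a new port.
        p := &proc{addr: "127.0.0.1:40001", alive: true}
        s.procs[key] = p
        return p.addr
    }

    func main() {
        s := &miniSupervisor{procs: map[string]*proc{
            "whisper#0": {addr: "127.0.0.1:39000", alive: false}, // died silently
        }}
        fmt.Println(s.install("whisper#0", false)) // stale entry: reinstalls
        fmt.Println(s.install("whisper#0", true))  // force: reinstalls anyway
    }

The design point both branches share: the cached (key, addr) entry is
never trusted on its own, so a dead process can no longer pin its slot.
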
Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto
Co-authored-by: Ettore Di Giacinto
---
 core/cli/worker.go                           | 71 ++++++++++++++++----
 core/http/endpoints/localai/nodes.go         |  6 +-
 core/services/messaging/subjects.go          |  8 +++
 core/services/nodes/managers_distributed.go  | 10 ++-
 core/services/nodes/reconciler.go            |  8 ++-
 core/services/nodes/router.go                |  5 +-
 core/services/nodes/router_test.go           |  5 +-
 core/services/nodes/unloader.go              | 17 ++++-
 tests/e2e/distributed/node_lifecycle_test.go |  4 +-
 9 files changed, 107 insertions(+), 27 deletions(-)

diff --git a/core/cli/worker.go b/core/cli/worker.go
index 032e80a74..2d6ffc605 100644
--- a/core/cli/worker.go
+++ b/core/cli/worker.go
@@ -664,10 +664,19 @@ func buildProcessKey(modelID, backend string, replicaIndex int) string {
 }
 
 // installBackend handles the backend.install flow:
-// 1. If already running for this (model, replica) slot, return existing address
-// 2. Install backend from gallery (if not already installed)
-// 3. Find backend binary
-// 4. Start gRPC process on a new port
+// 1. If already running for this (model, replica) slot AND req.Force is false,
+//    return existing address (the fast path used by routine load events that
+//    just want to know which port a backend already serves on).
+// 2. If req.Force is true, stop any process(es) currently using this backend
+//    so the gallery install can replace the on-disk artifact and the freshly
+//    started process picks up the new binary. This is the upgrade path —
+//    without it, every backend.install we receive after the first hits the
+//    fast path and silently no-ops, leaving the cluster on a stale build.
+// 3. Install backend from gallery (force=req.Force so existing artifacts get
+//    overwritten on upgrade).
+// 4. Find backend binary
+// 5. Start gRPC process on a new port
+//
 // Returns the gRPC address of the backend process.
 //
 // ProcessKey includes the replica index so a worker with MaxReplicasPerModel>1
@@ -677,10 +686,40 @@ func buildProcessKey(modelID, backend string, replicaIndex int) string {
 func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest) (string, error) {
 	processKey := buildProcessKey(req.ModelID, req.Backend, int(req.ReplicaIndex))
 
-	// If already running for this model+replica, return its address
-	if addr := s.getAddr(processKey); addr != "" {
-		xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
-		return addr, nil
+	if !req.Force {
+		// Fast path: already running for this model+replica → return existing
+		// address. Verify liveness before trusting the cached entry: a process
+		// that died without the supervisor noticing leaves a stale (key, addr)
+		// pair, and getAddr would otherwise hand the controller an address
+		// that immediately ECONNREFUSEDs. The reconciler then marks the
+		// replica failed, retries the install, the supervisor says "already
+		// running" again, and the cluster loops on a dead replica forever.
+		if addr := s.getAddr(processKey); addr != "" {
+			if s.isRunning(processKey) {
+				xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
+				return addr, nil
+			}
+			xlog.Warn("Stale process entry for backend (dead process); cleaning up before reinstall",
+				"backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
+			s.stopBackendExact(processKey)
+		}
+	} else {
+		// Upgrade path: stop every live process that shares this backend so the
+		// gallery install can overwrite the on-disk artifact and the restarted
+		// process picks up the new binary. resolveProcessKeys catches peer
+		// replicas of the same backend (whisper#0, whisper#1, ...) on workers
+		// configured with MaxReplicasPerModel>1. We also stop the exact
+		// processKey from the request tuple — keys created with an explicit
+		// modelID don't share the bare-name prefix the resolver matches, but
+		// they're still using the old binary and need to come down. Both calls
+		// are no-ops on missing keys.
+		toStop := s.resolveProcessKeys(req.Backend)
+		toStop = append(toStop, processKey)
+		for _, key := range toStop {
+			xlog.Info("Force install: stopping running backend before reinstall",
+				"backend", req.Backend, "processKey", key)
+			s.stopBackendExact(key)
+		}
 	}
 
 	// Parse galleries from request (override local config if provided)
@@ -692,20 +731,26 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest)
 		}
 	}
 
-	// Try to find the backend binary
-	backendPath := s.findBackend(req.Backend)
+	// On upgrade, run the gallery install path even if the binary already
+	// exists on disk: findBackend would otherwise short-circuit and we'd
+	// restart the same stale binary. The force flag passed to
+	// InstallBackendFromGallery makes it overwrite the existing artifact.
+	backendPath := ""
+	if !req.Force {
+		backendPath = s.findBackend(req.Backend)
+	}
 	if backendPath == "" {
 		if req.URI != "" {
-			xlog.Info("Backend not found locally, attempting external install", "backend", req.Backend, "uri", req.URI)
+			xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", req.Force)
 			if err := galleryop.InstallExternalBackend(
 				context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias,
 			); err != nil {
 				return "", fmt.Errorf("installing backend from gallery: %w", err)
 			}
 		} else {
-			xlog.Info("Backend not found locally, attempting gallery install", "backend", req.Backend)
+			xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", req.Force)
 			if err := gallery.InstallBackendFromGallery(
-				context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, false,
+				context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, req.Force,
 			); err != nil {
 				return "", fmt.Errorf("installing backend from gallery: %w", err)
 			}
diff --git a/core/http/endpoints/localai/nodes.go b/core/http/endpoints/localai/nodes.go
index 17cccef9a..4cc8643bf 100644
--- a/core/http/endpoints/localai/nodes.go
+++ b/core/http/endpoints/localai/nodes.go
@@ -407,8 +407,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
 		}
 		// Admin-driven backend install: not tied to a specific replica slot
 		// (no model is being loaded). Pass replica 0 to match the worker's
-		// admin process-key convention (`backend#0`).
-		reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
+		// admin process-key convention (`backend#0`). force=false so the
+		// worker's fast path takes over if the backend is already running —
+		// upgrades go through the dedicated /api/backends/upgrade path.
+		reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0, false)
 		if err != nil {
 			xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
 			return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go
index 530ee7feb..de2e7dcc0 100644
--- a/core/services/messaging/subjects.go
+++ b/core/services/messaging/subjects.go
@@ -137,6 +137,14 @@ type BackendInstallRequest struct {
 	// (single-replica behavior — no collision because the controller never
 	// asks for replica > 0 on a node whose MaxReplicasPerModel is 1).
 	ReplicaIndex int32 `json:"replica_index,omitempty"`
+	// Force skips the "already running" short-circuit and re-runs the gallery
+	// install. UpgradeBackend sets this so the worker actually re-downloads the
+	// artifact, stops the live process, and starts a fresh one — without it,
+	// the install handler's early return makes upgrades a silent no-op while
+	// the coordinator's drift detection keeps re-flagging the backend forever.
+	// Older workers that don't know this field treat it as false (current
+	// behavior preserved).
+	Force bool `json:"force,omitempty"`
 }
 
 // BackendInstallReply is the response from a backend.install NATS request.
diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go
index 2720cef60..c8d7bf48b 100644
--- a/core/services/nodes/managers_distributed.go
+++ b/core/services/nodes/managers_distributed.go
@@ -339,7 +339,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
 	// Admin-driven backend install: not tied to a specific replica slot.
 	// Pass replica 0 — the worker's processKey is "backend#0" when no
 	// modelID is supplied, matching pre-PR4 behavior.
-	reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
+	reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false)
 	if err != nil {
 		return err
 	}
@@ -360,6 +360,12 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
 // would ask workers to "upgrade" something they never had, which fails at
 // the gallery (e.g. a darwin/arm64 worker has no platform variant for a
 // linux-only backend) and leaves a forever-retrying pending_backend_ops row.
+//
+// force=true on the install call is what distinguishes upgrade from install:
+// the worker stops the live process for this backend, overwrites the on-disk
+// artifact, and restarts. Without it, the worker's "already running" fast
+// path turns every backend.install into a no-op and the gallery's drift
+// detection never converges.
 func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
 	galleriesJSON, _ := json.Marshal(d.backendGalleries)
 
@@ -377,7 +383,7 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
 	}
 
 	result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
-		reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0)
+		reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true)
 		if err != nil {
 			return err
 		}
diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go
index 87d9d3003..298d4e752 100644
--- a/core/services/nodes/reconciler.go
+++ b/core/services/nodes/reconciler.go
@@ -189,8 +189,12 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
 			_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
 		case OpBackendInstall, OpBackendUpgrade:
 			// Pending-op drain for admin install/upgrade — not a per-replica
-			// load. Replica 0 is the conventional admin slot.
-			reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
+			// load. Replica 0 is the conventional admin slot. Upgrade ops set
+			// force=true so the worker reinstalls the artifact and restarts
+			// the live process; install ops keep the existing fast-path
+			// semantics for the case where the backend is already running.
+			force := op.Op == OpBackendUpgrade
+			reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, force)
 			if err != nil {
 				applyErr = err
 			} else if !reply.Success {
diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go
index e78b74378..751b33f0e 100644
--- a/core/services/nodes/router.go
+++ b/core/services/nodes/router.go
@@ -673,7 +673,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
 		return "", fmt.Errorf("no NATS connection for backend installation")
 	}
 
-	reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
+	// force=false: routine load, the worker's fast-path "already running →
+	// return current address" is correct here. Upgrades go through
+	// DistributedBackendManager.UpgradeBackend which sets force=true.
+	reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, false)
 	if err != nil {
 		return "", err
 	}
diff --git a/core/services/nodes/router_test.go b/core/services/nodes/router_test.go
index f61613d27..a63c0521e 100644
--- a/core/services/nodes/router_test.go
+++ b/core/services/nodes/router_test.go
@@ -307,10 +307,11 @@ type installCall struct {
 	backend string
 	modelID string
 	replica int
+	force   bool
 }
 
-func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
-	f.installCalls = append(f.installCalls, installCall{nodeID, backend, modelID, replica})
+func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, force bool) (*messaging.BackendInstallReply, error) {
+	f.installCalls = append(f.installCalls, installCall{nodeID, backend, modelID, replica, force})
 	return f.installReply, f.installErr
 }
 
diff --git a/core/services/nodes/unloader.go b/core/services/nodes/unloader.go
index 95243489c..677964481 100644
--- a/core/services/nodes/unloader.go
+++ b/core/services/nodes/unloader.go
@@ -16,8 +16,13 @@ type backendStopRequest struct {
 
 // NodeCommandSender abstracts NATS-based commands to worker nodes.
 // Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
+//
+// The `force` parameter on InstallBackend is set by the upgrade path to make
+// the worker re-run the gallery install (overwriting the on-disk artifact) and
+// restart any live process for that backend. Routine installs and load events
+// pass force=false so an already-running process short-circuits as before.
 type NodeCommandSender interface {
-	InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
+	InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error)
 	DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
 	ListBackends(nodeID string) (*messaging.BackendListReply, error)
 	StopBackend(nodeID, backend string) error
@@ -77,10 +82,15 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
 // process key — distinct slots run on distinct ports so multiple replicas of
 // the same model can coexist on a fat node. Pass 0 for single-replica.
 //
+// force=true is the upgrade path: the worker stops any live process for this
+// backend, overwrites the on-disk artifact via gallery install, and restarts.
+// Routine installs and load events pass force=false to keep the existing
+// "already running → return current address" fast path.
+//
 // Timeout: 5 minutes (gallery install can take a while).
-func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
+func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) {
 	subject := messaging.SubjectNodeBackendInstall(nodeID)
-	xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
+	xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "force", force)
 
 	return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
 		Backend:      backendType,
@@ -90,6 +100,7 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
 		Name:         name,
 		Alias:        alias,
 		ReplicaIndex: int32(replicaIndex),
+		Force:        force,
 	}, 5*time.Minute)
 }
 
diff --git a/tests/e2e/distributed/node_lifecycle_test.go b/tests/e2e/distributed/node_lifecycle_test.go
index ea69e2a2d..83e4c0b72 100644
--- a/tests/e2e/distributed/node_lifecycle_test.go
+++ b/tests/e2e/distributed/node_lifecycle_test.go
@@ -57,7 +57,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
 		FlushNATS(infra.NC)
 
 		adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
-		installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
+		installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, false)
 		Expect(err).ToNot(HaveOccurred())
 		Expect(installReply.Success).To(BeTrue())
 	})
@@ -78,7 +78,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
 		FlushNATS(infra.NC)
 
 		adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
-		installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
+		installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, false)
 		Expect(err).ToNot(HaveOccurred())
 		Expect(installReply.Success).To(BeFalse())
 		Expect(installReply.Error).To(ContainSubstring("backend not found"))
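
Reviewer note, outside the patch proper: the backwards-compatibility
claim rests on unknown JSON fields being dropped at unmarshal time,
which is encoding/json's default behavior and is presumably what
messaging.RequestJSON relies on given the struct's json tags (an
assumption, not verified here). A minimal sketch; OldRequest is a
hypothetical stand-in for the pre-patch BackendInstallRequest shape:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // OldRequest stands in for the pre-patch BackendInstallRequest:
    // it has no Force field, like a worker built before this change.
    type OldRequest struct {
        Backend      string `json:"backend"`
        ReplicaIndex int32  `json:"replica_index,omitempty"`
    }

    func main() {
        // A new coordinator sends force=true; the old worker's decoder
        // silently drops the unknown field, matching force=false behavior.
        payload := []byte(`{"backend":"llama-cpp","replica_index":0,"force":true}`)
        var req OldRequest
        if err := json.Unmarshal(payload, &req); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", req) // {Backend:llama-cpp ReplicaIndex:0}
    }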