From 9bde76d720ef044d5cb803d72da62479f1db8515 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 8 May 2026 08:44:35 +0000 Subject: [PATCH] feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Master now sends to backend.upgrade for force-reinstall, with a nats.ErrNoResponders fallback to the legacy backend.install Force=true path so a rolling update with a new master + an old worker still converges. The Force parameter leaves the public Go API surface entirely — only the internal fallback sets it on the wire. InstallBackend timeout drops 5min -> 3min (most replies are sub-second since the worker short-circuits on already-running or already-installed). UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi gallery pulls. Updates the admin install HTTP endpoint (core/http/endpoints/localai/nodes.go) to the new signature too. router_test.go's fakeUnloader does not yet implement the new interface shape; Task 3.2 will catch it up before the next package-level test run. Assisted-by: Claude:claude-opus-4-7 Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/localai/nodes.go | 8 +- core/services/nodes/managers_distributed.go | 38 ++++++--- core/services/nodes/reconciler.go | 36 ++++++-- core/services/nodes/router.go | 8 +- core/services/nodes/unloader.go | 88 +++++++++++++++----- core/services/nodes/unloader_upgrade_test.go | 31 +++++++ 6 files changed, 159 insertions(+), 50 deletions(-) create mode 100644 core/services/nodes/unloader_upgrade_test.go diff --git a/core/http/endpoints/localai/nodes.go b/core/http/endpoints/localai/nodes.go index 4cc8643bf..9b622acf5 100644 --- a/core/http/endpoints/localai/nodes.go +++ b/core/http/endpoints/localai/nodes.go @@ -407,10 +407,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler } // Admin-driven backend install: not tied to a specific replica slot // (no model is being loaded). Pass replica 0 to match the worker's - // admin process-key convention (`backend#0`). force=false so the - // worker's fast path takes over if the backend is already running — - // upgrades go through the dedicated /api/backends/upgrade path. - reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0, false) + // admin process-key convention (`backend#0`). The worker's fast path + // takes over if the backend is already running — upgrades go through + // the dedicated /api/backends/upgrade path on backend.upgrade. + reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0) if err != nil { xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err) return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node")) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index c8d7bf48b..e5c99d9b7 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -339,7 +339,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall // Admin-driven backend install: not tied to a specific replica slot. // Pass replica 0 — the worker's processKey is "backend#0" when no // modelID is supplied, matching pre-PR4 behavior. - reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false) + reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0) if err != nil { return err } @@ -354,18 +354,18 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall return result.Err() } -// UpgradeBackend reuses the install NATS subject (the worker re-downloads -// from the gallery). Unlike Install/Delete, upgrade only targets the nodes -// that already report this backend as installed — fanning out to every node -// would ask workers to "upgrade" something they never had, which fails at -// the gallery (e.g. a darwin/arm64 worker has no platform variant for a -// linux-only backend) and leaves a forever-retrying pending_backend_ops row. +// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow +// force-reinstall path doesn't head-of-line-block routine model loads on +// the same worker. Only nodes that already report this backend as installed +// are targeted — fanning out to every node would ask workers to "upgrade" +// something they never had, which fails at the gallery (e.g. a darwin/arm64 +// worker has no platform variant for a linux-only backend) and leaves a +// forever-retrying pending_backend_ops row. // -// force=true on the install call is what distinguishes upgrade from install: -// the worker stops the live process for this backend, overwrites the on-disk -// artifact, and restarts. Without it, the worker's "already running" fast -// path turns every backend.install into a no-op and the gallery's drift -// detection never converges. +// Rolling-update fallback: when a worker returns nats.ErrNoResponders on +// backend.upgrade, we try the legacy backend.install Force=true path so a +// new master + old worker still converges. Drop the fallback once every +// worker in the fleet is on 2026-05-08 or newer. func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { galleriesJSON, _ := json.Marshal(d.backendGalleries) @@ -383,8 +383,20 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str } result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error { - reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true) + reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0) if err != nil { + // Rolling-update fallback: an older worker doesn't know + // backend.upgrade. Try the legacy install-with-force path. + if errors.Is(err, nats.ErrNoResponders) { + instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0) + if instErr != nil { + return instErr + } + if !instReply.Success { + return fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error) + } + return nil + } return err } if !reply.Success { diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index 298d4e752..72e1c441c 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -187,18 +187,36 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { switch op.Op { case OpBackendDelete: _, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend) - case OpBackendInstall, OpBackendUpgrade: - // Pending-op drain for admin install/upgrade — not a per-replica - // load. Replica 0 is the conventional admin slot. Upgrade ops set - // force=true so the worker reinstalls the artifact and restarts - // the live process; install ops keep the existing fast-path - // semantics for the case where the backend is already running. - force := op.Op == OpBackendUpgrade - reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, force) + case OpBackendInstall: + // Pending-op drain for admin install — not a per-replica load. + // Replica 0 is the conventional admin slot. Install is idempotent: + // the worker short-circuits if the backend is already running. + reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0) if err != nil { applyErr = err } else if !reply.Success { - applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error) + applyErr = fmt.Errorf("install failed: %s", reply.Error) + } + case OpBackendUpgrade: + // Pending-op drain for admin upgrade — fires backend.upgrade so + // the slow re-pull doesn't head-of-line-block install traffic on + // the same worker. Falls back to the legacy backend.install + // Force=true path on nats.ErrNoResponders for old workers that + // don't subscribe to backend.upgrade yet (rolling-update window). + reply, err := rc.adapter.UpgradeBackend(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0) + if err != nil { + if errors.Is(err, nats.ErrNoResponders) { + instReply, instErr := rc.adapter.installWithForceFallback(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0) + if instErr != nil { + applyErr = instErr + } else if !instReply.Success { + applyErr = fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error) + } + } else { + applyErr = err + } + } else if !reply.Success { + applyErr = fmt.Errorf("upgrade failed: %s", reply.Error) } default: xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID) diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index 751b33f0e..e8dbd9916 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -673,10 +673,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod return "", fmt.Errorf("no NATS connection for backend installation") } - // force=false: routine load, the worker's fast-path "already running → - // return current address" is correct here. Upgrades go through - // DistributedBackendManager.UpgradeBackend which sets force=true. - reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, false) + // Routine load: the worker's fast-path "already running → return current + // address" is correct here. Upgrades go through + // DistributedBackendManager.UpgradeBackend on the backend.upgrade subject. + reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex) if err != nil { return "", err } diff --git a/core/services/nodes/unloader.go b/core/services/nodes/unloader.go index 677964481..611a34dee 100644 --- a/core/services/nodes/unloader.go +++ b/core/services/nodes/unloader.go @@ -17,12 +17,19 @@ type backendStopRequest struct { // NodeCommandSender abstracts NATS-based commands to worker nodes. // Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter. // -// The `force` parameter on InstallBackend is set by the upgrade path to make -// the worker re-run the gallery install (overwriting the on-disk artifact) and -// restart any live process for that backend. Routine installs and load events -// pass force=false so an already-running process short-circuits as before. +// InstallBackend is idempotent: the worker short-circuits if the backend is +// already running for the requested (modelID, replica) slot. Routine model +// loads and admin installs both call this. +// +// UpgradeBackend is the destructive force-reinstall path: the worker stops +// every live process for the backend, re-pulls the gallery artifact, and +// replies. Caller (DistributedBackendManager.UpgradeBackend) handles +// rolling-update fallback to the legacy install Force=true path on +// nats.ErrNoResponders for old workers that don't subscribe to the new +// backend.upgrade subject. type NodeCommandSender interface { - InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) + InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) + UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) ListBackends(nodeID string) (*messaging.BackendListReply, error) StopBackend(nodeID, backend string) error @@ -75,22 +82,21 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error { } // InstallBackend sends a backend.install request-reply to a worker node. -// The worker installs the backend from gallery (if not already installed), -// starts the gRPC process, and replies when ready. +// Idempotent on the worker: if the (modelID, replica) process is already +// running, the worker short-circuits and returns its address; if the binary +// is on disk, the worker just spawns a process; only a missing binary +// triggers a full gallery pull. // -// replicaIndex selects which replica slot the worker should use as its -// process key — distinct slots run on distinct ports so multiple replicas of -// the same model can coexist on a fat node. Pass 0 for single-replica. +// Timeout: 3 minutes. Most calls return in under 2 seconds (process already +// running). The 3-minute ceiling covers the cold-binary spawn-after-download +// case while still failing fast enough to surface real worker hangs. // -// force=true is the upgrade path: the worker stops any live process for this -// backend, overwrites the on-disk artifact via gallery install, and restarts. -// Routine installs and load events pass force=false to keep the existing -// "already running → return current address" fast path. -// -// Timeout: 5 minutes (gallery install can take a while). -func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) { +// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead — +// it lives on a different NATS subject so it cannot head-of-line-block +// routine load traffic on the same worker. +func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) { subject := messaging.SubjectNodeBackendInstall(nodeID) - xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "force", force) + xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex) return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ Backend: backendType, @@ -100,8 +106,50 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal Name: name, Alias: alias, ReplicaIndex: int32(replicaIndex), - Force: force, - }, 5*time.Minute) + }, 3*time.Minute) +} + +// UpgradeBackend sends a backend.upgrade request-reply to a worker node. +// The worker stops every live process for this backend, force-reinstalls +// from the gallery (overwriting the on-disk artifact), and replies. The +// next routine InstallBackend call spawns a fresh process with the new +// binary — upgrade itself does not start a process. +// +// Timeout: 15 minutes. Real-world worst case observed: 8–10 minutes for +// large CUDA-l4t backend images on Jetson over WiFi. +func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) { + subject := messaging.SubjectNodeBackendUpgrade(nodeID) + xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex) + + return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{ + Backend: backendType, + BackendGalleries: galleriesJSON, + URI: uri, + Name: name, + Alias: alias, + ReplicaIndex: int32(replicaIndex), + }, 15*time.Minute) +} + +// installWithForceFallback is the rolling-update fallback used by +// DistributedBackendManager.UpgradeBackend when backend.upgrade returns +// nats.ErrNoResponders (the worker is on a pre-2026-05-08 build that +// doesn't subscribe to the new subject). It re-fires the legacy +// backend.install with Force=true. Drop this once every worker is on +// 2026-05-08 or newer. +func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) { + subject := messaging.SubjectNodeBackendInstall(nodeID) + xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType) + + return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ + Backend: backendType, + BackendGalleries: galleriesJSON, + URI: uri, + Name: name, + Alias: alias, + ReplicaIndex: int32(replicaIndex), + Force: true, + }, 15*time.Minute) } // ListBackends queries a worker node for its installed backends via NATS request-reply. diff --git a/core/services/nodes/unloader_upgrade_test.go b/core/services/nodes/unloader_upgrade_test.go new file mode 100644 index 000000000..2261dd02e --- /dev/null +++ b/core/services/nodes/unloader_upgrade_test.go @@ -0,0 +1,31 @@ +package nodes + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" +) + +var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() { + It("fires a NATS request to the backend.upgrade subject and returns the reply", func() { + mc := newScriptedMessagingClient() + nodeID := "node-x" + + mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID), + messaging.BackendUpgradeReply{Success: true}) + + adapter := NewRemoteUnloaderAdapter(nil, mc) + reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0) + Expect(err).ToNot(HaveOccurred()) + Expect(reply.Success).To(BeTrue()) + }) + + It("returns the underlying error when the subject has no responders", func() { + mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention + + adapter := NewRemoteUnloaderAdapter(nil, mc) + _, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0) + Expect(err).To(HaveOccurred()) + }) +})