feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend

Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.

InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.

Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.

router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-08 08:44:35 +00:00
parent 0f6cd8e58a
commit 9bde76d720
6 changed files with 159 additions and 50 deletions

View File

@@ -407,10 +407,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
} }
// Admin-driven backend install: not tied to a specific replica slot // Admin-driven backend install: not tied to a specific replica slot
// (no model is being loaded). Pass replica 0 to match the worker's // (no model is being loaded). Pass replica 0 to match the worker's
// admin process-key convention (`backend#0`). force=false so the // admin process-key convention (`backend#0`). The worker's fast path
// worker's fast path takes over if the backend is already running — // takes over if the backend is already running — upgrades go through
// upgrades go through the dedicated /api/backends/upgrade path. // the dedicated /api/backends/upgrade path on backend.upgrade.
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0, false) reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
if err != nil { if err != nil {
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err) xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node")) return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))

View File

@@ -339,7 +339,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
// Admin-driven backend install: not tied to a specific replica slot. // Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 — the worker's processKey is "backend#0" when no // Pass replica 0 — the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior. // modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false) reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
if err != nil { if err != nil {
return err return err
} }
@@ -354,18 +354,18 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
return result.Err() return result.Err()
} }
// UpgradeBackend reuses the install NATS subject (the worker re-downloads // UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
// from the gallery). Unlike Install/Delete, upgrade only targets the nodes // force-reinstall path doesn't head-of-line-block routine model loads on
// that already report this backend as installed — fanning out to every node // the same worker. Only nodes that already report this backend as installed
// would ask workers to "upgrade" something they never had, which fails at // are targeted — fanning out to every node would ask workers to "upgrade"
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a // something they never had, which fails at the gallery (e.g. a darwin/arm64
// linux-only backend) and leaves a forever-retrying pending_backend_ops row. // worker has no platform variant for a linux-only backend) and leaves a
// forever-retrying pending_backend_ops row.
// //
// force=true on the install call is what distinguishes upgrade from install: // Rolling-update fallback: when a worker returns nats.ErrNoResponders on
// the worker stops the live process for this backend, overwrites the on-disk // backend.upgrade, we try the legacy backend.install Force=true path so a
// artifact, and restarts. Without it, the worker's "already running" fast // new master + old worker still converges. Drop the fallback once every
// path turns every backend.install into a no-op and the gallery's drift // worker in the fleet is on 2026-05-08 or newer.
// detection never converges.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(d.backendGalleries) galleriesJSON, _ := json.Marshal(d.backendGalleries)
@@ -383,8 +383,20 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
} }
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error { result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true) reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
if err != nil { if err != nil {
// Rolling-update fallback: an older worker doesn't know
// backend.upgrade. Try the legacy install-with-force path.
if errors.Is(err, nats.ErrNoResponders) {
instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0)
if instErr != nil {
return instErr
}
if !instReply.Success {
return fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
}
return nil
}
return err return err
} }
if !reply.Success { if !reply.Success {

View File

@@ -187,18 +187,36 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
switch op.Op { switch op.Op {
case OpBackendDelete: case OpBackendDelete:
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend) _, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
case OpBackendInstall, OpBackendUpgrade: case OpBackendInstall:
// Pending-op drain for admin install/upgrade — not a per-replica // Pending-op drain for admin install — not a per-replica load.
// load. Replica 0 is the conventional admin slot. Upgrade ops set // Replica 0 is the conventional admin slot. Install is idempotent:
// force=true so the worker reinstalls the artifact and restarts // the worker short-circuits if the backend is already running.
// the live process; install ops keep the existing fast-path reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
// semantics for the case where the backend is already running.
force := op.Op == OpBackendUpgrade
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, force)
if err != nil { if err != nil {
applyErr = err applyErr = err
} else if !reply.Success { } else if !reply.Success {
applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error) applyErr = fmt.Errorf("install failed: %s", reply.Error)
}
case OpBackendUpgrade:
// Pending-op drain for admin upgrade — fires backend.upgrade so
// the slow re-pull doesn't head-of-line-block install traffic on
// the same worker. Falls back to the legacy backend.install
// Force=true path on nats.ErrNoResponders for old workers that
// don't subscribe to backend.upgrade yet (rolling-update window).
reply, err := rc.adapter.UpgradeBackend(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
instReply, instErr := rc.adapter.installWithForceFallback(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
if instErr != nil {
applyErr = instErr
} else if !instReply.Success {
applyErr = fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
}
} else {
applyErr = err
}
} else if !reply.Success {
applyErr = fmt.Errorf("upgrade failed: %s", reply.Error)
} }
default: default:
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID) xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)

View File

@@ -673,10 +673,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
return "", fmt.Errorf("no NATS connection for backend installation") return "", fmt.Errorf("no NATS connection for backend installation")
} }
// force=false: routine load, the worker's fast-path "already running → // Routine load: the worker's fast-path "already running → return current
// return current address" is correct here. Upgrades go through // address" is correct here. Upgrades go through
// DistributedBackendManager.UpgradeBackend which sets force=true. // DistributedBackendManager.UpgradeBackend on the backend.upgrade subject.
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, false) reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@@ -17,12 +17,19 @@ type backendStopRequest struct {
// NodeCommandSender abstracts NATS-based commands to worker nodes. // NodeCommandSender abstracts NATS-based commands to worker nodes.
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter. // Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
// //
// The `force` parameter on InstallBackend is set by the upgrade path to make // InstallBackend is idempotent: the worker short-circuits if the backend is
// the worker re-run the gallery install (overwriting the on-disk artifact) and // already running for the requested (modelID, replica) slot. Routine model
// restart any live process for that backend. Routine installs and load events // loads and admin installs both call this.
// pass force=false so an already-running process short-circuits as before. //
// UpgradeBackend is the destructive force-reinstall path: the worker stops
// every live process for the backend, re-pulls the gallery artifact, and
// replies. Caller (DistributedBackendManager.UpgradeBackend) handles
// rolling-update fallback to the legacy install Force=true path on
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface { type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error) ListBackends(nodeID string) (*messaging.BackendListReply, error)
StopBackend(nodeID, backend string) error StopBackend(nodeID, backend string) error
@@ -75,22 +82,21 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
} }
// InstallBackend sends a backend.install request-reply to a worker node. // InstallBackend sends a backend.install request-reply to a worker node.
// The worker installs the backend from gallery (if not already installed), // Idempotent on the worker: if the (modelID, replica) process is already
// starts the gRPC process, and replies when ready. // running, the worker short-circuits and returns its address; if the binary
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
// //
// replicaIndex selects which replica slot the worker should use as its // Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// process key — distinct slots run on distinct ports so multiple replicas of // running). The 3-minute ceiling covers the cold-binary spawn-after-download
// the same model can coexist on a fat node. Pass 0 for single-replica. // case while still failing fast enough to surface real worker hangs.
// //
// force=true is the upgrade path: the worker stops any live process for this // For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead —
// backend, overwrites the on-disk artifact via gallery install, and restarts. // it lives on a different NATS subject so it cannot head-of-line-block
// Routine installs and load events pass force=false to keep the existing // routine load traffic on the same worker.
// "already running → return current address" fast path. func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
//
// Timeout: 5 minutes (gallery install can take a while).
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID) subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "force", force) xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType, Backend: backendType,
@@ -100,8 +106,50 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
Name: name, Name: name,
Alias: alias, Alias: alias,
ReplicaIndex: int32(replicaIndex), ReplicaIndex: int32(replicaIndex),
Force: force, }, 3*time.Minute)
}, 5*time.Minute) }
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary — upgrade itself does not start a process.
//
// Timeout: 15 minutes. Real-world worst case observed: 810 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 15*time.Minute)
}
// installWithForceFallback is the rolling-update fallback used by
// DistributedBackendManager.UpgradeBackend when backend.upgrade returns
// nats.ErrNoResponders (the worker is on a pre-2026-05-08 build that
// doesn't subscribe to the new subject). It re-fires the legacy
// backend.install with Force=true. Drop this once every worker is on
// 2026-05-08 or newer.
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: true,
}, 15*time.Minute)
} }
// ListBackends queries a worker node for its installed backends via NATS request-reply. // ListBackends queries a worker node for its installed backends via NATS request-reply.

View File

@@ -0,0 +1,31 @@
package nodes
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
It("fires a NATS request to the backend.upgrade subject and returns the reply", func() {
mc := newScriptedMessagingClient()
nodeID := "node-x"
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
})
It("returns the underlying error when the subject has no responders", func() {
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
adapter := NewRemoteUnloaderAdapter(nil, mc)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
Expect(err).To(HaveOccurred())
})
})