mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-19 14:17:21 -04:00
feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend
Master now sends to backend.upgrade for force-reinstall, with a nats.ErrNoResponders fallback to the legacy backend.install Force=true path so a rolling update with a new master + an old worker still converges. The Force parameter leaves the public Go API surface entirely — only the internal fallback sets it on the wire. InstallBackend timeout drops 5min -> 3min (most replies are sub-second since the worker short-circuits on already-running or already-installed). UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi gallery pulls. Updates the admin install HTTP endpoint (core/http/endpoints/localai/nodes.go) to the new signature too. router_test.go's fakeUnloader does not yet implement the new interface shape; Task 3.2 will catch it up before the next package-level test run. Assisted-by: Claude:claude-opus-4-7 Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -407,10 +407,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
|
||||
}
|
||||
// Admin-driven backend install: not tied to a specific replica slot
|
||||
// (no model is being loaded). Pass replica 0 to match the worker's
|
||||
// admin process-key convention (`backend#0`). force=false so the
|
||||
// worker's fast path takes over if the backend is already running —
|
||||
// upgrades go through the dedicated /api/backends/upgrade path.
|
||||
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0, false)
|
||||
// admin process-key convention (`backend#0`). The worker's fast path
|
||||
// takes over if the backend is already running — upgrades go through
|
||||
// the dedicated /api/backends/upgrade path on backend.upgrade.
|
||||
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
|
||||
if err != nil {
|
||||
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
|
||||
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
|
||||
|
||||
@@ -339,7 +339,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
|
||||
// Admin-driven backend install: not tied to a specific replica slot.
|
||||
// Pass replica 0 — the worker's processKey is "backend#0" when no
|
||||
// modelID is supplied, matching pre-PR4 behavior.
|
||||
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false)
|
||||
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -354,18 +354,18 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
|
||||
return result.Err()
|
||||
}
|
||||
|
||||
// UpgradeBackend reuses the install NATS subject (the worker re-downloads
|
||||
// from the gallery). Unlike Install/Delete, upgrade only targets the nodes
|
||||
// that already report this backend as installed — fanning out to every node
|
||||
// would ask workers to "upgrade" something they never had, which fails at
|
||||
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a
|
||||
// linux-only backend) and leaves a forever-retrying pending_backend_ops row.
|
||||
// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
|
||||
// force-reinstall path doesn't head-of-line-block routine model loads on
|
||||
// the same worker. Only nodes that already report this backend as installed
|
||||
// are targeted — fanning out to every node would ask workers to "upgrade"
|
||||
// something they never had, which fails at the gallery (e.g. a darwin/arm64
|
||||
// worker has no platform variant for a linux-only backend) and leaves a
|
||||
// forever-retrying pending_backend_ops row.
|
||||
//
|
||||
// force=true on the install call is what distinguishes upgrade from install:
|
||||
// the worker stops the live process for this backend, overwrites the on-disk
|
||||
// artifact, and restarts. Without it, the worker's "already running" fast
|
||||
// path turns every backend.install into a no-op and the gallery's drift
|
||||
// detection never converges.
|
||||
// Rolling-update fallback: when a worker returns nats.ErrNoResponders on
|
||||
// backend.upgrade, we try the legacy backend.install Force=true path so a
|
||||
// new master + old worker still converges. Drop the fallback once every
|
||||
// worker in the fleet is on 2026-05-08 or newer.
|
||||
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
|
||||
galleriesJSON, _ := json.Marshal(d.backendGalleries)
|
||||
|
||||
@@ -383,8 +383,20 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
|
||||
}
|
||||
|
||||
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
|
||||
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true)
|
||||
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
|
||||
if err != nil {
|
||||
// Rolling-update fallback: an older worker doesn't know
|
||||
// backend.upgrade. Try the legacy install-with-force path.
|
||||
if errors.Is(err, nats.ErrNoResponders) {
|
||||
instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0)
|
||||
if instErr != nil {
|
||||
return instErr
|
||||
}
|
||||
if !instReply.Success {
|
||||
return fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if !reply.Success {
|
||||
|
||||
@@ -187,18 +187,36 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
|
||||
switch op.Op {
|
||||
case OpBackendDelete:
|
||||
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
|
||||
case OpBackendInstall, OpBackendUpgrade:
|
||||
// Pending-op drain for admin install/upgrade — not a per-replica
|
||||
// load. Replica 0 is the conventional admin slot. Upgrade ops set
|
||||
// force=true so the worker reinstalls the artifact and restarts
|
||||
// the live process; install ops keep the existing fast-path
|
||||
// semantics for the case where the backend is already running.
|
||||
force := op.Op == OpBackendUpgrade
|
||||
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, force)
|
||||
case OpBackendInstall:
|
||||
// Pending-op drain for admin install — not a per-replica load.
|
||||
// Replica 0 is the conventional admin slot. Install is idempotent:
|
||||
// the worker short-circuits if the backend is already running.
|
||||
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
|
||||
if err != nil {
|
||||
applyErr = err
|
||||
} else if !reply.Success {
|
||||
applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error)
|
||||
applyErr = fmt.Errorf("install failed: %s", reply.Error)
|
||||
}
|
||||
case OpBackendUpgrade:
|
||||
// Pending-op drain for admin upgrade — fires backend.upgrade so
|
||||
// the slow re-pull doesn't head-of-line-block install traffic on
|
||||
// the same worker. Falls back to the legacy backend.install
|
||||
// Force=true path on nats.ErrNoResponders for old workers that
|
||||
// don't subscribe to backend.upgrade yet (rolling-update window).
|
||||
reply, err := rc.adapter.UpgradeBackend(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
|
||||
if err != nil {
|
||||
if errors.Is(err, nats.ErrNoResponders) {
|
||||
instReply, instErr := rc.adapter.installWithForceFallback(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
|
||||
if instErr != nil {
|
||||
applyErr = instErr
|
||||
} else if !instReply.Success {
|
||||
applyErr = fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
|
||||
}
|
||||
} else {
|
||||
applyErr = err
|
||||
}
|
||||
} else if !reply.Success {
|
||||
applyErr = fmt.Errorf("upgrade failed: %s", reply.Error)
|
||||
}
|
||||
default:
|
||||
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)
|
||||
|
||||
@@ -673,10 +673,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
|
||||
return "", fmt.Errorf("no NATS connection for backend installation")
|
||||
}
|
||||
|
||||
// force=false: routine load, the worker's fast-path "already running →
|
||||
// return current address" is correct here. Upgrades go through
|
||||
// DistributedBackendManager.UpgradeBackend which sets force=true.
|
||||
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, false)
|
||||
// Routine load: the worker's fast-path "already running → return current
|
||||
// address" is correct here. Upgrades go through
|
||||
// DistributedBackendManager.UpgradeBackend on the backend.upgrade subject.
|
||||
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -17,12 +17,19 @@ type backendStopRequest struct {
|
||||
// NodeCommandSender abstracts NATS-based commands to worker nodes.
|
||||
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
|
||||
//
|
||||
// The `force` parameter on InstallBackend is set by the upgrade path to make
|
||||
// the worker re-run the gallery install (overwriting the on-disk artifact) and
|
||||
// restart any live process for that backend. Routine installs and load events
|
||||
// pass force=false so an already-running process short-circuits as before.
|
||||
// InstallBackend is idempotent: the worker short-circuits if the backend is
|
||||
// already running for the requested (modelID, replica) slot. Routine model
|
||||
// loads and admin installs both call this.
|
||||
//
|
||||
// UpgradeBackend is the destructive force-reinstall path: the worker stops
|
||||
// every live process for the backend, re-pulls the gallery artifact, and
|
||||
// replies. Caller (DistributedBackendManager.UpgradeBackend) handles
|
||||
// rolling-update fallback to the legacy install Force=true path on
|
||||
// nats.ErrNoResponders for old workers that don't subscribe to the new
|
||||
// backend.upgrade subject.
|
||||
type NodeCommandSender interface {
|
||||
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error)
|
||||
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
|
||||
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
|
||||
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
|
||||
ListBackends(nodeID string) (*messaging.BackendListReply, error)
|
||||
StopBackend(nodeID, backend string) error
|
||||
@@ -75,22 +82,21 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
|
||||
}
|
||||
|
||||
// InstallBackend sends a backend.install request-reply to a worker node.
|
||||
// The worker installs the backend from gallery (if not already installed),
|
||||
// starts the gRPC process, and replies when ready.
|
||||
// Idempotent on the worker: if the (modelID, replica) process is already
|
||||
// running, the worker short-circuits and returns its address; if the binary
|
||||
// is on disk, the worker just spawns a process; only a missing binary
|
||||
// triggers a full gallery pull.
|
||||
//
|
||||
// replicaIndex selects which replica slot the worker should use as its
|
||||
// process key — distinct slots run on distinct ports so multiple replicas of
|
||||
// the same model can coexist on a fat node. Pass 0 for single-replica.
|
||||
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
|
||||
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
|
||||
// case while still failing fast enough to surface real worker hangs.
|
||||
//
|
||||
// force=true is the upgrade path: the worker stops any live process for this
|
||||
// backend, overwrites the on-disk artifact via gallery install, and restarts.
|
||||
// Routine installs and load events pass force=false to keep the existing
|
||||
// "already running → return current address" fast path.
|
||||
//
|
||||
// Timeout: 5 minutes (gallery install can take a while).
|
||||
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) {
|
||||
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead —
|
||||
// it lives on a different NATS subject so it cannot head-of-line-block
|
||||
// routine load traffic on the same worker.
|
||||
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
|
||||
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
||||
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "force", force)
|
||||
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
|
||||
|
||||
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||
Backend: backendType,
|
||||
@@ -100,8 +106,50 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
|
||||
Name: name,
|
||||
Alias: alias,
|
||||
ReplicaIndex: int32(replicaIndex),
|
||||
Force: force,
|
||||
}, 5*time.Minute)
|
||||
}, 3*time.Minute)
|
||||
}
|
||||
|
||||
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
|
||||
// The worker stops every live process for this backend, force-reinstalls
|
||||
// from the gallery (overwriting the on-disk artifact), and replies. The
|
||||
// next routine InstallBackend call spawns a fresh process with the new
|
||||
// binary — upgrade itself does not start a process.
|
||||
//
|
||||
// Timeout: 15 minutes. Real-world worst case observed: 8–10 minutes for
|
||||
// large CUDA-l4t backend images on Jetson over WiFi.
|
||||
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
|
||||
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
|
||||
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
|
||||
|
||||
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
|
||||
Backend: backendType,
|
||||
BackendGalleries: galleriesJSON,
|
||||
URI: uri,
|
||||
Name: name,
|
||||
Alias: alias,
|
||||
ReplicaIndex: int32(replicaIndex),
|
||||
}, 15*time.Minute)
|
||||
}
|
||||
|
||||
// installWithForceFallback is the rolling-update fallback used by
|
||||
// DistributedBackendManager.UpgradeBackend when backend.upgrade returns
|
||||
// nats.ErrNoResponders (the worker is on a pre-2026-05-08 build that
|
||||
// doesn't subscribe to the new subject). It re-fires the legacy
|
||||
// backend.install with Force=true. Drop this once every worker is on
|
||||
// 2026-05-08 or newer.
|
||||
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
|
||||
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
||||
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
|
||||
|
||||
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||
Backend: backendType,
|
||||
BackendGalleries: galleriesJSON,
|
||||
URI: uri,
|
||||
Name: name,
|
||||
Alias: alias,
|
||||
ReplicaIndex: int32(replicaIndex),
|
||||
Force: true,
|
||||
}, 15*time.Minute)
|
||||
}
|
||||
|
||||
// ListBackends queries a worker node for its installed backends via NATS request-reply.
|
||||
|
||||
31
core/services/nodes/unloader_upgrade_test.go
Normal file
31
core/services/nodes/unloader_upgrade_test.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
)
|
||||
|
||||
var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
|
||||
It("fires a NATS request to the backend.upgrade subject and returns the reply", func() {
|
||||
mc := newScriptedMessagingClient()
|
||||
nodeID := "node-x"
|
||||
|
||||
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
|
||||
messaging.BackendUpgradeReply{Success: true})
|
||||
|
||||
adapter := NewRemoteUnloaderAdapter(nil, mc)
|
||||
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(reply.Success).To(BeTrue())
|
||||
})
|
||||
|
||||
It("returns the underlying error when the subject has no responders", func() {
|
||||
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
|
||||
|
||||
adapter := NewRemoteUnloaderAdapter(nil, mc)
|
||||
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user