mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-19 14:17:21 -04:00
feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend
Master now sends to backend.upgrade for force-reinstall, with a nats.ErrNoResponders fallback to the legacy backend.install Force=true path so a rolling update with a new master + an old worker still converges. The Force parameter leaves the public Go API surface entirely — only the internal fallback sets it on the wire. InstallBackend timeout drops 5min -> 3min (most replies are sub-second since the worker short-circuits on already-running or already-installed). UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi gallery pulls. Updates the admin install HTTP endpoint (core/http/endpoints/localai/nodes.go) to the new signature too. router_test.go's fakeUnloader does not yet implement the new interface shape; Task 3.2 will catch it up before the next package-level test run. Assisted-by: Claude:claude-opus-4-7 Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -407,10 +407,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
|
|||||||
}
|
}
|
||||||
// Admin-driven backend install: not tied to a specific replica slot
|
// Admin-driven backend install: not tied to a specific replica slot
|
||||||
// (no model is being loaded). Pass replica 0 to match the worker's
|
// (no model is being loaded). Pass replica 0 to match the worker's
|
||||||
// admin process-key convention (`backend#0`). force=false so the
|
// admin process-key convention (`backend#0`). The worker's fast path
|
||||||
// worker's fast path takes over if the backend is already running —
|
// takes over if the backend is already running — upgrades go through
|
||||||
// upgrades go through the dedicated /api/backends/upgrade path.
|
// the dedicated /api/backends/upgrade path, which uses the backend.upgrade subject.
|
||||||
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0, false)
|
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
|
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
|
||||||
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
|
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
|
||||||
|
|||||||
@@ -339,7 +339,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
|
|||||||
// Admin-driven backend install: not tied to a specific replica slot.
|
// Admin-driven backend install: not tied to a specific replica slot.
|
||||||
// Pass replica 0 — the worker's processKey is "backend#0" when no
|
// Pass replica 0 — the worker's processKey is "backend#0" when no
|
||||||
// modelID is supplied, matching pre-PR4 behavior.
|
// modelID is supplied, matching pre-PR4 behavior.
|
||||||
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false)
|
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -354,18 +354,18 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
|
|||||||
return result.Err()
|
return result.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpgradeBackend reuses the install NATS subject (the worker re-downloads
|
// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
|
||||||
// from the gallery). Unlike Install/Delete, upgrade only targets the nodes
|
// force-reinstall path doesn't head-of-line-block routine model loads on
|
||||||
// that already report this backend as installed — fanning out to every node
|
// the same worker. Only nodes that already report this backend as installed
|
||||||
// would ask workers to "upgrade" something they never had, which fails at
|
// are targeted — fanning out to every node would ask workers to "upgrade"
|
||||||
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a
|
// something they never had, which fails at the gallery (e.g. a darwin/arm64
|
||||||
// linux-only backend) and leaves a forever-retrying pending_backend_ops row.
|
// worker has no platform variant for a linux-only backend) and leaves a
|
||||||
|
// forever-retrying pending_backend_ops row.
|
||||||
//
|
//
|
||||||
// force=true on the install call is what distinguishes upgrade from install:
|
// Rolling-update fallback: when a request to a worker fails with nats.ErrNoResponders on
|
||||||
// the worker stops the live process for this backend, overwrites the on-disk
|
// backend.upgrade, we try the legacy backend.install Force=true path so a
|
||||||
// artifact, and restarts. Without it, the worker's "already running" fast
|
// new master + old worker still converges. Drop the fallback once every
|
||||||
// path turns every backend.install into a no-op and the gallery's drift
|
// worker in the fleet is on 2026-05-08 or newer.
|
||||||
// detection never converges.
|
|
||||||
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
|
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
|
||||||
galleriesJSON, _ := json.Marshal(d.backendGalleries)
|
galleriesJSON, _ := json.Marshal(d.backendGalleries)
|
||||||
|
|
||||||
@@ -383,8 +383,20 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
|
|||||||
}
|
}
|
||||||
|
|
||||||
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
|
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
|
||||||
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true)
|
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Rolling-update fallback: an older worker doesn't know
|
||||||
|
// backend.upgrade. Try the legacy install-with-force path.
|
||||||
|
if errors.Is(err, nats.ErrNoResponders) {
|
||||||
|
instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0)
|
||||||
|
if instErr != nil {
|
||||||
|
return instErr
|
||||||
|
}
|
||||||
|
if !instReply.Success {
|
||||||
|
return fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !reply.Success {
|
if !reply.Success {
|
||||||
|
|||||||
@@ -187,18 +187,36 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
|
|||||||
switch op.Op {
|
switch op.Op {
|
||||||
case OpBackendDelete:
|
case OpBackendDelete:
|
||||||
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
|
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
|
||||||
case OpBackendInstall, OpBackendUpgrade:
|
case OpBackendInstall:
|
||||||
// Pending-op drain for admin install/upgrade — not a per-replica
|
// Pending-op drain for admin install — not a per-replica load.
|
||||||
// load. Replica 0 is the conventional admin slot. Upgrade ops set
|
// Replica 0 is the conventional admin slot. Install is idempotent:
|
||||||
// force=true so the worker reinstalls the artifact and restarts
|
// the worker short-circuits if the backend is already running.
|
||||||
// the live process; install ops keep the existing fast-path
|
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
|
||||||
// semantics for the case where the backend is already running.
|
|
||||||
force := op.Op == OpBackendUpgrade
|
|
||||||
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, force)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
applyErr = err
|
applyErr = err
|
||||||
} else if !reply.Success {
|
} else if !reply.Success {
|
||||||
applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error)
|
applyErr = fmt.Errorf("install failed: %s", reply.Error)
|
||||||
|
}
|
||||||
|
case OpBackendUpgrade:
|
||||||
|
// Pending-op drain for admin upgrade — fires backend.upgrade so
|
||||||
|
// the slow re-pull doesn't head-of-line-block install traffic on
|
||||||
|
// the same worker. Falls back to the legacy backend.install
|
||||||
|
// Force=true path on nats.ErrNoResponders for old workers that
|
||||||
|
// don't subscribe to backend.upgrade yet (rolling-update window).
|
||||||
|
reply, err := rc.adapter.UpgradeBackend(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, nats.ErrNoResponders) {
|
||||||
|
instReply, instErr := rc.adapter.installWithForceFallback(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
|
||||||
|
if instErr != nil {
|
||||||
|
applyErr = instErr
|
||||||
|
} else if !instReply.Success {
|
||||||
|
applyErr = fmt.Errorf("upgrade (legacy fallback) failed: %s", instReply.Error)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
applyErr = err
|
||||||
|
}
|
||||||
|
} else if !reply.Success {
|
||||||
|
applyErr = fmt.Errorf("upgrade failed: %s", reply.Error)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)
|
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)
|
||||||
|
|||||||
@@ -673,10 +673,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
|
|||||||
return "", fmt.Errorf("no NATS connection for backend installation")
|
return "", fmt.Errorf("no NATS connection for backend installation")
|
||||||
}
|
}
|
||||||
|
|
||||||
// force=false: routine load, the worker's fast-path "already running →
|
// Routine load: the worker's fast-path "already running → return current
|
||||||
// return current address" is correct here. Upgrades go through
|
// address" is correct here. Upgrades go through
|
||||||
// DistributedBackendManager.UpgradeBackend which sets force=true.
|
// DistributedBackendManager.UpgradeBackend on the backend.upgrade subject.
|
||||||
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, false)
|
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,12 +17,19 @@ type backendStopRequest struct {
|
|||||||
// NodeCommandSender abstracts NATS-based commands to worker nodes.
|
// NodeCommandSender abstracts NATS-based commands to worker nodes.
|
||||||
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
|
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
|
||||||
//
|
//
|
||||||
// The `force` parameter on InstallBackend is set by the upgrade path to make
|
// InstallBackend is idempotent: the worker short-circuits if the backend is
|
||||||
// the worker re-run the gallery install (overwriting the on-disk artifact) and
|
// already running for the requested (modelID, replica) slot. Routine model
|
||||||
// restart any live process for that backend. Routine installs and load events
|
// loads and admin installs both call this.
|
||||||
// pass force=false so an already-running process short-circuits as before.
|
//
|
||||||
|
// UpgradeBackend is the destructive force-reinstall path: the worker stops
|
||||||
|
// every live process for the backend, re-pulls the gallery artifact, and
|
||||||
|
// replies. Callers (DistributedBackendManager.UpgradeBackend and ReplicaReconciler.drainPendingBackendOps) handle
|
||||||
|
// rolling-update fallback to the legacy install Force=true path on
|
||||||
|
// nats.ErrNoResponders for old workers that don't subscribe to the new
|
||||||
|
// backend.upgrade subject.
|
||||||
type NodeCommandSender interface {
|
type NodeCommandSender interface {
|
||||||
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error)
|
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
|
||||||
|
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
|
||||||
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
|
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
|
||||||
ListBackends(nodeID string) (*messaging.BackendListReply, error)
|
ListBackends(nodeID string) (*messaging.BackendListReply, error)
|
||||||
StopBackend(nodeID, backend string) error
|
StopBackend(nodeID, backend string) error
|
||||||
@@ -75,22 +82,21 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// InstallBackend sends a backend.install request-reply to a worker node.
|
// InstallBackend sends a backend.install request-reply to a worker node.
|
||||||
// The worker installs the backend from gallery (if not already installed),
|
// Idempotent on the worker: if the (modelID, replica) process is already
|
||||||
// starts the gRPC process, and replies when ready.
|
// running, the worker short-circuits and returns its address; if the binary
|
||||||
|
// is on disk, the worker just spawns a process; only a missing binary
|
||||||
|
// triggers a full gallery pull.
|
||||||
//
|
//
|
||||||
// replicaIndex selects which replica slot the worker should use as its
|
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
|
||||||
// process key — distinct slots run on distinct ports so multiple replicas of
|
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
|
||||||
// the same model can coexist on a fat node. Pass 0 for single-replica.
|
// case while still failing fast enough to surface real worker hangs.
|
||||||
//
|
//
|
||||||
// force=true is the upgrade path: the worker stops any live process for this
|
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead —
|
||||||
// backend, overwrites the on-disk artifact via gallery install, and restarts.
|
// it lives on a different NATS subject so it cannot head-of-line-block
|
||||||
// Routine installs and load events pass force=false to keep the existing
|
// routine load traffic on the same worker.
|
||||||
// "already running → return current address" fast path.
|
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
|
||||||
//
|
|
||||||
// Timeout: 5 minutes (gallery install can take a while).
|
|
||||||
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) {
|
|
||||||
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
||||||
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "force", force)
|
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
|
||||||
|
|
||||||
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||||
Backend: backendType,
|
Backend: backendType,
|
||||||
@@ -100,8 +106,50 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
|
|||||||
Name: name,
|
Name: name,
|
||||||
Alias: alias,
|
Alias: alias,
|
||||||
ReplicaIndex: int32(replicaIndex),
|
ReplicaIndex: int32(replicaIndex),
|
||||||
Force: force,
|
}, 3*time.Minute)
|
||||||
}, 5*time.Minute)
|
}
|
||||||
|
|
||||||
|
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
|
||||||
|
// The worker stops every live process for this backend, force-reinstalls
|
||||||
|
// from the gallery (overwriting the on-disk artifact), and replies. The
|
||||||
|
// next routine InstallBackend call spawns a fresh process with the new
|
||||||
|
// binary — upgrade itself does not start a process.
|
||||||
|
//
|
||||||
|
// Timeout: 15 minutes. Real-world worst case observed: 8–10 minutes for
|
||||||
|
// large CUDA-l4t backend images on Jetson over WiFi.
|
||||||
|
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
|
||||||
|
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
|
||||||
|
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
|
||||||
|
|
||||||
|
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
|
||||||
|
Backend: backendType,
|
||||||
|
BackendGalleries: galleriesJSON,
|
||||||
|
URI: uri,
|
||||||
|
Name: name,
|
||||||
|
Alias: alias,
|
||||||
|
ReplicaIndex: int32(replicaIndex),
|
||||||
|
}, 15*time.Minute)
|
||||||
|
}
|
||||||
|
|
||||||
|
// installWithForceFallback is the rolling-update fallback used by
|
||||||
|
// DistributedBackendManager.UpgradeBackend and ReplicaReconciler.drainPendingBackendOps when backend.upgrade returns
|
||||||
|
// nats.ErrNoResponders (the worker is on a pre-2026-05-08 build that
|
||||||
|
// doesn't subscribe to the new subject). It re-fires the legacy
|
||||||
|
// backend.install with Force=true. Drop this once every worker is on
|
||||||
|
// 2026-05-08 or newer.
|
||||||
|
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
|
||||||
|
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
||||||
|
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
|
||||||
|
|
||||||
|
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||||
|
Backend: backendType,
|
||||||
|
BackendGalleries: galleriesJSON,
|
||||||
|
URI: uri,
|
||||||
|
Name: name,
|
||||||
|
Alias: alias,
|
||||||
|
ReplicaIndex: int32(replicaIndex),
|
||||||
|
Force: true,
|
||||||
|
}, 15*time.Minute)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListBackends queries a worker node for its installed backends via NATS request-reply.
|
// ListBackends queries a worker node for its installed backends via NATS request-reply.
|
||||||
|
|||||||
31
core/services/nodes/unloader_upgrade_test.go
Normal file
31
core/services/nodes/unloader_upgrade_test.go
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
package nodes
|
||||||
|
|
||||||
|
import (
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
"github.com/mudler/LocalAI/core/services/messaging"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
|
||||||
|
It("fires a NATS request to the backend.upgrade subject and returns the reply", func() {
|
||||||
|
mc := newScriptedMessagingClient()
|
||||||
|
nodeID := "node-x"
|
||||||
|
|
||||||
|
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
|
||||||
|
messaging.BackendUpgradeReply{Success: true})
|
||||||
|
|
||||||
|
adapter := NewRemoteUnloaderAdapter(nil, mc)
|
||||||
|
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(reply.Success).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns the underlying error when the subject has no responders", func() {
|
||||||
|
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
|
||||||
|
|
||||||
|
adapter := NewRemoteUnloaderAdapter(nil, mc)
|
||||||
|
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user