LocalAI/core/services/nodes/unloader.go
LocalAI [bot] e5d7b84216 fix(distributed): split NATS backend.upgrade off install + dedup loads (#9717)
* feat(messaging): add backend.upgrade NATS subject + payload types

Splits the slow force-reinstall path off backend.install so it can run on
its own subscription goroutine, eliminating head-of-line blocking between
routine model loads and full gallery upgrades.

Wire-level Force flag on BackendInstallRequest is kept for one release as
the rolling-update fallback target; doc note marks it deprecated.
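
A rough sketch of the new wire shapes (field set inferred from the
master-side adapter below; JSON tags and the exact subject string are
assumptions, not the verbatim diff):

  // New request/reply pair for the dedicated upgrade subject.
  type BackendUpgradeRequest struct {
      Backend          string `json:"backend"`
      BackendGalleries string `json:"backend_galleries"`
      URI              string `json:"uri"`
      Name             string `json:"name"`
      Alias            string `json:"alias"`
      ReplicaIndex     int32  `json:"replica_index"`
  }

  type BackendUpgradeReply struct {
      Success bool   `json:"success"`
      Error   string `json:"error,omitempty"`
  }

  // SubjectNodeBackendUpgrade builds the per-node subject for force-reinstall.
  func SubjectNodeBackendUpgrade(nodeID string) string {
      return "node." + nodeID + ".backend.upgrade" // exact wire string is an assumption
  }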

Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed/worker): add per-backend mutex helper to backendSupervisor

Different backend names lock independently; same backend serializes. This
is the synchronization primitive used by the upcoming concurrent install
handler — without it, wrapping the NATS callback in a goroutine would
race the gallery directory when two requests target the same backend.
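
A minimal sketch of the helper, assuming a plain map guarded by its own
sync.Mutex (field names here are illustrative):

  // lockBackend returns the mutex for a backend name, creating it on first
  // use. Different names get independent mutexes; the same name always gets
  // the same one, so operations on one backend serialize.
  func (s *backendSupervisor) lockBackend(name string) *sync.Mutex {
      s.locksMu.Lock()
      defer s.locksMu.Unlock()
      if s.backendLocks == nil {
          s.backendLocks = map[string]*sync.Mutex{}
      }
      m, ok := s.backendLocks[name]
      if !ok {
          m = &sync.Mutex{}
          s.backendLocks[name] = m
      }
      return m
  }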

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed/worker): run backend.install handler in a goroutine

NATS subscriptions deliver messages serially on a single per-subscription
goroutine. With a synchronous install handler, a multi-minute gallery
download would head-of-line-block every other install request to the
same worker — manifesting upstream as a 5-minute "nats: timeout" on
unrelated routine model loads.

The body now runs in its own goroutine, with a per-backend mutex
(lockBackend) protecting the gallery directory from concurrent operations
on the same backend. Different backend names install in parallel.

Backward-compat: req.Force=true is still honored here, so an older master
that hasn't been updated to send on backend.upgrade keeps working.
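
Handler shape, roughly (raw nats.go calls stand in for the project's
SubscribeReply wrapper, and installBackend's signature is illustrative):

  func (s *backendSupervisor) subscribeBackendInstall(nc *nats.Conn, nodeID string) error {
      _, err := nc.Subscribe(messaging.SubjectNodeBackendInstall(nodeID), func(msg *nats.Msg) {
          go func() {
              var req messaging.BackendInstallRequest
              if err := json.Unmarshal(msg.Data, &req); err != nil {
                  return
              }
              mu := s.lockBackend(req.Backend) // per-backend mutex from the previous commit
              mu.Lock()
              defer mu.Unlock()
              reply := s.installBackend(req) // short-circuit, spawn, or full gallery pull
              data, _ := json.Marshal(reply)
              _ = msg.Respond(data)
          }()
      })
      return err
  }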

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed/worker): subscribe to backend.upgrade as a separate path

Slow force-reinstall now lives on its own NATS subscription, so a
multi-minute gallery pull cannot head-of-line-block the routine
backend.install handler on the same worker. Same per-backend mutex
guards both — concurrent install + upgrade for the same backend
serialize at the gallery directory; different backends are independent.

upgradeBackend stops every live process for the backend, force-installs
from gallery, and re-registers. It does not start a new process — the
next backend.install will spawn one with the freshly-pulled binary.
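
Rough flow of upgradeBackend (helper names other than lockBackend are
illustrative placeholders for the real bodies):

  func (s *backendSupervisor) upgradeBackend(req messaging.BackendUpgradeRequest) error {
      mu := s.lockBackend(req.Backend) // same mutex the install handler takes
      mu.Lock()
      defer mu.Unlock()

      s.stopAllProcessesFor(req.Backend) // kill every live process for this backend
      if err := s.forceInstallFromGallery(req); err != nil { // overwrite the on-disk artifact
          return err
      }
      return s.registerBackends() // re-advertise; the next backend.install spawns the new binary
  }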

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend

Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.
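
The fallback decision, sketched (nats.ErrNoResponders is the real nats.go
sentinel; the surrounding manager code is paraphrased, not the verbatim
implementation):

  _, err := adapter.UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias, replica)
  if errors.Is(err, nats.ErrNoResponders) {
      // Old worker that only subscribes to backend.install: re-fire the
      // legacy path with Force=true on the wire.
      _, err = adapter.installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias, replica)
  }
  return err // non-NoResponders errors propagate to the caller unchanged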

InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.

Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.

router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): update fakeUnloader for new NodeCommandSender shape

InstallBackend lost its force bool param (Force is not part of the public
Go API anymore — only the internal upgrade-fallback path sets it on the
wire). The NodeCommandSender interface gained an UpgradeBackend method. The
fake records both call slices and provides an installHook concurrency seam
for upcoming singleflight tests.
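
The seam, roughly (the concrete fake lives in the router tests; field names
other than installHook are guesses):

  type fakeUnloader struct {
      installCalls []string // one entry per InstallBackend call
      upgradeCalls []string // one entry per UpgradeBackend call
      installHook  func()   // if set, runs inside InstallBackend so a test can park the call
  }

  func (f *fakeUnloader) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
      f.installCalls = append(f.installCalls, nodeID+"/"+backendType+"/"+modelID)
      if f.installHook != nil {
          f.installHook()
      }
      return &messaging.BackendInstallReply{}, nil
  }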

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): cover UpgradeBackend's new subject + rolling-update fallback

Task 3.1 changed the master to publish UpgradeBackend on the new
backend.upgrade subject; the existing UpgradeBackend tests scripted the
old install subject and so all 3 began failing as expected. Updates them
to script SubjectNodeBackendUpgrade with BackendUpgradeReply.

Adds two new specs for the rolling-update fallback:
  - ErrNoResponders on backend.upgrade triggers a backend.install
    Force=true retry on the same node.
  - Non-NoResponders errors propagate to the caller unchanged.

scriptedMessagingClient gains scriptNoResponders (real nats sentinel) and
scriptReplyMatching (predicate-matched canned reply, used to assert that
the fallback path actually sets Force=true on the install retry).

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed): coalesce concurrent identical backend.install via singleflight

Six simultaneous chat completions for the same not-yet-loaded model were
observed firing six independent NATS install requests, each serializing
through the worker's per-subscription goroutine and amplifying queue
depth. SmartRouter now wraps the NATS round-trip in a singleflight.Group
keyed by (nodeID, backend, modelID, replica): N concurrent identical
loads share one round-trip and one reply.

Distinct (modelID, replica) keys still fire independent calls, so
multi-replica scaling and multi-model fan-out are unaffected.
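
Sketch of the coalescing wrapper, assuming golang.org/x/sync/singleflight
and a plain string key (the SmartRouter wiring around it is omitted):

  var installGroup singleflight.Group

  func coalescedInstall(sender nodes.NodeCommandSender, nodeID, backend, modelID string, replica int,
      galleriesJSON, uri, name, alias string) (*messaging.BackendInstallReply, error) {
      key := fmt.Sprintf("%s|%s|%s|%d", nodeID, backend, modelID, replica)
      v, err, _ := installGroup.Do(key, func() (interface{}, error) {
          // Only the first caller for this key does the NATS round-trip;
          // concurrent identical callers block here and share its result.
          return sender.InstallBackend(nodeID, backend, modelID, galleriesJSON, uri, name, alias, replica)
      })
      if err != nil {
          return nil, err
      }
      return v.(*messaging.BackendInstallReply), nil
  }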

fakeUnloader gains a sync.Mutex around its recording slices to keep
concurrent test goroutines race-clean.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(e2e/distributed): drop force arg from InstallBackend test calls

Two e2e test call sites still passed the trailing force bool that was
removed from RemoteUnloaderAdapter.InstallBackend in 9bde76d7. Caught
by golangci-lint typecheck on the upgrade-split branch (master CI was
already green because these tests don't run in the standard test path).

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed): extract worker business logic to core/services/worker

core/cli/worker.go grew to 1212 lines after the backend.upgrade split.
The CLI package was carrying backendSupervisor, NATS lifecycle handlers,
gallery install/upgrade orchestration, S3 file staging, and registration
helpers — all distributed-worker business logic that doesn't belong in
the CLI surface.

Move it to a new core/services/worker package, mirroring the existing
core/services/{nodes,messaging,galleryop} pattern. core/cli/worker.go
shrinks to ~19 lines: a kong-tagged shim that embeds worker.Config and
delegates Run.
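
What the shim might look like (the command-struct name and Run signature
are illustrative; only worker.Config and worker.Run are exported per this
commit):

  package cli

  import "github.com/mudler/LocalAI/core/services/worker"

  // WorkerCMD embeds the kong-tagged worker.Config so flag parsing stays
  // identical, then delegates everything to the new package.
  type WorkerCMD struct {
      worker.Config `embed:""`
  }

  func (w *WorkerCMD) Run() error {
      return worker.Run(w.Config)
  }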

No behavior change. All symbols stay unexported except Config and Run.
The three worker-specific tests (addr/replica/concurrency) move with
the code via git mv so history follows them.

Files split as:
  worker.go        - Run entry point
  config.go        - Config struct (kong tags retained, kong not imported)
  supervisor.go    - backendProcess, backendSupervisor, process lifecycle
  install.go       - installBackend, upgradeBackend, findBackend, lockBackend
  lifecycle.go     - subscribeLifecycleEvents (verbatim, decomposition is
                     a follow-up commit)
  file_staging.go  - subscribeFileStaging, isPathAllowed
  registration.go  - advertiseAddr, registrationBody, heartbeatBody, etc.
  reply.go         - replyJSON
  process_helpers.go - readLastLinesFromFile

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed/worker): decompose subscribeLifecycleEvents into per-event handlers

The 226-line subscribeLifecycleEvents method packed eight NATS subscriptions
inline. Each handler's doc comments were interleaved with subscription
plumbing, making it hard to read any one handler without scrolling past the
others. Extract each handler into its own method on *backendSupervisor; the
subscriber becomes a thin 8-line dispatcher.

No behavior change: each method body is byte-equivalent to its corresponding
inline goroutine + handler. Doc comments that were attached to the inline
SubscribeReply calls migrate to the new method godocs.

Adding the next NATS subject is now a 2-line patch to the dispatcher plus
one new method, instead of grafting onto a monolith.
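
The dispatcher ends up roughly like this (the eight subjects match the
handlers this worker already serves; method names and error plumbing are
illustrative):

  func (s *backendSupervisor) subscribeLifecycleEvents() error {
      return errors.Join(
          s.subscribeBackendInstall(), // routine loads
          s.subscribeBackendUpgrade(), // slow force-reinstall path
          s.subscribeBackendStop(),
          s.subscribeBackendDelete(),
          s.subscribeBackendList(),
          s.subscribeModelUnload(),
          s.subscribeModelDelete(),
          s.subscribeNodeStop(),
      )
  }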

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-08 16:24:54 +02:00


package nodes

import (
    "context"
    "fmt"
    "time"

    "github.com/mudler/LocalAI/core/services/messaging"
    "github.com/mudler/xlog"
)

// backendStopRequest is the request payload for backend.stop (fire-and-forget).
type backendStopRequest struct {
    Backend string `json:"backend"`
}

// NodeCommandSender abstracts NATS-based commands to worker nodes.
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
//
// InstallBackend is idempotent: the worker short-circuits if the backend is
// already running for the requested (modelID, replica) slot. Routine model
// loads and admin installs both call this.
//
// UpgradeBackend is the destructive force-reinstall path: the worker stops
// every live process for the backend, re-pulls the gallery artifact, and
// replies. Caller (DistributedBackendManager.UpgradeBackend) handles
// rolling-update fallback to the legacy install Force=true path on
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface {
    InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
    UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
    DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
    ListBackends(nodeID string) (*messaging.BackendListReply, error)
    StopBackend(nodeID, backend string) error
    UnloadModelOnNode(nodeID, modelName string) error
}

// RemoteUnloaderAdapter implements NodeCommandSender and model.RemoteModelUnloader
// by publishing NATS events for backend process lifecycle. The worker process
// subscribes and handles the actual process start/stop.
//
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
    registry ModelLocator
    nats     messaging.MessagingClient
}

// NewRemoteUnloaderAdapter creates a new adapter.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
    return &RemoteUnloaderAdapter{
        registry: registry,
        nats:     nats,
    }
}

// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.
// This is called by ModelLoader.deleteProcess() when process == nil (remote model).
func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
    ctx := context.Background()
    nodes, err := a.registry.FindNodesWithModel(ctx, modelName)
    if err != nil || len(nodes) == 0 {
        xlog.Debug("No remote nodes found with model", "model", modelName)
        return nil
    }
    for _, node := range nodes {
        xlog.Info("Sending NATS backend.stop to node", "model", modelName, "node", node.Name, "nodeID", node.ID)
        if err := a.StopBackend(node.ID, modelName); err != nil {
            xlog.Warn("Failed to send backend.stop", "node", node.Name, "error", err)
            continue
        }
        // Remove every replica of this model on the node — the worker will
        // handle the actual process cleanup.
        a.registry.RemoveAllNodeModelReplicas(ctx, node.ID, modelName)
    }
    return nil
}

// InstallBackend sends a backend.install request-reply to a worker node.
// Idempotent on the worker: if the (modelID, replica) process is already
// running, the worker short-circuits and returns its address; if the binary
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
//
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
// case while still failing fast enough to surface real worker hangs.
//
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead —
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
    subject := messaging.SubjectNodeBackendInstall(nodeID)
    xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
    return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
        Backend:          backendType,
        ModelID:          modelID,
        BackendGalleries: galleriesJSON,
        URI:              uri,
        Name:             name,
        Alias:            alias,
        ReplicaIndex:     int32(replicaIndex),
    }, 3*time.Minute)
}

// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary — upgrade itself does not start a process.
//
// Timeout: 15 minutes. Real-world worst case observed: 8-10 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
    subject := messaging.SubjectNodeBackendUpgrade(nodeID)
    xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
    return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
        Backend:          backendType,
        BackendGalleries: galleriesJSON,
        URI:              uri,
        Name:             name,
        Alias:            alias,
        ReplicaIndex:     int32(replicaIndex),
    }, 15*time.Minute)
}

// installWithForceFallback is the rolling-update fallback used by
// DistributedBackendManager.UpgradeBackend when backend.upgrade returns
// nats.ErrNoResponders (the worker is on a pre-2026-05-08 build that
// doesn't subscribe to the new subject). It re-fires the legacy
// backend.install with Force=true. Drop this once every worker is on
// 2026-05-08 or newer.
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
    subject := messaging.SubjectNodeBackendInstall(nodeID)
    xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
    return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
        Backend:          backendType,
        BackendGalleries: galleriesJSON,
        URI:              uri,
        Name:             name,
        Alias:            alias,
        ReplicaIndex:     int32(replicaIndex),
        Force:            true,
    }, 15*time.Minute)
}

// ListBackends queries a worker node for its installed backends via NATS request-reply.
func (a *RemoteUnloaderAdapter) ListBackends(nodeID string) (*messaging.BackendListReply, error) {
    subject := messaging.SubjectNodeBackendList(nodeID)
    xlog.Debug("Sending NATS backend.list", "nodeID", nodeID)
    return messaging.RequestJSON[messaging.BackendListRequest, messaging.BackendListReply](a.nats, subject, messaging.BackendListRequest{}, 30*time.Second)
}

// StopBackend tells a worker node to stop a specific gRPC backend process.
// If backend is empty, the worker stops ALL backends.
// The node stays registered and can receive another InstallBackend later.
func (a *RemoteUnloaderAdapter) StopBackend(nodeID, backend string) error {
    subject := messaging.SubjectNodeBackendStop(nodeID)
    if backend == "" {
        return a.nats.Publish(subject, nil)
    }
    req := struct {
        Backend string `json:"backend"`
    }{Backend: backend}
    return a.nats.Publish(subject, req)
}

// DeleteBackend tells a worker node to delete a backend (stop + remove files).
func (a *RemoteUnloaderAdapter) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) {
    subject := messaging.SubjectNodeBackendDelete(nodeID)
    xlog.Info("Sending NATS backend.delete", "nodeID", nodeID, "backend", backendName)
    return messaging.RequestJSON[messaging.BackendDeleteRequest, messaging.BackendDeleteReply](a.nats, subject, messaging.BackendDeleteRequest{Backend: backendName}, 2*time.Minute)
}

// UnloadModelOnNode sends a model.unload request to a specific node.
// The worker calls gRPC Free() to release GPU memory.
func (a *RemoteUnloaderAdapter) UnloadModelOnNode(nodeID, modelName string) error {
    subject := messaging.SubjectNodeModelUnload(nodeID)
    xlog.Info("Sending NATS model.unload", "nodeID", nodeID, "model", modelName)
    reply, err := messaging.RequestJSON[messaging.ModelUnloadRequest, messaging.ModelUnloadReply](a.nats, subject, messaging.ModelUnloadRequest{ModelName: modelName}, 30*time.Second)
    if err != nil {
        return err
    }
    if !reply.Success {
        return fmt.Errorf("model.unload on node %s: %s", nodeID, reply.Error)
    }
    return nil
}

// DeleteModelFiles sends model.delete to all nodes that have the model cached.
// This removes model files from worker disks.
func (a *RemoteUnloaderAdapter) DeleteModelFiles(modelName string) error {
    nodes, err := a.registry.FindNodesWithModel(context.Background(), modelName)
    if err != nil || len(nodes) == 0 {
        xlog.Debug("No nodes with model for file deletion", "model", modelName)
        return nil
    }
    for _, node := range nodes {
        subject := messaging.SubjectNodeModelDelete(node.ID)
        xlog.Info("Sending NATS model.delete", "nodeID", node.ID, "model", modelName)
        reply, err := messaging.RequestJSON[messaging.ModelDeleteRequest, messaging.ModelDeleteReply](a.nats, subject, messaging.ModelDeleteRequest{ModelName: modelName}, 30*time.Second)
        if err != nil {
            xlog.Warn("model.delete failed on node", "node", node.Name, "error", err)
            continue
        }
        if !reply.Success {
            xlog.Warn("model.delete failed on node", "node", node.Name, "error", reply.Error)
        }
    }
    return nil
}

// StopNode tells a worker node to shut down entirely (deregister + exit).
func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
    subject := messaging.SubjectNodeStop(nodeID)
    return a.nats.Publish(subject, nil)
}