* feat(messaging): add backend.upgrade NATS subject + payload types
Splits the slow force-reinstall path off backend.install so it can run on
its own subscription goroutine, eliminating head-of-line blocking between
routine model loads and full gallery upgrades.
Wire-level Force flag on BackendInstallRequest is kept for one release as
the rolling-update fallback target; doc note marks it deprecated.
Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
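For orientation, a minimal sketch of what the new payload types and subject helper might look like, inferred from how RemoteUnloaderAdapter fills them in further down this page; the JSON tag names, the reply fields, and the exact subject string are assumptions, and the real definitions in core/services/messaging may carry more.

    package messaging

    // BackendUpgradeRequest mirrors BackendInstallRequest minus ModelID and
    // Force (sketch; field set inferred from the adapter code further down).
    type BackendUpgradeRequest struct {
        Backend          string `json:"backend"`
        BackendGalleries string `json:"backend_galleries"`
        URI              string `json:"uri"`
        Name             string `json:"name"`
        Alias            string `json:"alias"`
        ReplicaIndex     int32  `json:"replica_index"`
    }

    // BackendUpgradeReply is assumed to follow the success/error shape used by
    // the other replies in this package.
    type BackendUpgradeReply struct {
        Success bool   `json:"success"`
        Error   string `json:"error,omitempty"`
    }

    // SubjectNodeBackendUpgrade builds the per-node subject; the literal
    // pattern here is illustrative only.
    func SubjectNodeBackendUpgrade(nodeID string) string {
        return "node." + nodeID + ".backend.upgrade"
    }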
* feat(distributed/worker): add per-backend mutex helper to backendSupervisor
Different backend names lock independently; same backend serializes. This
is the synchronization primitive used by the upcoming concurrent install
handler — without it, wrapping the NATS callback in a goroutine would
race the gallery directory when two requests target the same backend.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
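A minimal sketch of such a helper, assuming a plain mutex-guarded map of mutexes; the real lockBackend hangs off backendSupervisor and its exact signature may differ.

    package worker

    import "sync"

    // backendLocks hands out one mutex per backend name: different names lock
    // independently, the same name serializes (sketch; names illustrative).
    type backendLocks struct {
        mu    sync.Mutex
        locks map[string]*sync.Mutex
    }

    // lockBackend locks and returns the per-backend mutex, creating it on first
    // use. Typical call site: defer locks.lockBackend(name).Unlock() around any
    // gallery-directory operation.
    func (b *backendLocks) lockBackend(name string) *sync.Mutex {
        b.mu.Lock()
        if b.locks == nil {
            b.locks = make(map[string]*sync.Mutex)
        }
        l, ok := b.locks[name]
        if !ok {
            l = &sync.Mutex{}
            b.locks[name] = l
        }
        b.mu.Unlock()
        l.Lock()
        return l
    }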
* fix(distributed/worker): run backend.install handler in a goroutine
NATS subscriptions deliver messages serially on a single per-subscription
goroutine. With a synchronous install handler, a multi-minute gallery
download would head-of-line-block every other install request to the
same worker — manifesting upstream as a 5-minute "nats: timeout" on
unrelated routine model loads.
The body now runs in its own goroutine, with a per-backend mutex
(lockBackend) protecting the gallery directory from concurrent operations
on the same backend. Different backend names install in parallel.
Backward-compat: req.Force=true is still honored here, so an older master
that hasn't been updated to send on backend.upgrade keeps working.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
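The shape of the fix, sketched against the plain nats.go client API for illustration; the worker actually goes through its internal messaging wrapper, and the request/reply types and helper names below are placeholders.

    package worker

    import (
        "encoding/json"
        "sync"

        "github.com/nats-io/nats.go"
    )

    type installRequest struct {
        Backend string `json:"backend"`
        Force   bool   `json:"force"`
    }

    type installReply struct {
        Success bool   `json:"success"`
        Error   string `json:"error,omitempty"`
    }

    // subscribeInstall returns from the NATS callback immediately and runs the
    // slow body on its own goroutine, serialized per backend by lock.
    func subscribeInstall(nc *nats.Conn, subject string,
        lock func(backend string) *sync.Mutex,
        install func(installRequest) installReply) (*nats.Subscription, error) {
        return nc.Subscribe(subject, func(msg *nats.Msg) {
            go func() {
                var req installRequest
                if err := json.Unmarshal(msg.Data, &req); err != nil {
                    return
                }
                l := lock(req.Backend) // same backend serializes; others run in parallel
                defer l.Unlock()
                reply := install(req) // may spend minutes pulling from the gallery
                data, _ := json.Marshal(reply)
                _ = msg.Respond(data)
            }()
        })
    }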
* feat(distributed/worker): subscribe to backend.upgrade as a separate path
Slow force-reinstall now lives on its own NATS subscription, so a
multi-minute gallery pull cannot head-of-line-block the routine
backend.install handler on the same worker. Same per-backend mutex
guards both — concurrent install + upgrade for the same backend
serialize at the gallery directory; different backends are independent.
upgradeBackend stops every live process for the backend, force-installs
from gallery, and re-registers. It does not start a new process — the
next backend.install will spawn one with the freshly-pulled binary.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend
Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.
InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.
Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.
router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
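The fallback described above reduces to an errors.Is check on the NATS sentinel. A sketch, written as if it sat next to the adapter shown later on this page; the wrapper function itself is illustrative, the real logic lives in DistributedBackendManager.UpgradeBackend.

    package nodes

    import (
        "errors"

        "github.com/nats-io/nats.go"
    )

    // upgradeWithFallback: try the new subject first; only a missing responder
    // (old worker) falls back to the legacy install path with Force=true.
    func upgradeWithFallback(a *RemoteUnloaderAdapter, nodeID, backendType, galleriesJSON, uri, name, alias string, replica int) error {
        _, err := a.UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias, replica)
        if errors.Is(err, nats.ErrNoResponders) {
            // Worker predates the backend.upgrade subject: re-fire the legacy
            // backend.install with Force=true so the upgrade still converges.
            _, err = a.installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias, replica)
        }
        return err
    }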
* test(distributed): update fakeUnloader for new NodeCommandSender shape
InstallBackend lost its force bool param (Force is not part of the public
Go API anymore — only the internal upgrade-fallback path sets it on the
wire), and the fake gained a new UpgradeBackend method. It records both call
slices and provides an installHook concurrency seam for upcoming singleflight tests.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(distributed): cover UpgradeBackend's new subject + rolling-update fallback
Task 3.1 changed the master to publish UpgradeBackend on the new
backend.upgrade subject; the existing UpgradeBackend tests scripted the
old install subject and so all 3 began failing as expected. Updates them
to script SubjectNodeBackendUpgrade with BackendUpgradeReply.
Adds two new specs for the rolling-update fallback:
- ErrNoResponders on backend.upgrade triggers a backend.install
Force=true retry on the same node.
- Non-NoResponders errors propagate to the caller unchanged.
scriptedMessagingClient gains scriptNoResponders (real nats sentinel) and
scriptReplyMatching (predicate-matched canned reply, used to assert that
the fallback path actually sets Force=true on the install retry).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix(distributed): coalesce concurrent identical backend.install via singleflight
Six simultaneous chat completions for the same not-yet-loaded model were
observed firing six independent NATS install requests, each serializing
through the worker's per-subscription goroutine and amplifying queue
depth. SmartRouter now wraps the NATS round-trip in a singleflight.Group
keyed by (nodeID, backend, modelID, replica): N concurrent identical
loads share one round-trip and one reply.
Distinct (modelID, replica) keys still fire independent calls, so
multi-replica scaling and multi-model fan-out are unaffected.
fakeUnloader gains a sync.Mutex around its recording slices to keep
concurrent test goroutines race-clean.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
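A sketch of the coalescing wrapper, assuming golang.org/x/sync/singleflight; the key format and the standalone function are illustrative rather than SmartRouter's actual layout.

    package nodes

    import (
        "fmt"

        "golang.org/x/sync/singleflight"

        "github.com/mudler/LocalAI/core/services/messaging"
    )

    var installGroup singleflight.Group

    // installOnce coalesces concurrent identical installs: N callers sharing the
    // same (nodeID, backend, modelID, replica) key share one NATS round-trip and
    // one reply; distinct keys still fire independently.
    func installOnce(a *RemoteUnloaderAdapter, nodeID, backend, modelID, galleriesJSON, uri, name, alias string, replica int) (*messaging.BackendInstallReply, error) {
        key := fmt.Sprintf("%s|%s|%s|%d", nodeID, backend, modelID, replica)
        v, err, _ := installGroup.Do(key, func() (interface{}, error) {
            return a.InstallBackend(nodeID, backend, modelID, galleriesJSON, uri, name, alias, replica)
        })
        if err != nil {
            return nil, err
        }
        return v.(*messaging.BackendInstallReply), nil
    }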
* test(e2e/distributed): drop force arg from InstallBackend test calls
Two e2e test call sites still passed the trailing force bool that was
removed from RemoteUnloaderAdapter.InstallBackend in 9bde76d7. Caught
by golangci-lint typecheck on the upgrade-split branch (master CI was
already green because these tests don't run in the standard test path).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(distributed): extract worker business logic to core/services/worker
core/cli/worker.go grew to 1212 lines after the backend.upgrade split.
The CLI package was carrying backendSupervisor, NATS lifecycle handlers,
gallery install/upgrade orchestration, S3 file staging, and registration
helpers — all distributed-worker business logic that doesn't belong in
the cobra surface.
Move it to a new core/services/worker package, mirroring the existing
core/services/{nodes,messaging,galleryop} pattern. core/cli/worker.go
shrinks to ~19 lines: a kong-tagged shim that embeds worker.Config and
delegates Run.
No behavior change. All symbols stay unexported except Config and Run.
The three worker-specific tests (addr/replica/concurrency) move with
the code via git mv so history follows them.
Files split as:
worker.go - Run entry point
config.go - Config struct (kong tags retained, kong not imported)
supervisor.go - backendProcess, backendSupervisor, process lifecycle
install.go - installBackend, upgradeBackend, findBackend, lockBackend
lifecycle.go - subscribeLifecycleEvents (verbatim, decomposition is
a follow-up commit)
file_staging.go - subscribeFileStaging, isPathAllowed
registration.go - advertiseAddr, registrationBody, heartbeatBody, etc.
reply.go - replyJSON
process_helpers.go - readLastLinesFromFile
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
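Roughly what the post-refactor shim looks like; the type name, tag, and worker.Run signature below are assumptions, only the embed-and-delegate shape is the point.

    package cli

    import "github.com/mudler/LocalAI/core/services/worker"

    // WorkerCMD is the kong-tagged shim: the embedded Config carries the flags,
    // Run hands straight off to the service package (names assumed).
    type WorkerCMD struct {
        worker.Config `embed:""`
    }

    func (w *WorkerCMD) Run() error {
        return worker.Run(w.Config)
    }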
* refactor(distributed/worker): decompose subscribeLifecycleEvents into per-event handlers
The 226-line subscribeLifecycleEvents method packed eight NATS subscriptions
inline. Each had grown contextual doc comments mixed in with subscription
plumbing, making it hard to read any one handler without scrolling past the
others. Extract each handler into its own method on *backendSupervisor; the
subscriber becomes a thin 8-line dispatcher.
No behavior change: each method body is byte-equivalent to its corresponding
inline goroutine + handler. Doc comments that were attached to the inline
SubscribeReply calls migrate to the new method godocs.
Adding the next NATS subject is now a 2-line patch to the dispatcher plus
one new method, instead of grafting onto a monolith.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
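A sketch of the dispatcher shape after the decomposition; subject strings, handler names, and the subscribe plumbing are illustrative stand-ins for the real *backendSupervisor API.

    package worker

    // supervisor stands in for the real *backendSupervisor; the subscribe field
    // and handler fields are placeholders for the extracted methods.
    type supervisor struct {
        subscribe func(subject string, handler func(data []byte) []byte)

        handleBackendInstall, handleBackendUpgrade, handleBackendDelete,
        handleBackendList, handleBackendStop, handleModelUnload,
        handleModelDelete, handleNodeStop func(data []byte) []byte
    }

    // subscribeLifecycleEvents is the thin dispatcher: adding a subject is one
    // line here plus one new handler method.
    func (s *supervisor) subscribeLifecycleEvents() {
        s.subscribe("backend.install", s.handleBackendInstall)
        s.subscribe("backend.upgrade", s.handleBackendUpgrade)
        s.subscribe("backend.delete", s.handleBackendDelete)
        s.subscribe("backend.list", s.handleBackendList)
        s.subscribe("backend.stop", s.handleBackendStop)
        s.subscribe("model.unload", s.handleModelUnload)
        s.subscribe("model.delete", s.handleModelDelete)
        s.subscribe("node.stop", s.handleNodeStop)
    }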
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
package nodes

import (
	"context"
	"fmt"
	"time"

	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/xlog"
)

// backendStopRequest is the request payload for backend.stop (fire-and-forget).
type backendStopRequest struct {
	Backend string `json:"backend"`
}

// NodeCommandSender abstracts NATS-based commands to worker nodes.
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
//
// InstallBackend is idempotent: the worker short-circuits if the backend is
// already running for the requested (modelID, replica) slot. Routine model
// loads and admin installs both call this.
//
// UpgradeBackend is the destructive force-reinstall path: the worker stops
// every live process for the backend, re-pulls the gallery artifact, and
// replies. Caller (DistributedBackendManager.UpgradeBackend) handles
// rolling-update fallback to the legacy install Force=true path on
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface {
	InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
	UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
	DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
	ListBackends(nodeID string) (*messaging.BackendListReply, error)
	StopBackend(nodeID, backend string) error
	UnloadModelOnNode(nodeID, modelName string) error
}

// RemoteUnloaderAdapter implements NodeCommandSender and model.RemoteModelUnloader
// by publishing NATS events for backend process lifecycle. The worker process
// subscribes and handles the actual process start/stop.
//
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
	registry ModelLocator
	nats     messaging.MessagingClient
}

// NewRemoteUnloaderAdapter creates a new adapter.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
	return &RemoteUnloaderAdapter{
		registry: registry,
		nats:     nats,
	}
}

// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.
// This is called by ModelLoader.deleteProcess() when process == nil (remote model).
func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
	ctx := context.Background()
	nodes, err := a.registry.FindNodesWithModel(ctx, modelName)
	if err != nil || len(nodes) == 0 {
		xlog.Debug("No remote nodes found with model", "model", modelName)
		return nil
	}

	for _, node := range nodes {
		xlog.Info("Sending NATS backend.stop to node", "model", modelName, "node", node.Name, "nodeID", node.ID)
		if err := a.StopBackend(node.ID, modelName); err != nil {
			xlog.Warn("Failed to send backend.stop", "node", node.Name, "error", err)
			continue
		}
		// Remove every replica of this model on the node — the worker will
		// handle the actual process cleanup.
		a.registry.RemoveAllNodeModelReplicas(ctx, node.ID, modelName)
	}

	return nil
}

// InstallBackend sends a backend.install request-reply to a worker node.
// Idempotent on the worker: if the (modelID, replica) process is already
// running, the worker short-circuits and returns its address; if the binary
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
//
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
// case while still failing fast enough to surface real worker hangs.
//
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead —
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
	subject := messaging.SubjectNodeBackendInstall(nodeID)
	xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)

	return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
		Backend:          backendType,
		ModelID:          modelID,
		BackendGalleries: galleriesJSON,
		URI:              uri,
		Name:             name,
		Alias:            alias,
		ReplicaIndex:     int32(replicaIndex),
	}, 3*time.Minute)
}

// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary — upgrade itself does not start a process.
//
// Timeout: 15 minutes. Real-world worst case observed: 8–10 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
	subject := messaging.SubjectNodeBackendUpgrade(nodeID)
	xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)

	return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
		Backend:          backendType,
		BackendGalleries: galleriesJSON,
		URI:              uri,
		Name:             name,
		Alias:            alias,
		ReplicaIndex:     int32(replicaIndex),
	}, 15*time.Minute)
}

// installWithForceFallback is the rolling-update fallback used by
// DistributedBackendManager.UpgradeBackend when backend.upgrade returns
// nats.ErrNoResponders (the worker is on a pre-2026-05-08 build that
// doesn't subscribe to the new subject). It re-fires the legacy
// backend.install with Force=true. Drop this once every worker is on
// 2026-05-08 or newer.
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
	subject := messaging.SubjectNodeBackendInstall(nodeID)
	xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)

	return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
		Backend:          backendType,
		BackendGalleries: galleriesJSON,
		URI:              uri,
		Name:             name,
		Alias:            alias,
		ReplicaIndex:     int32(replicaIndex),
		Force:            true,
	}, 15*time.Minute)
}

// ListBackends queries a worker node for its installed backends via NATS request-reply.
func (a *RemoteUnloaderAdapter) ListBackends(nodeID string) (*messaging.BackendListReply, error) {
	subject := messaging.SubjectNodeBackendList(nodeID)
	xlog.Debug("Sending NATS backend.list", "nodeID", nodeID)

	return messaging.RequestJSON[messaging.BackendListRequest, messaging.BackendListReply](a.nats, subject, messaging.BackendListRequest{}, 30*time.Second)
}

// StopBackend tells a worker node to stop a specific gRPC backend process.
// If backend is empty, the worker stops ALL backends.
// The node stays registered and can receive another InstallBackend later.
func (a *RemoteUnloaderAdapter) StopBackend(nodeID, backend string) error {
	subject := messaging.SubjectNodeBackendStop(nodeID)
	if backend == "" {
		return a.nats.Publish(subject, nil)
	}
	return a.nats.Publish(subject, backendStopRequest{Backend: backend})
}

// DeleteBackend tells a worker node to delete a backend (stop + remove files).
func (a *RemoteUnloaderAdapter) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) {
	subject := messaging.SubjectNodeBackendDelete(nodeID)
	xlog.Info("Sending NATS backend.delete", "nodeID", nodeID, "backend", backendName)

	return messaging.RequestJSON[messaging.BackendDeleteRequest, messaging.BackendDeleteReply](a.nats, subject, messaging.BackendDeleteRequest{Backend: backendName}, 2*time.Minute)
}

// UnloadModelOnNode sends a model.unload request to a specific node.
// The worker calls gRPC Free() to release GPU memory.
func (a *RemoteUnloaderAdapter) UnloadModelOnNode(nodeID, modelName string) error {
	subject := messaging.SubjectNodeModelUnload(nodeID)
	xlog.Info("Sending NATS model.unload", "nodeID", nodeID, "model", modelName)

	reply, err := messaging.RequestJSON[messaging.ModelUnloadRequest, messaging.ModelUnloadReply](a.nats, subject, messaging.ModelUnloadRequest{ModelName: modelName}, 30*time.Second)
	if err != nil {
		return err
	}
	if !reply.Success {
		return fmt.Errorf("model.unload on node %s: %s", nodeID, reply.Error)
	}
	return nil
}

// DeleteModelFiles sends model.delete to all nodes that have the model cached.
// This removes model files from worker disks.
func (a *RemoteUnloaderAdapter) DeleteModelFiles(modelName string) error {
	nodes, err := a.registry.FindNodesWithModel(context.Background(), modelName)
	if err != nil || len(nodes) == 0 {
		xlog.Debug("No nodes with model for file deletion", "model", modelName)
		return nil
	}

	for _, node := range nodes {
		subject := messaging.SubjectNodeModelDelete(node.ID)
		xlog.Info("Sending NATS model.delete", "nodeID", node.ID, "model", modelName)

		reply, err := messaging.RequestJSON[messaging.ModelDeleteRequest, messaging.ModelDeleteReply](a.nats, subject, messaging.ModelDeleteRequest{ModelName: modelName}, 30*time.Second)
		if err != nil {
			xlog.Warn("model.delete failed on node", "node", node.Name, "error", err)
			continue
		}
		if !reply.Success {
			xlog.Warn("model.delete failed on node", "node", node.Name, "error", reply.Error)
		}
	}
	return nil
}

// StopNode tells a worker node to shut down entirely (deregister + exit).
func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
	subject := messaging.SubjectNodeStop(nodeID)
	return a.nats.Publish(subject, nil)
}