LocalAI/core/services/worker/install.go
LocalAI [bot] e5d7b84216 fix(distributed): split NATS backend.upgrade off install + dedup loads (#9717)
* feat(messaging): add backend.upgrade NATS subject + payload types

Splits the slow force-reinstall path off backend.install so it can run on
its own subscription goroutine, eliminating head-of-line blocking between
routine model loads and full gallery upgrades.

The wire-level Force flag on BackendInstallRequest is kept for one release
as the rolling-update fallback target; a doc note marks it deprecated.

Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed/worker): add per-backend mutex helper to backendSupervisor

Different backend names lock independently; same backend serializes. This
is the synchronization primitive used by the upcoming concurrent install
handler — without it, wrapping the NATS callback in a goroutine would
race the gallery directory when two requests target the same backend.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed/worker): run backend.install handler in a goroutine

NATS subscriptions deliver messages serially on a single per-subscription
goroutine. With a synchronous install handler, a multi-minute gallery
download would head-of-line-block every other install request to the
same worker — manifesting upstream as a 5-minute "nats: timeout" on
unrelated routine model loads.

The body now runs in its own goroutine, with a per-backend mutex
(lockBackend) protecting the gallery directory from concurrent operations
on the same backend. Different backend names install in parallel.

Backward-compat: req.Force=true is still honored here, so an older master
that hasn't been updated to send on backend.upgrade keeps working.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed/worker): subscribe to backend.upgrade as a separate path

Slow force-reinstall now lives on its own NATS subscription, so a
multi-minute gallery pull cannot head-of-line-block the routine
backend.install handler on the same worker. Same per-backend mutex
guards both — concurrent install + upgrade for the same backend
serialize at the gallery directory; different backends are independent.

upgradeBackend stops every live process for the backend, force-installs
from gallery, and re-registers. It does not start a new process — the
next backend.install will spawn one with the freshly-pulled binary.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend

Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.

InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.

Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.

router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): update fakeUnloader for new NodeCommandSender shape

InstallBackend lost its force bool param (Force is not part of the public
Go API anymore — only the internal upgrade-fallback path sets it on the
wire). UpgradeBackend gained a method. Fake records both call slices and
provides an installHook concurrency seam for upcoming singleflight tests.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): cover UpgradeBackend's new subject + rolling-update fallback

Task 3.1 changed the master to publish UpgradeBackend on the new
backend.upgrade subject; the existing UpgradeBackend tests scripted the
old install subject and so all 3 began failing as expected. Updates them
to script SubjectNodeBackendUpgrade with BackendUpgradeReply.

Adds two new specs for the rolling-update fallback:
  - ErrNoResponders on backend.upgrade triggers a backend.install
    Force=true retry on the same node.
  - Non-NoResponders errors propagate to the caller unchanged.

scriptedMessagingClient gains scriptNoResponders (real nats sentinel) and
scriptReplyMatching (predicate-matched canned reply, used to assert that
the fallback path actually sets Force=true on the install retry).

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed): coalesce concurrent identical backend.install via singleflight

Six simultaneous chat completions for the same not-yet-loaded model were
observed firing six independent NATS install requests, each serializing
through the worker's per-subscription goroutine and amplifying queue
depth. SmartRouter now wraps the NATS round-trip in a singleflight.Group
keyed by (nodeID, backend, modelID, replica): N concurrent identical
loads share one round-trip and one reply.

Distinct (modelID, replica) keys still fire independent calls, so
multi-replica scaling and multi-model fan-out are unaffected.

fakeUnloader gains a sync.Mutex around its recording slices to keep
concurrent test goroutines race-clean.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(e2e/distributed): drop force arg from InstallBackend test calls

Two e2e test call sites still passed the trailing force bool that was
removed from RemoteUnloaderAdapter.InstallBackend in 9bde76d7. Caught
by golangci-lint typecheck on the upgrade-split branch (master CI was
already green because these tests don't run in the standard test path).

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed): extract worker business logic to core/services/worker

core/cli/worker.go grew to 1212 lines after the backend.upgrade split.
The CLI package was carrying backendSupervisor, NATS lifecycle handlers,
gallery install/upgrade orchestration, S3 file staging, and registration
helpers — all distributed-worker business logic that doesn't belong in
the cobra surface.

Move it to a new core/services/worker package, mirroring the existing
core/services/{nodes,messaging,galleryop} pattern. core/cli/worker.go
shrinks to ~19 lines: a kong-tagged shim that embeds worker.Config and
delegates Run.

No behavior change. All symbols stay unexported except Config and Run.
The three worker-specific tests (addr/replica/concurrency) move with
the code via git mv so history follows them.

Files split as:
  worker.go        - Run entry point
  config.go        - Config struct (kong tags retained, kong not imported)
  supervisor.go    - backendProcess, backendSupervisor, process lifecycle
  install.go       - installBackend, upgradeBackend, findBackend, lockBackend
  lifecycle.go     - subscribeLifecycleEvents (verbatim, decomposition is
                     a follow-up commit)
  file_staging.go  - subscribeFileStaging, isPathAllowed
  registration.go  - advertiseAddr, registrationBody, heartbeatBody, etc.
  reply.go         - replyJSON
  process_helpers.go - readLastLinesFromFile

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed/worker): decompose subscribeLifecycleEvents into per-event handlers

The 226-line subscribeLifecycleEvents method packed eight NATS subscriptions
inline. Each grew context-shaped doc comments mixed with subscription
plumbing, making it hard to read any one handler without scrolling past the
others. Extract each handler into its own method on *backendSupervisor; the
subscriber becomes a thin 8-line dispatcher.

No behavior change: each method body is byte-equivalent to its corresponding
inline goroutine + handler. Doc comments that were attached to the inline
SubscribeReply calls migrate to the new method godocs.

Adding the next NATS subject is now a 2-line patch to the dispatcher plus
one new method, instead of grafting onto a monolith.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-08 16:24:54 +02:00


package worker

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/xlog"
)

// buildProcessKey is the supervisor's stable identifier for a backend gRPC
// process. It includes the replica index so the same model can run multiple
// processes on a worker simultaneously without colliding on the same map slot
// or port. The "#N" suffix is purely internal — the controller never reads it.
func buildProcessKey(modelID, backend string, replicaIndex int) string {
	base := modelID
	if base == "" {
		base = backend
	}
	return fmt.Sprintf("%s#%d", base, replicaIndex)
}

// installBackend handles the backend.install flow. force=true is the
// upgrade path; force=false is the routine load path.
//
// The caller is responsible for holding s.lockBackend(req.Backend) for
// the duration of the call so the gallery directory isn't raced.
//
//  1. If already running for this (model, replica) slot AND force is false,
//     return the existing address (the fast path used by routine load events
//     that just want to know which port a backend already serves on).
//  2. If force is true, stop any process(es) currently using this backend
//     so the gallery install can replace the on-disk artifact and the freshly
//     started process picks up the new binary. This is the upgrade path —
//     without it, every backend.install we receive after the first hits the
//     fast path and silently no-ops, leaving the cluster on a stale build.
//  3. Install the backend from the gallery (force passed through so existing
//     artifacts get overwritten on upgrade).
//  4. Find the backend binary.
//  5. Start the gRPC process on a new port.
//
// Returns the gRPC address of the backend process.
//
// The process key includes the replica index so a worker with
// MaxReplicasPerModel>1 can host multiple processes for the same model on
// distinct ports. Old controllers (no replica_index in the request)
// implicitly target replica 0, which preserves single-replica behavior.
func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest, force bool) (string, error) {
	processKey := buildProcessKey(req.ModelID, req.Backend, int(req.ReplicaIndex))
	if !force {
		// Fast path: already running for this model+replica → return the
		// existing address. Verify liveness before trusting the cached entry:
		// a process that died without the supervisor noticing leaves a stale
		// (key, addr) pair, and getAddr would otherwise hand the controller
		// an address that immediately ECONNREFUSEDs. The reconciler then
		// marks the replica failed, retries the install, the supervisor says
		// "already running" again, and the cluster loops on a dead replica
		// forever.
		if addr := s.getAddr(processKey); addr != "" {
			if s.isRunning(processKey) {
				xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
				return addr, nil
			}
			xlog.Warn("Stale process entry for backend (dead process); cleaning up before reinstall",
				"backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
			s.stopBackendExact(processKey)
		}
	} else {
		// Upgrade path: stop every live process that shares this backend so
		// the gallery install can overwrite the on-disk artifact and the
		// restarted process picks up the new binary. resolveProcessKeys
		// catches peer replicas of the same backend (whisper#0, whisper#1,
		// ...) on workers configured with MaxReplicasPerModel>1. We also stop
		// the exact processKey from the request tuple — keys created with an
		// explicit modelID don't share the bare-name prefix the resolver
		// matches, but they're still using the old binary and need to come
		// down. Both calls are no-ops on missing keys.
		toStop := s.resolveProcessKeys(req.Backend)
		toStop = append(toStop, processKey)
		for _, key := range toStop {
			xlog.Info("Force install: stopping running backend before reinstall",
				"backend", req.Backend, "processKey", key)
			s.stopBackendExact(key)
		}
	}

	// Parse galleries from the request (overriding local config if provided).
	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	// On upgrade, run the gallery install path even if the binary already
	// exists on disk: findBackend would otherwise short-circuit and we'd
	// restart the same stale binary. The force flag passed to
	// InstallBackendFromGallery makes it overwrite the existing artifact.
	backendPath := ""
	if !force {
		backendPath = s.findBackend(req.Backend)
	}
	if backendPath == "" {
		if req.URI != "" {
			xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
			if err := galleryop.InstallExternalBackend(
				context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias,
			); err != nil {
				return "", fmt.Errorf("installing backend from external URI: %w", err)
			}
		} else {
			xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", force)
			if err := gallery.InstallBackendFromGallery(
				context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, force,
			); err != nil {
				return "", fmt.Errorf("installing backend from gallery: %w", err)
			}
		}
		// Re-register after install and retry the lookup.
		gallery.RegisterBackends(s.systemState, s.ml)
		backendPath = s.findBackend(req.Backend)
	}
	if backendPath == "" {
		return "", fmt.Errorf("backend %q not found after install attempt", req.Backend)
	}
	xlog.Info("Found backend binary", "path", backendPath, "processKey", processKey)

	// Start the gRPC process on a new port (keyed by model, not just backend).
	return s.startBackend(processKey, backendPath)
}

// upgradeBackend stops every running process for `backend`, force-reinstalls
// from gallery (overwriting the on-disk artifact), and re-registers backends.
// It does NOT start any new gRPC process — the next routine model load via
// backend.install will spawn a fresh process picking up the new binary.
//
// The caller is responsible for holding s.lockBackend(req.Backend).
func (s *backendSupervisor) upgradeBackend(req messaging.BackendUpgradeRequest) error {
	// Stop every live process for this backend (peer replicas + the bare
	// processKey). Same logic as the force branch in installBackend.
	toStop := s.resolveProcessKeys(req.Backend)
	toStop = append(toStop, buildProcessKey("", req.Backend, int(req.ReplicaIndex)))
	for _, key := range toStop {
		xlog.Info("Upgrade: stopping running backend before reinstall",
			"backend", req.Backend, "processKey", key)
		s.stopBackendExact(key)
	}

	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	if req.URI != "" {
		xlog.Info("Upgrading backend from external URI", "backend", req.Backend, "uri", req.URI)
		if err := galleryop.InstallExternalBackend(
			context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias,
		); err != nil {
			return fmt.Errorf("upgrading backend from external URI: %w", err)
		}
	} else {
		xlog.Info("Upgrading backend from gallery", "backend", req.Backend)
		if err := gallery.InstallBackendFromGallery(
			context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, true, /* force */
		); err != nil {
			return fmt.Errorf("upgrading backend from gallery: %w", err)
		}
	}
	gallery.RegisterBackends(s.systemState, s.ml)
	return nil
}

// findBackend looks for the backend binary in the backends path and system path.
func (s *backendSupervisor) findBackend(backend string) string {
	candidates := []string{
		filepath.Join(s.cfg.BackendsPath, backend),
		filepath.Join(s.cfg.BackendsPath, backend, backend),
		filepath.Join(s.cfg.BackendsSystemPath, backend),
		filepath.Join(s.cfg.BackendsSystemPath, backend, backend),
	}
	if uri := s.ml.GetExternalBackend(backend); uri != "" {
		if fi, err := os.Stat(uri); err == nil && !fi.IsDir() {
			return uri
		}
	}
	for _, path := range candidates {
		fi, err := os.Stat(path)
		if err == nil && !fi.IsDir() {
			return path
		}
	}
	return ""
}

// lockBackend returns a release function for a per-backend mutex. Different
// backend names lock independently. The first caller for a name allocates
// the mutex under s.mu; subsequent callers for the same name reuse it.
func (s *backendSupervisor) lockBackend(name string) func() {
	s.mu.Lock()
	if s.backendLocks == nil {
		s.backendLocks = make(map[string]*sync.Mutex)
	}
	m, ok := s.backendLocks[name]
	if !ok {
		m = &sync.Mutex{}
		s.backendLocks[name] = m
	}
	s.mu.Unlock()
	m.Lock()
	return m.Unlock
}