Mirror of https://github.com/mudler/LocalAI.git (synced 2026-05-17 04:56:52 -04:00)
* feat(messaging): add backend.upgrade NATS subject + payload types
Splits the slow force-reinstall path off backend.install so it can run on
its own subscription goroutine, eliminating head-of-line blocking between
routine model loads and full gallery upgrades.
Wire-level Force flag on BackendInstallRequest is kept for one release as
the rolling-update fallback target; doc note marks it deprecated.
Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed/worker): add per-backend mutex helper to backendSupervisor
Different backend names lock independently; same backend serializes. This
is the synchronization primitive used by the upcoming concurrent install
handler — without it, wrapping the NATS callback in a goroutine would
race the gallery directory when two requests target the same backend.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
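The helper's shape (mirroring the lockBackend implementation shown in the file view below): the first caller for a name lazily allocates that name's mutex under a guard lock, then every caller locks the per-name mutex and receives its release function.

```go
package main

import (
	"fmt"
	"sync"
)

// supervisor is a stripped-down stand-in for backendSupervisor, keeping only
// the fields the per-backend mutex helper needs.
type supervisor struct {
	mu           sync.Mutex
	backendLocks map[string]*sync.Mutex
}

// lockBackend returns a release function for a per-backend mutex. Different
// backend names lock independently; the same name serializes.
func (s *supervisor) lockBackend(name string) func() {
	s.mu.Lock()
	if s.backendLocks == nil {
		s.backendLocks = make(map[string]*sync.Mutex)
	}
	m, ok := s.backendLocks[name]
	if !ok {
		m = &sync.Mutex{}
		s.backendLocks[name] = m
	}
	s.mu.Unlock()
	m.Lock() // same-name callers queue here; other names never contend
	return m.Unlock
}

func main() {
	s := &supervisor{}
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := s.lockBackend("whisper")
			defer unlock()
			counter++ // safe: all "whisper" holders serialize
		}()
	}
	wg.Wait()
	fmt.Println(counter) // 100
}
```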
* fix(distributed/worker): run backend.install handler in a goroutine
NATS subscriptions deliver messages serially on a single per-subscription
goroutine. With a synchronous install handler, a multi-minute gallery
download would head-of-line-block every other install request to the
same worker — manifesting upstream as a 5-minute "nats: timeout" on
unrelated routine model loads.
The body now runs in its own goroutine, with a per-backend mutex
(lockBackend) protecting the gallery directory from concurrent operations
on the same backend. Different backend names install in parallel.
Backward-compat: req.Force=true is still honored here, so an older master
that hasn't been updated to send on backend.upgrade keeps working.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed/worker): subscribe to backend.upgrade as a separate path
Slow force-reinstall now lives on its own NATS subscription, so a
multi-minute gallery pull cannot head-of-line-block the routine
backend.install handler on the same worker. Same per-backend mutex
guards both — concurrent install + upgrade for the same backend
serialize at the gallery directory; different backends are independent.
upgradeBackend stops every live process for the backend, force-installs
from gallery, and re-registers. It does not start a new process — the
next backend.install will spawn one with the freshly-pulled binary.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend
Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.
InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.
Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.
router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(distributed): update fakeUnloader for new NodeCommandSender shape
InstallBackend lost its force bool param (Force is no longer part of the
public Go API; only the internal upgrade-fallback path sets it on the wire),
and NodeCommandSender gained an UpgradeBackend method. The fake records both
call slices and provides an installHook concurrency seam for upcoming
singleflight tests.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(distributed): cover UpgradeBackend's new subject + rolling-update fallback
Task 3.1 changed the master to publish UpgradeBackend on the new
backend.upgrade subject; the existing UpgradeBackend tests scripted the
old install subject and so all 3 began failing as expected. Updates them
to script SubjectNodeBackendUpgrade with BackendUpgradeReply.
Adds two new specs for the rolling-update fallback:
- ErrNoResponders on backend.upgrade triggers a backend.install
Force=true retry on the same node.
- Non-NoResponders errors propagate to the caller unchanged.
scriptedMessagingClient gains scriptNoResponders (real nats sentinel) and
scriptReplyMatching (predicate-matched canned reply, used to assert that
the fallback path actually sets Force=true on the install retry).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix(distributed): coalesce concurrent identical backend.install via singleflight
Six simultaneous chat completions for the same not-yet-loaded model were
observed firing six independent NATS install requests, each serializing
through the worker's per-subscription goroutine and amplifying queue
depth. SmartRouter now wraps the NATS round-trip in a singleflight.Group
keyed by (nodeID, backend, modelID, replica): N concurrent identical
loads share one round-trip and one reply.
Distinct (modelID, replica) keys still fire independent calls, so
multi-replica scaling and multi-model fan-out are unaffected.
fakeUnloader gains a sync.Mutex around its recording slices to keep
concurrent test goroutines race-clean.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
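The coalescing behavior can be illustrated with a minimal hand-rolled group (the real code uses golang.org/x/sync/singleflight; this sketch only shows the mechanism): callers sharing a key wait on the leader's in-flight call instead of issuing their own.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call is one in-flight round-trip; duplicates wait on wg and read its result.
type call struct {
	wg  sync.WaitGroup
	val string
	err error
}

// group is a minimal sketch of singleflight.Group: concurrent callers with
// the same key share one in-flight call and one result.
type group struct {
	mu    sync.Mutex
	calls map[string]*call
}

func (g *group) do(key string, fn func() (string, error)) (string, error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	if c, ok := g.calls[key]; ok {
		g.mu.Unlock()
		c.wg.Wait() // duplicate: share the leader's round-trip
		return c.val, c.err
	}
	c := &call{}
	c.wg.Add(1)
	g.calls[key] = c
	g.mu.Unlock()

	c.val, c.err = fn() // leader: the single NATS round-trip
	c.wg.Done()

	g.mu.Lock()
	delete(g.calls, key) // a later burst starts a fresh call
	g.mu.Unlock()
	return c.val, c.err
}

func main() {
	var g group
	var trips int32
	key := "node-1|llama-cpp|qwen3|0" // sketch of the (nodeID, backend, modelID, replica) key

	started := make(chan struct{})
	release := make(chan struct{})
	var wg sync.WaitGroup

	// Leader: the first install request is in flight, blocked on the worker.
	wg.Add(1)
	go func() {
		defer wg.Done()
		g.do(key, func() (string, error) {
			close(started)
			<-release
			atomic.AddInt32(&trips, 1)
			return "127.0.0.1:34567", nil
		})
	}()
	<-started

	// Five duplicates arrive while the leader is in flight.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.do(key, func() (string, error) {
				atomic.AddInt32(&trips, 1)
				return "", nil
			})
		}()
	}
	time.Sleep(200 * time.Millisecond) // let the duplicates reach the shared wait
	close(release)
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&trips)) // 1: six callers, one round-trip
}
```

Distinct keys never meet in the map, so multi-replica and multi-model fan-out stay fully parallel, matching the commit's claim.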
* test(e2e/distributed): drop force arg from InstallBackend test calls
Two e2e test call sites still passed the trailing force bool that was
removed from RemoteUnloaderAdapter.InstallBackend in 9bde76d7. Caught
by golangci-lint typecheck on the upgrade-split branch (master CI was
already green because these tests don't run in the standard test path).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(distributed): extract worker business logic to core/services/worker
core/cli/worker.go grew to 1212 lines after the backend.upgrade split.
The CLI package was carrying backendSupervisor, NATS lifecycle handlers,
gallery install/upgrade orchestration, S3 file staging, and registration
helpers — all distributed-worker business logic that doesn't belong in
the cobra surface.
Move it to a new core/services/worker package, mirroring the existing
core/services/{nodes,messaging,galleryop} pattern. core/cli/worker.go
shrinks to ~19 lines: a kong-tagged shim that embeds worker.Config and
delegates Run.
No behavior change. All symbols stay unexported except Config and Run.
The three worker-specific tests (addr/replica/concurrency) move with
the code via git mv so history follows them.
Files split as:
worker.go - Run entry point
config.go - Config struct (kong tags retained, kong not imported)
supervisor.go - backendProcess, backendSupervisor, process lifecycle
install.go - installBackend, upgradeBackend, findBackend, lockBackend
lifecycle.go - subscribeLifecycleEvents (verbatim, decomposition is
a follow-up commit)
file_staging.go - subscribeFileStaging, isPathAllowed
registration.go - advertiseAddr, registrationBody, heartbeatBody, etc.
reply.go - replyJSON
process_helpers.go - readLastLinesFromFile
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
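The shim pattern itself is simple; a sketch under the assumption that the service package exports only Config and Run (names per the commit, bodies hypothetical): the CLI type embeds Config so the flag tags stay on the struct the service owns, and the command's Run just delegates.

```go
package main

import "fmt"

// Config stands in for worker.Config; the real struct carries kong tags.
type Config struct {
	Address string
}

// Run stands in for worker.Run, the package's single entry point.
func Run(c Config) error {
	fmt.Println("worker listening on", c.Address)
	return nil
}

// WorkerCMD is the CLI-side shim: it embeds the service Config (so the flag
// parser still sees the tags) and delegates to the service's Run.
type WorkerCMD struct {
	Config
}

func (w *WorkerCMD) Run() error {
	return Run(w.Config) // bare Run resolves to the package-level function
}

func main() {
	cmd := &WorkerCMD{Config: Config{Address: "127.0.0.1:0"}}
	if err := cmd.Run(); err != nil {
		panic(err)
	}
}
```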
* refactor(distributed/worker): decompose subscribeLifecycleEvents into per-event handlers
The 226-line subscribeLifecycleEvents method packed eight NATS subscriptions
inline. Each grew context-shaped doc comments mixed with subscription
plumbing, making it hard to read any one handler without scrolling past the
others. Extract each handler into its own method on *backendSupervisor; the
subscriber becomes a thin 8-line dispatcher.
No behavior change: each method body is byte-equivalent to its corresponding
inline goroutine + handler. Doc comments that were attached to the inline
SubscribeReply calls migrate to the new method godocs.
Adding the next NATS subject is now a 2-line patch to the dispatcher plus
one new method, instead of grafting onto a monolith.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
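The dispatcher shape can be sketched as follows; subject names and handler methods here are illustrative, not the real eight-subject LocalAI set, and `subscribe` stands in for the SubscribeReply plumbing:

```go
package main

import "fmt"

// backendSupervisor is a stand-in; in the real code each handler method was
// extracted from the inline goroutine bodies of subscribeLifecycleEvents.
type backendSupervisor struct{}

func (s *backendSupervisor) handleInstall(payload []byte) { fmt.Println("install:", string(payload)) }
func (s *backendSupervisor) handleUpgrade(payload []byte) { fmt.Println("upgrade:", string(payload)) }
func (s *backendSupervisor) handleStop(payload []byte)    { fmt.Println("stop:", string(payload)) }

// subscribeLifecycleEvents becomes a thin dispatcher: adding a subject is one
// table entry plus one new method, instead of grafting onto a monolith.
func (s *backendSupervisor) subscribeLifecycleEvents(subscribe func(subject string, handler func([]byte))) {
	for subject, handler := range map[string]func([]byte){
		"backend.install": s.handleInstall,
		"backend.upgrade": s.handleUpgrade,
		"backend.stop":    s.handleStop,
	} {
		subscribe(subject, handler)
	}
}

func main() {
	s := &backendSupervisor{}
	subjects := []string{}
	s.subscribeLifecycleEvents(func(subject string, _ func([]byte)) {
		subjects = append(subjects, subject)
	})
	fmt.Println(len(subjects)) // 3
}
```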
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
226 lines
8.9 KiB
Go
package worker

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/xlog"
)

// buildProcessKey is the supervisor's stable identifier for a backend gRPC
// process. It includes the replica index so the same model can run multiple
// processes on a worker simultaneously without colliding on the same map slot
// or port. The "#N" suffix is purely internal — the controller never reads it.
func buildProcessKey(modelID, backend string, replicaIndex int) string {
	base := modelID
	if base == "" {
		base = backend
	}
	return fmt.Sprintf("%s#%d", base, replicaIndex)
}

// installBackend handles the backend.install flow. force=true is the
// upgrade path; force=false is the routine load path.
//
// The caller is responsible for holding s.lockBackend(req.Backend) for
// the duration of the call so the gallery directory isn't raced.
//
// 1. If already running for this (model, replica) slot AND force is false,
//    return the existing address (the fast path used by routine load events
//    that just want to know which port a backend already serves on).
// 2. If force is true, stop any process(es) currently using this backend
//    so the gallery install can replace the on-disk artifact and the freshly
//    started process picks up the new binary. This is the upgrade path —
//    without it, every backend.install we receive after the first hits the
//    fast path and silently no-ops, leaving the cluster on a stale build.
// 3. Install the backend from the gallery (force passed through so existing
//    artifacts get overwritten on upgrade).
// 4. Find the backend binary.
// 5. Start the gRPC process on a new port.
//
// Returns the gRPC address of the backend process.
//
// ProcessKey includes the replica index so a worker with MaxReplicasPerModel>1
// can host multiple processes for the same model on distinct ports. Old
// controllers (no replica_index in the request) implicitly target replica 0,
// which preserves single-replica behavior.
func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest, force bool) (string, error) {
	processKey := buildProcessKey(req.ModelID, req.Backend, int(req.ReplicaIndex))

	if !force {
		// Fast path: already running for this model+replica → return existing
		// address. Verify liveness before trusting the cached entry: a process
		// that died without the supervisor noticing leaves a stale (key, addr)
		// pair, and getAddr would otherwise hand the controller an address
		// that immediately ECONNREFUSEDs. The reconciler then marks the
		// replica failed, retries the install, the supervisor says "already
		// running" again, and the cluster loops on a dead replica forever.
		if addr := s.getAddr(processKey); addr != "" {
			if s.isRunning(processKey) {
				xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
				return addr, nil
			}
			xlog.Warn("Stale process entry for backend (dead process); cleaning up before reinstall",
				"backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
			s.stopBackendExact(processKey)
		}
	} else {
		// Upgrade path: stop every live process that shares this backend so the
		// gallery install can overwrite the on-disk artifact and the restarted
		// process picks up the new binary. resolveProcessKeys catches peer
		// replicas of the same backend (whisper#0, whisper#1, ...) on workers
		// configured with MaxReplicasPerModel>1. We also stop the exact
		// processKey from the request tuple — keys created with an explicit
		// modelID don't share the bare-name prefix the resolver matches, but
		// they're still using the old binary and need to come down. Both calls
		// are no-ops on missing keys.
		toStop := s.resolveProcessKeys(req.Backend)
		toStop = append(toStop, processKey)
		for _, key := range toStop {
			xlog.Info("Force install: stopping running backend before reinstall",
				"backend", req.Backend, "processKey", key)
			s.stopBackendExact(key)
		}
	}

	// Parse galleries from request (override local config if provided)
	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	// On upgrade, run the gallery install path even if the binary already
	// exists on disk: findBackend would otherwise short-circuit and we'd
	// restart the same stale binary. The force flag passed to
	// InstallBackendFromGallery makes it overwrite the existing artifact.
	backendPath := ""
	if !force {
		backendPath = s.findBackend(req.Backend)
	}
	if backendPath == "" {
		if req.URI != "" {
			xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
			if err := galleryop.InstallExternalBackend(
				context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias,
			); err != nil {
				return "", fmt.Errorf("installing backend from external URI: %w", err)
			}
		} else {
			xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", force)
			if err := gallery.InstallBackendFromGallery(
				context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, force,
			); err != nil {
				return "", fmt.Errorf("installing backend from gallery: %w", err)
			}
		}
		// Re-register after install and retry
		gallery.RegisterBackends(s.systemState, s.ml)
		backendPath = s.findBackend(req.Backend)
	}

	if backendPath == "" {
		return "", fmt.Errorf("backend %q not found after install attempt", req.Backend)
	}

	xlog.Info("Found backend binary", "path", backendPath, "processKey", processKey)

	// Start the gRPC process on a new port (keyed by model, not just backend)
	return s.startBackend(processKey, backendPath)
}

// upgradeBackend stops every running process for `backend`, force-reinstalls
// from gallery (overwriting the on-disk artifact), and re-registers backends.
// It does NOT start any new gRPC process — the next routine model load via
// backend.install will spawn a fresh process picking up the new binary.
//
// The caller is responsible for holding s.lockBackend(req.Backend).
func (s *backendSupervisor) upgradeBackend(req messaging.BackendUpgradeRequest) error {
	// Stop every live process for this backend (peer replicas + the bare
	// processKey). Same logic as the force branch in installBackend.
	toStop := s.resolveProcessKeys(req.Backend)
	toStop = append(toStop, buildProcessKey("", req.Backend, int(req.ReplicaIndex)))
	for _, key := range toStop {
		xlog.Info("Upgrade: stopping running backend before reinstall",
			"backend", req.Backend, "processKey", key)
		s.stopBackendExact(key)
	}

	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	if req.URI != "" {
		xlog.Info("Upgrading backend from external URI", "backend", req.Backend, "uri", req.URI)
		if err := galleryop.InstallExternalBackend(
			context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias,
		); err != nil {
			return fmt.Errorf("upgrading backend from external URI: %w", err)
		}
	} else {
		xlog.Info("Upgrading backend from gallery", "backend", req.Backend)
		if err := gallery.InstallBackendFromGallery(
			context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, true, /* force */
		); err != nil {
			return fmt.Errorf("upgrading backend from gallery: %w", err)
		}
	}

	gallery.RegisterBackends(s.systemState, s.ml)
	return nil
}

// findBackend looks for the backend binary in the backends path and system path.
func (s *backendSupervisor) findBackend(backend string) string {
	candidates := []string{
		filepath.Join(s.cfg.BackendsPath, backend),
		filepath.Join(s.cfg.BackendsPath, backend, backend),
		filepath.Join(s.cfg.BackendsSystemPath, backend),
		filepath.Join(s.cfg.BackendsSystemPath, backend, backend),
	}
	if uri := s.ml.GetExternalBackend(backend); uri != "" {
		if fi, err := os.Stat(uri); err == nil && !fi.IsDir() {
			return uri
		}
	}
	for _, path := range candidates {
		fi, err := os.Stat(path)
		if err == nil && !fi.IsDir() {
			return path
		}
	}
	return ""
}

// lockBackend returns a release function for a per-backend mutex. Different
// backend names lock independently. The first caller for a name allocates
// the mutex under s.mu; subsequent callers for the same name reuse it.
func (s *backendSupervisor) lockBackend(name string) func() {
	s.mu.Lock()
	if s.backendLocks == nil {
		s.backendLocks = make(map[string]*sync.Mutex)
	}
	m, ok := s.backendLocks[name]
	if !ok {
		m = &sync.Mutex{}
		s.backendLocks[name] = m
	}
	s.mu.Unlock()
	m.Lock()
	return m.Unlock
}