* feat(messaging): add backend.upgrade NATS subject + payload types
Splits the slow force-reinstall path off backend.install so it can run on
its own subscription goroutine, eliminating head-of-line blocking between
routine model loads and full gallery upgrades.
The wire-level Force flag on BackendInstallRequest is kept for one release
as the rolling-update fallback target; a doc note marks it deprecated.
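A minimal sketch of the new subject helper and payload types, following the
nodes.<id>.backend.install pattern asserted in the test file below (the
request type's exact field set is an assumption):

    func SubjectNodeBackendUpgrade(nodeID string) string {
        return fmt.Sprintf("nodes.%s.backend.upgrade", nodeID)
    }

    // BackendUpgradeRequest mirrors BackendInstallRequest minus the
    // deprecated Force flag (exact fields are an assumption).
    type BackendUpgradeRequest struct {
        Backend string `json:"backend"`
    }

    type BackendUpgradeReply struct {
        Success bool   `json:"success"`
        Error   string `json:"error,omitempty"`
    }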
Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed/worker): add per-backend mutex helper to backendSupervisor
Different backend names lock independently; same backend serializes. This
is the synchronization primitive used by the upcoming concurrent install
handler — without it, wrapping the NATS callback in a goroutine would
race the gallery directory when two requests target the same backend.
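A minimal sketch of the helper, assuming the supervisor guards a
lazily-populated lock map with its own mutex (field names are assumptions):

    // lockBackend returns an unlock func that the caller defers. The same
    // backend name serializes; different names proceed in parallel.
    func (s *backendSupervisor) lockBackend(name string) func() {
        s.locksMu.Lock()
        l, ok := s.backendLocks[name]
        if !ok {
            l = &sync.Mutex{}
            s.backendLocks[name] = l
        }
        s.locksMu.Unlock()
        l.Lock()
        return l.Unlock
    }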
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix(distributed/worker): run backend.install handler in a goroutine
NATS subscriptions deliver messages serially on a single per-subscription
goroutine. With a synchronous install handler, a multi-minute gallery
download would head-of-line-block every other install request to the
same worker — manifesting upstream as a 5-minute "nats: timeout" on
unrelated routine model loads.
The body now runs in its own goroutine, with a per-backend mutex
(lockBackend) protecting the gallery directory from concurrent operations
on the same backend. Different backend names install in parallel.
Backward-compat: req.Force=true is still honored here, so an older master
that hasn't been updated to send on backend.upgrade keeps working.
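The handler shape, sketched against the SubscribeReply signature visible in
the test file below (the body details are assumptions):

    nc.SubscribeReply(messaging.SubjectNodeBackendInstall(nodeID), func(data []byte, reply func([]byte)) {
        go func() { // frees the per-subscription goroutine immediately
            var req messaging.BackendInstallRequest
            if err := json.Unmarshal(data, &req); err != nil {
                return
            }
            unlock := s.lockBackend(req.Backend)
            defer unlock()
            // install (force-reinstall iff req.Force, kept for back-compat),
            // then answer the request
            respData, _ := json.Marshal(messaging.BackendInstallReply{Success: true})
            reply(respData)
        }()
    })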
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed/worker): subscribe to backend.upgrade as a separate path
Slow force-reinstall now lives on its own NATS subscription, so a
multi-minute gallery pull cannot head-of-line-block the routine
backend.install handler on the same worker. Same per-backend mutex
guards both — concurrent install + upgrade for the same backend
serialize at the gallery directory; different backends are independent.
upgradeBackend stops every live process for the backend, force-installs
from gallery, and re-registers. It does not start a new process — the
next backend.install will spawn one with the freshly-pulled binary.
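Sketched flow of upgradeBackend; the three step helpers are hypothetical
stand-ins for the operations described above:

    func (s *backendSupervisor) upgradeBackend(name string) error {
        unlock := s.lockBackend(name) // serializes with backend.install for the same name
        defer unlock()
        s.stopAllProcessesFor(name)   // stop every live process for the backend
        if err := s.forceInstallFromGallery(name); err != nil {
            return err
        }
        return s.reRegister(name)     // no process spawn; next backend.install does that
    }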
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* feat(distributed): add UpgradeBackend on NodeCommandSender; drop Force from InstallBackend
Master now sends to backend.upgrade for force-reinstall, with a
nats.ErrNoResponders fallback to the legacy backend.install Force=true
path so a rolling update with a new master + an old worker still
converges. The Force parameter leaves the public Go API surface
entirely — only the internal fallback sets it on the wire.
InstallBackend timeout drops 5min -> 3min (most replies are sub-second
since the worker short-circuits on already-running or already-installed).
UpgradeBackend timeout is 15min, sized for real-world Jetson-on-WiFi
gallery pulls.
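A sketch of the fallback logic (requestJSON is a hypothetical helper;
nats.ErrNoResponders is the real sentinel from github.com/nats-io/nats.go):

    reply, err := requestJSON(nc, messaging.SubjectNodeBackendUpgrade(nodeID), upgradeReq, 15*time.Minute)
    if errors.Is(err, nats.ErrNoResponders) {
        // Old worker without a backend.upgrade subscription: retry on the
        // legacy install subject. Force=true exists only on the wire here.
        legacy := installReq
        legacy.Force = true
        reply, err = requestJSON(nc, messaging.SubjectNodeBackendInstall(nodeID), legacy, 15*time.Minute)
    }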
Updates the admin install HTTP endpoint
(core/http/endpoints/localai/nodes.go) to the new signature too.
router_test.go's fakeUnloader does not yet implement the new interface
shape; Task 3.2 will catch it up before the next package-level test run.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(distributed): update fakeUnloader for new NodeCommandSender shape
InstallBackend lost its force bool param (Force is not part of the public
Go API anymore — only the internal upgrade-fallback path sets it on the
wire). UpgradeBackend gained a method. Fake records both call slices and
provides an installHook concurrency seam for upcoming singleflight tests.
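Roughly, the fake's shape (exact field types are assumptions):

    type fakeUnloader struct {
        mu           sync.Mutex
        installCalls []installCall // recorded InstallBackend invocations
        upgradeCalls []upgradeCall // recorded UpgradeBackend invocations
        installHook  func()        // if non-nil, runs inside InstallBackend
    }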
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(distributed): cover UpgradeBackend's new subject + rolling-update fallback
Task 3.1 changed the master to publish UpgradeBackend on the new
backend.upgrade subject; the existing UpgradeBackend tests scripted the
old install subject and so all 3 began failing as expected. Updates them
to script SubjectNodeBackendUpgrade with BackendUpgradeReply.
Adds two new specs for the rolling-update fallback:
- ErrNoResponders on backend.upgrade triggers a backend.install
Force=true retry on the same node.
- Non-NoResponders errors propagate to the caller unchanged.
scriptedMessagingClient gains scriptNoResponders (real nats sentinel) and
scriptReplyMatching (predicate-matched canned reply, used to assert that
the fallback path actually sets Force=true on the install retry).
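Approximate usage of the two helpers in the fallback spec (the
scriptedMessagingClient API shape beyond the names above is an assumption):

    client.scriptNoResponders(messaging.SubjectNodeBackendUpgrade(node.ID))
    client.scriptReplyMatching(messaging.SubjectNodeBackendInstall(node.ID),
        func(data []byte) bool { // match only the retry that sets Force=true
            var req messaging.BackendInstallRequest
            _ = json.Unmarshal(data, &req)
            return req.Force
        },
        okReply, // canned BackendInstallReply{Success: true}
    )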
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix(distributed): coalesce concurrent identical backend.install via singleflight
Six simultaneous chat completions for the same not-yet-loaded model were
observed firing six independent NATS install requests, each serializing
through the worker's per-subscription goroutine and amplifying queue
depth. SmartRouter now wraps the NATS round-trip in a singleflight.Group
keyed by (nodeID, backend, modelID, replica): N concurrent identical
loads share one round-trip and one reply.
Distinct (modelID, replica) keys still fire independent calls, so
multi-replica scaling and multi-model fan-out are unaffected.
fakeUnloader gains a sync.Mutex around its recording slices to keep
concurrent test goroutines race-clean.
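The coalescing shape, sketched (key format and surrounding names are
assumptions; singleflight.Group is golang.org/x/sync/singleflight):

    key := fmt.Sprintf("%s|%s|%s|%d", nodeID, backend, modelID, replica)
    v, err, _ := r.installFlight.Do(key, func() (interface{}, error) {
        return doInstall() // the actual NATS round-trip, one per key
    })
    if err != nil {
        return nil, err
    }
    reply := v.(*messaging.BackendInstallReply)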
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* test(e2e/distributed): drop force arg from InstallBackend test calls
Two e2e test call sites still passed the trailing force bool that was
removed from RemoteUnloaderAdapter.InstallBackend in 9bde76d7. Caught
by golangci-lint typecheck on the upgrade-split branch (master CI was
already green because these tests don't run in the standard test path).
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(distributed): extract worker business logic to core/services/worker
core/cli/worker.go grew to 1212 lines after the backend.upgrade split.
The CLI package was carrying backendSupervisor, NATS lifecycle handlers,
gallery install/upgrade orchestration, S3 file staging, and registration
helpers — all distributed-worker business logic that doesn't belong in
the cobra surface.
Move it to a new core/services/worker package, mirroring the existing
core/services/{nodes,messaging,galleryop} pattern. core/cli/worker.go
shrinks to ~19 lines: a kong-tagged shim that embeds worker.Config and
delegates Run.
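A sketch of the shim's shape (the type name and worker.Run's exact
signature are assumptions):

    package cli

    import "github.com/mudler/LocalAI/core/services/worker"

    // WorkerCMD is the kong-facing shim; all business logic lives in
    // core/services/worker.
    type WorkerCMD struct {
        worker.Config `embed:""`
    }

    func (w *WorkerCMD) Run() error {
        return worker.Run(&w.Config)
    }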
No behavior change. All symbols stay unexported except Config and Run.
The three worker-specific tests (addr/replica/concurrency) move with
the code via git mv so history follows them.
Files split as:
worker.go - Run entry point
config.go - Config struct (kong tags retained, kong not imported)
supervisor.go - backendProcess, backendSupervisor, process lifecycle
install.go - installBackend, upgradeBackend, findBackend, lockBackend
lifecycle.go - subscribeLifecycleEvents (verbatim, decomposition is
a follow-up commit)
file_staging.go - subscribeFileStaging, isPathAllowed
registration.go - advertiseAddr, registrationBody, heartbeatBody, etc.
reply.go - replyJSON
process_helpers.go - readLastLinesFromFile
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(distributed/worker): decompose subscribeLifecycleEvents into per-event handlers
The 226-line subscribeLifecycleEvents method packed eight NATS subscriptions
inline. Each handler accumulated doc comments mixed in with subscription
plumbing, making it hard to read any one of them without scrolling past the
others. Extract each handler into its own method on *backendSupervisor; the
subscriber becomes a thin 8-line dispatcher.
No behavior change: each method body is byte-equivalent to its corresponding
inline goroutine + handler. Doc comments that were attached to the inline
SubscribeReply calls migrate to the new method godocs.
Adding the next NATS subject is now a 2-line patch to the dispatcher plus
one new method, instead of grafting onto a monolith.
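The dispatcher's approximate shape after the split (handler method names
are assumptions):

    func (s *backendSupervisor) subscribeLifecycleEvents(nc natsConn) {
        s.subscribeInstall(nc)
        s.subscribeUpgrade(nc)
        s.subscribeStop(nc)
        s.subscribeNodeStop(nc)
        // ... one call per NATS subject; a new subject adds one line here
        //     plus one handler method ...
    }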
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
package distributed_test

import (
	"context"
	"encoding/json"
	"sync/atomic"

	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/services/nodes"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"

	pgdriver "gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
)

var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), func() {
	var (
		infra    *TestInfra
		db       *gorm.DB
		registry *nodes.NodeRegistry
	)

	BeforeEach(func() {
		infra = SetupInfra("localai_lifecycle_test")

		var err error
		db, err = gorm.Open(pgdriver.Open(infra.PGURL), &gorm.Config{
			Logger: logger.Default.LogMode(logger.Silent),
		})
		Expect(err).ToNot(HaveOccurred())

		registry, err = nodes.NewNodeRegistry(db)
		Expect(err).ToNot(HaveOccurred())
	})

	Context("NATS backend.install events", func() {
		It("should send backend.install request-reply to a specific node", func() {
			node := &nodes.BackendNode{
				Name: "gpu-node-1", Address: "h1:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())

			// Simulate worker subscribing to backend.install and replying success
			infra.NC.SubscribeReply(messaging.SubjectNodeBackendInstall(node.ID), func(data []byte, reply func([]byte)) {
				var req messaging.BackendInstallRequest
				json.Unmarshal(data, &req)
				Expect(req.Backend).To(Equal("llama-cpp"))

				resp := messaging.BackendInstallReply{Success: true}
				respData, _ := json.Marshal(resp)
				reply(respData)
			})

			FlushNATS(infra.NC)

			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
			Expect(err).ToNot(HaveOccurred())
			Expect(installReply.Success).To(BeTrue())
		})

		It("should propagate error from worker on failed install", func() {
			node := &nodes.BackendNode{
				Name: "fail-node", Address: "h1:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())

			// Simulate worker replying with error
			infra.NC.SubscribeReply(messaging.SubjectNodeBackendInstall(node.ID), func(data []byte, reply func([]byte)) {
				resp := messaging.BackendInstallReply{Success: false, Error: "backend not found"}
				respData, _ := json.Marshal(resp)
				reply(respData)
			})

			FlushNATS(infra.NC)

			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
			Expect(err).ToNot(HaveOccurred())
			Expect(installReply.Success).To(BeFalse())
			Expect(installReply.Error).To(ContainSubstring("backend not found"))
		})
	})

	Context("NATS backend.stop events (model unload)", func() {
		It("should send backend.stop to nodes hosting the model", func() {
			node := &nodes.BackendNode{
				Name: "gpu-node-2", Address: "h2:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())
			Expect(registry.SetNodeModel(context.Background(), node.ID, "whisper-large", 0, "loaded", "", 0)).To(Succeed())

			var stopReceived atomic.Int32
			sub, err := infra.NC.Subscribe(messaging.SubjectNodeBackendStop(node.ID), func(data []byte) {
				stopReceived.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub.Unsubscribe()

			FlushNATS(infra.NC)

			// Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog)
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed())

			Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1)))

			// Model should be removed from registry
			nodesWithModel, _ := registry.FindNodesWithModel(context.Background(), "whisper-large")
			Expect(nodesWithModel).To(BeEmpty())
		})

		It("should send backend.stop to all nodes hosting the model", func() {
			node1 := &nodes.BackendNode{Name: "n1", Address: "h1:50051"}
			node2 := &nodes.BackendNode{Name: "n2", Address: "h2:50051"}
			registry.Register(context.Background(), node1, true)
			registry.Register(context.Background(), node2, true)
			registry.SetNodeModel(context.Background(), node1.ID, "shared-model", 0, "loaded", "", 0)
			registry.SetNodeModel(context.Background(), node2.ID, "shared-model", 0, "loaded", "", 0)

			var count atomic.Int32
			sub1, _ := infra.NC.Subscribe(messaging.SubjectNodeBackendStop(node1.ID), func(data []byte) {
				count.Add(1)
			})
			sub2, _ := infra.NC.Subscribe(messaging.SubjectNodeBackendStop(node2.ID), func(data []byte) {
				count.Add(1)
			})
			defer sub1.Unsubscribe()
			defer sub2.Unsubscribe()

			FlushNATS(infra.NC)

			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			adapter.UnloadRemoteModel("shared-model")

			Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2)))
		})

		It("should be no-op for models not on any node", func() {
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed())
		})
	})

	Context("NATS node stop events (full shutdown)", func() {
		It("should publish stop event to a node", func() {
			node := &nodes.BackendNode{
				Name: "stop-me", Address: "h3:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())

			var stopped atomic.Int32
			sub, err := infra.NC.Subscribe(messaging.SubjectNodeStop(node.ID), func(data []byte) {
				stopped.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub.Unsubscribe()

			FlushNATS(infra.NC)

			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.StopNode(node.ID)).To(Succeed())

			Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1)))
		})
	})

	Context("NATS subject naming", func() {
		It("should generate correct backend lifecycle subjects", func() {
			Expect(messaging.SubjectNodeBackendInstall("node-abc")).To(Equal("nodes.node-abc.backend.install"))
			Expect(messaging.SubjectNodeBackendStop("node-abc")).To(Equal("nodes.node-abc.backend.stop"))
			Expect(messaging.SubjectNodeStop("node-abc")).To(Equal("nodes.node-abc.stop"))
		})
	})

	// Design note: LoadModel is a direct gRPC call to node.Address, NOT a NATS event.
	// NATS is used for backend.install (install + start process) and backend.stop.
	// The SmartRouter calls grpc.NewClient(node.Address).LoadModel() directly.
	//
	// Flow:
	// 1. NATS backend.install → worker installs backend + starts gRPC process
	// 2. SmartRouter.Route() → gRPC LoadModel(node.Address) directly
	// 3. [inference via gRPC]
	// 4. NATS backend.stop → worker stops gRPC process
})