Files
LocalAI/tests/e2e/distributed/node_lifecycle_test.go
LocalAI [bot] 447c186089 fix(distributed): make backend upgrade actually re-install on workers (#9708)
* fix(distributed): make backend upgrade actually re-install on workers

UpgradeBackend dispatched a vanilla backend.install NATS event to every
node hosting the backend. The worker's installBackend short-circuits on
"already running for this (model, replica) slot" and returns the
existing address — so the gallery install path was skipped, no artifact
was re-downloaded, no metadata was written. The frontend's drift
detection then re-flagged the same backends every cycle (installedDigest
stays empty → mismatch → "Backend upgrade available (new build)") while
"Backend upgraded successfully" landed in the logs at the same time.
The user-visible symptom: clicking "Upgrade All" silently does nothing
and the same N backends sit on the upgrade list forever.

Two coupled fixes, one PR:

1. Force flag on backend.install. Add `Force bool` to
   BackendInstallRequest and thread it through NodeCommandSender ->
   RemoteUnloaderAdapter. UpgradeBackend (and the reconciler's pending-op
   drain when retrying an upgrade) sets force=true; routine load events
   and admin install endpoints keep force=false. On the worker, force=true
   stops every live process that uses this backend (resolveProcessKeys
   for peer replicas, plus the exact request processKey), skips the
   findBackend short-circuit, and passes force=true into
   gallery.InstallBackendFromGallery so the on-disk artifact is
   overwritten. After the gallery install completes, startBackend brings
   up a fresh process at the same processKey on a new port.

2. Liveness check on the fast path. installBackend's "already running"
   branch read getAddr without verifying the process was alive, so a
   gRPC backend that died without the supervisor noticing left a stale
   (key, addr) entry. The reconciler then dialed that address, got
   ECONNREFUSED, marked the replica failed, retried install — and the
   supervisor said "already running addr=…" again. Loop forever, exactly
   what we observed on a node whose llama-cpp process had died but whose
   supervisor record persisted. Verify s.isRunning(processKey) before
   trusting getAddr; if the entry is stale, stopBackendExact cleans up
   and we fall through to a real install.

Backwards-compatible: the new Force field is omitempty, older workers
ignore it (their default behavior matches force=false). The signature
change on NodeCommandSender.InstallBackend is internal-only.

Verified: unit tests in core/services/nodes pass (108s suite). The
pre-existing core/backend build break (proto regen pending for
word-level timestamps) blocks core/cli and core/http/endpoints/localai
package tests but is unrelated to this change.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

* test(e2e/distributed): pass force=false to adapter.InstallBackend

NodeCommandSender.InstallBackend gained a final force bool in the
upgrade-force commit; the e2e distributed lifecycle tests still called
the old 8-arg signature and broke compilation. These tests exercise the
routine install path (single replica, default behavior), so force=false
preserves their existing semantics.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-07 17:28:14 +02:00

189 lines
6.5 KiB
Go

package distributed_test
import (
"context"
"encoding/json"
"sync/atomic"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
pgdriver "gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// Node Backend Lifecycle suite: exercises the NATS-driven control plane the
// frontend uses to manage backends on remote worker nodes. Each spec gets a
// fresh Postgres-backed NodeRegistry and a NATS connection from TestInfra,
// and simulates workers by subscribing to the per-node subjects directly.
var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), func() {
	var (
		infra    *TestInfra
		db       *gorm.DB
		registry *nodes.NodeRegistry
	)

	BeforeEach(func() {
		infra = SetupInfra("localai_lifecycle_test")
		var err error
		db, err = gorm.Open(pgdriver.Open(infra.PGURL), &gorm.Config{
			// Silent: gorm query logging would drown out the spec output.
			Logger: logger.Default.LogMode(logger.Silent),
		})
		Expect(err).ToNot(HaveOccurred())
		registry, err = nodes.NewNodeRegistry(db)
		Expect(err).ToNot(HaveOccurred())
	})

	Context("NATS backend.install events", func() {
		It("should send backend.install request-reply to a specific node", func() {
			node := &nodes.BackendNode{
				Name: "gpu-node-1", Address: "h1:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())
			// Simulate worker subscribing to backend.install and replying success
			infra.NC.SubscribeReply(messaging.SubjectNodeBackendInstall(node.ID), func(data []byte, reply func([]byte)) {
				// Handler runs on a NATS client goroutine: GinkgoRecover turns
				// a failed assertion into a spec failure instead of a panic
				// that kills the test process.
				defer GinkgoRecover()
				var req messaging.BackendInstallRequest
				Expect(json.Unmarshal(data, &req)).To(Succeed())
				Expect(req.Backend).To(Equal("llama-cpp"))
				resp := messaging.BackendInstallReply{Success: true}
				respData, err := json.Marshal(resp)
				Expect(err).ToNot(HaveOccurred())
				reply(respData)
			})
			FlushNATS(infra.NC)
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			// force=false: this exercises the routine install path (single
			// replica, default behavior), not the upgrade-force path.
			installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, false)
			Expect(err).ToNot(HaveOccurred())
			Expect(installReply.Success).To(BeTrue())
		})

		It("should propagate error from worker on failed install", func() {
			node := &nodes.BackendNode{
				Name: "fail-node", Address: "h1:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())
			// Simulate worker replying with error
			infra.NC.SubscribeReply(messaging.SubjectNodeBackendInstall(node.ID), func(data []byte, reply func([]byte)) {
				// See note above: assertions in NATS handlers need GinkgoRecover.
				defer GinkgoRecover()
				resp := messaging.BackendInstallReply{Success: false, Error: "backend not found"}
				respData, err := json.Marshal(resp)
				Expect(err).ToNot(HaveOccurred())
				reply(respData)
			})
			FlushNATS(infra.NC)
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, false)
			// Transport-level success: the worker's failure is carried in the
			// reply payload, not in the request-reply error.
			Expect(err).ToNot(HaveOccurred())
			Expect(installReply.Success).To(BeFalse())
			Expect(installReply.Error).To(ContainSubstring("backend not found"))
		})
	})

	Context("NATS backend.stop events (model unload)", func() {
		It("should send backend.stop to nodes hosting the model", func() {
			node := &nodes.BackendNode{
				Name: "gpu-node-2", Address: "h2:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())
			Expect(registry.SetNodeModel(context.Background(), node.ID, "whisper-large", 0, "loaded", "", 0)).To(Succeed())
			var stopReceived atomic.Int32
			sub, err := infra.NC.Subscribe(messaging.SubjectNodeBackendStop(node.ID), func(data []byte) {
				stopReceived.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub.Unsubscribe()
			FlushNATS(infra.NC)
			// Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog)
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed())
			Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1)))
			// Model should be removed from registry
			nodesWithModel, err := registry.FindNodesWithModel(context.Background(), "whisper-large")
			Expect(err).ToNot(HaveOccurred())
			Expect(nodesWithModel).To(BeEmpty())
		})

		It("should send backend.stop to all nodes hosting the model", func() {
			node1 := &nodes.BackendNode{Name: "n1", Address: "h1:50051"}
			node2 := &nodes.BackendNode{Name: "n2", Address: "h2:50051"}
			// Fail fast on setup errors instead of timing out in Eventually below.
			Expect(registry.Register(context.Background(), node1, true)).To(Succeed())
			Expect(registry.Register(context.Background(), node2, true)).To(Succeed())
			Expect(registry.SetNodeModel(context.Background(), node1.ID, "shared-model", 0, "loaded", "", 0)).To(Succeed())
			Expect(registry.SetNodeModel(context.Background(), node2.ID, "shared-model", 0, "loaded", "", 0)).To(Succeed())
			var count atomic.Int32
			sub1, err := infra.NC.Subscribe(messaging.SubjectNodeBackendStop(node1.ID), func(data []byte) {
				count.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub1.Unsubscribe()
			sub2, err := infra.NC.Subscribe(messaging.SubjectNodeBackendStop(node2.ID), func(data []byte) {
				count.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub2.Unsubscribe()
			FlushNATS(infra.NC)
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.UnloadRemoteModel("shared-model")).To(Succeed())
			Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2)))
		})

		It("should be no-op for models not on any node", func() {
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed())
		})
	})

	Context("NATS node stop events (full shutdown)", func() {
		It("should publish stop event to a node", func() {
			node := &nodes.BackendNode{
				Name: "stop-me", Address: "h3:50051",
			}
			Expect(registry.Register(context.Background(), node, true)).To(Succeed())
			var stopped atomic.Int32
			sub, err := infra.NC.Subscribe(messaging.SubjectNodeStop(node.ID), func(data []byte) {
				stopped.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub.Unsubscribe()
			FlushNATS(infra.NC)
			adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
			Expect(adapter.StopNode(node.ID)).To(Succeed())
			Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1)))
		})
	})

	Context("NATS subject naming", func() {
		It("should generate correct backend lifecycle subjects", func() {
			Expect(messaging.SubjectNodeBackendInstall("node-abc")).To(Equal("nodes.node-abc.backend.install"))
			Expect(messaging.SubjectNodeBackendStop("node-abc")).To(Equal("nodes.node-abc.backend.stop"))
			Expect(messaging.SubjectNodeStop("node-abc")).To(Equal("nodes.node-abc.stop"))
		})
	})

	// Design note: LoadModel is a direct gRPC call to node.Address, NOT a NATS event.
	// NATS is used for backend.install (install + start process) and backend.stop.
	// The SmartRouter calls grpc.NewClient(node.Address).LoadModel() directly.
	//
	// Flow:
	// 1. NATS backend.install → worker installs backend + starts gRPC process
	// 2. SmartRouter.Route() → gRPC LoadModel(node.Address) directly
	// 3. [inference via gRPC]
	// 4. NATS backend.stop → worker stops gRPC process
})