LocalAI/core/services/nodes/unloader_test.go
LocalAI [bot] 92dea961c2 fix: distributed backend reinstall/upgrade UI stuck on 'reinstalling' (#10214)
* fix(galleryop): self-evict terminal ops from OpCache.GetStatus

The processingBackends map (the UI 'reinstalling' spinner source) only cleared
an op when a client polled /api/backends/job/:uid. The Manage-page Reinstall and
Upgrade buttons never poll, so completed installs leaked into processingBackends
forever and the backend card spun 'reinstalling' even though the install had
finished. Evict terminal ops on the list read instead; DeleteUUID already
broadcasts the eviction so peer replicas converge.

Reproduced on a live 5-node distributed cluster: 5 backends sat in
processingBackends with underlying jobs reporting completed:true,progress:100.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(nodes): clear pending backend ops behind offline/draining nodes

ListDuePendingBackendOps filters status=healthy, so a backend op queued against
a node that went offline (stale heartbeat) or draining (admin action) was never
retried, aged out, or deleted - it leaked forever and kept the UI operation
spinning. Add DeleteStalePendingBackendOps and run it each reconcile pass:
draining nodes are cleared immediately (model rows already purged), offline
nodes once their heartbeat is older than a grace window (blip protection).

Reproduced on a live cluster: orphaned llama-cpp install rows targeting an
offline (nvidia-thor) and a draining (mac-mini-m4) node sat at attempts=0
indefinitely.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(nodes): stream per-node progress during backend upgrade

The install dispatch subscribed to a per-op progress subject and streamed
per-node download ticks; the upgrade dispatch did a bare 15-minute blocking
NATS round-trip with no subscription, so the UI showed progress:0 the whole
time (the 'reinstalling but nothing happens' report on a slow node).

Thread the op ID through BackendManager.UpgradeBackend -> the distributed
manager -> the adapter, and have the adapter subscribe to the per-op progress
subject before the request (extracted into a shared subscribeProgress helper
reused by install/upgrade/force-fallback). The worker's upgradeBackend now
creates the same DebouncedInstallProgressPublisher installBackend uses. An
upgrade is a force-reinstall, so it reuses SubjectNodeBackendInstallProgress
rather than minting a new subject - no new NATS permission, no new
rolling-update compat surface. Reconciler-driven retries pass empty
opID/onProgress and stay on the silent path.

Reproduced on a live cluster: upgrade of llama-cpp-development on agx-orin-slow
sat at progress:0 for 4+ minutes with no per-node feedback.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(galleryop): persist cancellation + periodically reap orphaned ops

Two distributed gaps surfaced when a replica was killed mid-upgrade on a live
cluster, leaving the backend stuck 'processing' in the UI forever:

1. CancelOperation flipped the in-memory status to cancelled and broadcast a
   NATS event but never persisted the terminal status. On the next replica
   restart the still-active row re-hydrated straight back into
   processingBackends and the UI spun again. It now calls store.Cancel(id) so
   the cancel survives a restart.

2. CleanStale (which marks abandoned active ops failed) only ran once on
   startup, so an op orphaned AFTER startup - its owning replica's foreground
   handler goroutine gone - was never reaped until the next restart. Add
   GalleryService.ReapStaleOperations and run it on a 15m ticker (CleanStale
   now returns the reaped count for observability).

Neither is covered by the OpCache self-evict fix: an orphaned op never reaches
Processed, so it would never self-evict.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(review): address self-review findings on the distributed install fixes

Three findings from an adversarial review of this branch:

1. CRITICAL - OpCache.GetStatus crashed under concurrent load. m.Map() returns
   the live internal map by reference, so deleting from it on the read path was
   an unsynchronized write to a map four HTTP handlers poll every ~1s -> a
   'concurrent map writes' fatal. Rewritten to iterate a Keys() snapshot, build
   a fresh result map, and apply evictions via the locked DeleteUUID after the
   loop. Added a -race concurrency regression guard.

2. HIGH - GetStatus evicted failed ops too, hiding them from /api/operations
   and breaking the dismiss-failed-op flow (the panel keeps Error != nil ops so
   the admin can read the error and click Dismiss). Eviction now fires only for
   terminal ops with Error == nil (success/cancelled); failures are retained.

3. MEDIUM - DeleteStalePendingBackendOps missed StatusUnhealthy nodes. A node
   marked unhealthy on a NATS ErrNoResponders never transitions to offline
   (health.go skips re-marking it), so its pending ops leaked exactly like the
   offline case. Unhealthy is now reaped via the same stale-heartbeat grace path
   (a fresh-heartbeat node is recovering and keeps its op).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(review-2): don't evict the still-installing soft-path; don't spin on failed ops

Second review pass found two issues:

1. MEDIUM (Go) - OpCache.GetStatus evicted the ErrWorkerStillInstalling
   soft-path op. That op is deliberately Processed=true with no error to show a
   yellow in-progress state when a worker timed out the NATS round-trip but is
   still installing in the background; the reconciler confirms the real outcome
   later. Evicting it (and broadcasting OpEnd + marking the DB completed) hid an
   install that may still fail. Eviction is now scoped to a clean success
   (progress 100 + 'completed', matching the job-poll's historical condition) or
   a cancellation - the soft-path (progress != 100) and failures are kept.

2. MEDIUM (React) - the Backends gallery card rendered ANY operation as an
   'Installing...' spinner, so a failed op (now intentionally kept in the list
   for the OperationsBar error + Dismiss) spun forever. Exclude errored ops from
   the card spinner, mirroring Models.jsx (isInstalling already excludes
   op.error). The error + Dismiss still surface in the global OperationsBar.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(ui): refresh Manage backends table when an operation settles

The Manage backends table fetched installed backends only on mount/after delete
and checked upgrades only on tab activation. After a reinstall/upgrade completed
neither re-ran, so the installed-version cell and the 'update available' badge
stayed stale until the user switched tabs - the op looked like it 'did nothing'.

Watch the operations list (via useOperations) and re-fetch installed backends +
available upgrades whenever the count settles, mirroring the operations.length
watch Backends.jsx already uses. Consolidates the prior tab-activation upgrades
check into the same effect.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-08 10:03:02 +02:00

package nodes

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"

	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/core/services/messaging"
)

// --- Fakes ---

// fakeModelLocator implements ModelLocator with configurable node lists.
type fakeModelLocator struct {
	nodes        []BackendNode
	findErr      error
	removedPairs []modelNodePair // records RemoveNodeModel and RemoveAllNodeModelReplicas calls
}

type modelNodePair struct {
	nodeID    string
	modelName string
}

func (f *fakeModelLocator) FindNodesWithModel(_ context.Context, _ string) ([]BackendNode, error) {
	return f.nodes, f.findErr
}

func (f *fakeModelLocator) RemoveNodeModel(_ context.Context, nodeID, modelName string, _ int) error {
	f.removedPairs = append(f.removedPairs, modelNodePair{nodeID, modelName})
	return nil
}

func (f *fakeModelLocator) RemoveAllNodeModelReplicas(_ context.Context, nodeID, modelName string) error {
	f.removedPairs = append(f.removedPairs, modelNodePair{nodeID, modelName})
	return nil
}

// fakeMessagingClient implements messaging.MessagingClient, recording Publish
// and Request calls so we can assert on subjects and payloads.
type fakeMessagingClient struct {
	mu           sync.Mutex
	published    []publishCall
	publishErr   error // error to return from Publish
	requestReply []byte
	requestErr   error
	requestCalls []requestCall
}

type publishCall struct {
	Subject string
	Data    []byte
}

type requestCall struct {
	Subject string
	Data    []byte
	Timeout time.Duration
}

func (f *fakeMessagingClient) Publish(subject string, data any) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	var raw []byte
	if data != nil {
		var err error
		raw, err = json.Marshal(data)
		if err != nil {
			return err
		}
	}
	f.published = append(f.published, publishCall{Subject: subject, Data: raw})
	return f.publishErr
}

func (f *fakeMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) {
	return &fakeSubscription{}, nil
}

func (f *fakeMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) {
	return &fakeSubscription{}, nil
}

func (f *fakeMessagingClient) QueueSubscribeReply(_ string, _ string, _ func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
	return &fakeSubscription{}, nil
}

func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
	return &fakeSubscription{}, nil
}

func (f *fakeMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data, Timeout: timeout})
	return f.requestReply, f.requestErr
}

func (f *fakeMessagingClient) IsConnected() bool { return true }
func (f *fakeMessagingClient) Close()            {}

type fakeSubscription struct{}

func (f *fakeSubscription) Unsubscribe() error { return nil }

// --- Tests ---

var _ = Describe("RemoteUnloaderAdapter", func() {
	var (
		locator *fakeModelLocator
		mc      *fakeMessagingClient
		adapter *RemoteUnloaderAdapter
	)

	BeforeEach(func() {
		locator = &fakeModelLocator{}
		mc = &fakeMessagingClient{}
		adapter = NewRemoteUnloaderAdapter(locator, mc, 3*time.Minute, 15*time.Minute)
	})

	Describe("UnloadRemoteModel", func() {
		It("with no nodes returns nil", func() {
			locator.nodes = nil
			Expect(adapter.UnloadRemoteModel("my-model")).To(Succeed())
			Expect(mc.published).To(BeEmpty())
		})

		It("broadcasts to all nodes with model", func() {
			locator.nodes = []BackendNode{
				{ID: "node-1", Name: "worker-1"},
				{ID: "node-2", Name: "worker-2"},
			}
			Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())

			// Should have published a StopBackend for each node.
			Expect(mc.published).To(HaveLen(2))
			Expect(mc.published[0].Subject).To(Equal(messaging.SubjectNodeBackendStop("node-1")))
			Expect(mc.published[1].Subject).To(Equal(messaging.SubjectNodeBackendStop("node-2")))

			// Should have removed the model from each node in the registry.
			Expect(locator.removedPairs).To(HaveLen(2))
			Expect(locator.removedPairs[0]).To(Equal(modelNodePair{"node-1", "llama"}))
			Expect(locator.removedPairs[1]).To(Equal(modelNodePair{"node-2", "llama"}))
		})

		It("continues when one node fails", func() {
			locator.nodes = []BackendNode{
				{ID: "node-fail", Name: "worker-fail"},
				{ID: "node-ok", Name: "worker-ok"},
			}
			// Use a messaging client that fails the first Publish call only.
			failOnce := &failOnceMessagingClient{inner: mc, failOn: 0}
			adapter = NewRemoteUnloaderAdapter(locator, failOnce, 3*time.Minute, 15*time.Minute)
			Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())

			// The second node should still have been processed.
			// The first node's StopBackend errored, so RemoveNodeModel was NOT called for it.
			// The second node's StopBackend succeeded, so RemoveNodeModel WAS called.
			Expect(locator.removedPairs).To(HaveLen(1))
			Expect(locator.removedPairs[0].nodeID).To(Equal("node-ok"))
		})
	})

	Describe("StopBackend", func() {
		It("with empty backend publishes nil payload", func() {
			Expect(adapter.StopBackend("node-1", "")).To(Succeed())
			Expect(mc.published).To(HaveLen(1))
			Expect(mc.published[0].Subject).To(Equal(messaging.SubjectNodeBackendStop("node-1")))
			Expect(mc.published[0].Data).To(BeNil())
		})

		It("with backend name publishes JSON", func() {
			Expect(adapter.StopBackend("node-1", "llama-backend")).To(Succeed())
			Expect(mc.published).To(HaveLen(1))
			var payload struct {
				Backend string `json:"backend"`
			}
			Expect(json.Unmarshal(mc.published[0].Data, &payload)).To(Succeed())
			Expect(payload.Backend).To(Equal("llama-backend"))
		})
	})

	Describe("StopNode", func() {
		It("publishes to correct subject", func() {
			Expect(adapter.StopNode("node-abc")).To(Succeed())
			Expect(mc.published).To(HaveLen(1))
			Expect(mc.published[0].Subject).To(Equal(messaging.SubjectNodeStop("node-abc")))
			Expect(mc.published[0].Data).To(BeNil())
		})
	})

	Describe("DeleteModelFiles", func() {
		It("with no nodes returns nil", func() {
			locator.nodes = nil
			Expect(adapter.DeleteModelFiles("my-model")).To(Succeed())
		})

		It("continues on failure", func() {
			locator.nodes = []BackendNode{
				{ID: "node-1", Name: "w1"},
				{ID: "node-2", Name: "w2"},
			}
			// Request will fail for all calls.
			mc.requestErr = fmt.Errorf("timeout")
			Expect(adapter.DeleteModelFiles("my-model")).To(Succeed())

			// Both nodes attempted.
			Expect(mc.requestCalls).To(HaveLen(2))
			Expect(mc.requestCalls[0].Subject).To(Equal(messaging.SubjectNodeModelDelete("node-1")))
			Expect(mc.requestCalls[1].Subject).To(Equal(messaging.SubjectNodeModelDelete("node-2")))
		})
	})
})

// failOnceMessagingClient wraps fakeMessagingClient but fails the Publish call
// at index failOn (0-based) and succeeds all others.
type failOnceMessagingClient struct {
	inner   *fakeMessagingClient
	failOn  int
	callIdx int
	mu      sync.Mutex
}

func (f *failOnceMessagingClient) Publish(subject string, data any) error {
	f.mu.Lock()
	idx := f.callIdx
	f.callIdx++
	f.mu.Unlock()
	if idx == f.failOn {
		return fmt.Errorf("simulated failure")
	}
	return f.inner.Publish(subject, data)
}

func (f *failOnceMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
	return f.inner.Subscribe(subject, handler)
}

func (f *failOnceMessagingClient) QueueSubscribe(subject, queue string, handler func([]byte)) (messaging.Subscription, error) {
	return f.inner.QueueSubscribe(subject, queue, handler)
}

func (f *failOnceMessagingClient) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
	return f.inner.QueueSubscribeReply(subject, queue, handler)
}

func (f *failOnceMessagingClient) SubscribeReply(subject string, handler func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
	return f.inner.SubscribeReply(subject, handler)
}

func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
	return f.inner.Request(subject, data, timeout)
}

func (f *failOnceMessagingClient) IsConnected() bool { return true }
func (f *failOnceMessagingClient) Close()            {}

var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
	It("passes the configured install timeout to the messaging client", func() {
		mc := newScriptedMessagingClient()
		mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
		adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)

		_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0, "", nil)
		Expect(err).ToNot(HaveOccurred())
		Expect(mc.calls).To(HaveLen(1))
		Expect(mc.calls[0].Timeout).To(Equal(7 * time.Minute))
	})

	It("passes the configured upgrade timeout to the messaging client", func() {
		mc := newScriptedMessagingClient()
		mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true})
		adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)

		_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0, "", nil)
		Expect(err).ToNot(HaveOccurred())
		Expect(mc.calls).To(HaveLen(1))
		Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute))
	})
})

var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
	It("wraps nats.ErrTimeout from InstallBackend in galleryop.ErrWorkerStillInstalling", func() {
		mc := newScriptedMessagingClient()
		mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout)
		adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)

		_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
		Expect(err).To(HaveOccurred())
		Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
			"expected wrapped ErrWorkerStillInstalling, got %v", err)
	})

	It("does NOT wrap non-timeout errors", func() {
		mc := newScriptedMessagingClient()
		mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders)
		adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)

		_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
		Expect(err).To(HaveOccurred())
		Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse())
		Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue())
	})
})

var _ = Describe("RemoteUnloaderAdapter install progress streaming", func() {
	It("forwards BackendInstallProgressEvent values into the onProgress callback when the worker publishes them", func() {
		mc := newScriptedMessagingClient()
		mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
		mc.scheduleProgressPublish("n1", "op-abc", []messaging.BackendInstallProgressEvent{
			{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "100 MB", Total: "1 GB", Percentage: 10},
			{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "500 MB", Total: "1 GB", Percentage: 50},
		})
		adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)

		var (
			received []messaging.BackendInstallProgressEvent
			mu       sync.Mutex
		)
		onProgress := func(ev messaging.BackendInstallProgressEvent) {
			mu.Lock()
			defer mu.Unlock()
			received = append(received, ev)
		}

		_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "op-abc", onProgress)
		Expect(err).ToNot(HaveOccurred())
		Eventually(func() int {
			mu.Lock()
			defer mu.Unlock()
			return len(received)
		}, "1s").Should(Equal(2))
	})

	It("does NOT subscribe when onProgress is nil (reconciler retry path)", func() {
		mc := newScriptedMessagingClient()
		mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true})
		adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)

		_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
		Expect(err).ToNot(HaveOccurred())
		Expect(mc.subscribeCalls()).To(BeEmpty(),
			"reconciler-driven retries must not subscribe to the per-op progress subject")
	})
})