fix(review): address self-review findings on the distributed install fixes

Three findings from an adversarial review of this branch:

1. CRITICAL - OpCache.GetStatus crashed under concurrent load. m.Map() returns
   the live internal map by reference, so deleting from it on the read path was
   an unsynchronized write to a map that four HTTP handlers poll every ~1s,
   which triggers Go's fatal 'concurrent map writes' error. GetStatus now
   iterates a Keys() snapshot, builds a fresh result map, and applies evictions
   via the locked DeleteUUID after the loop. Added a concurrency regression
   test meant to be run under -race.

2. HIGH - GetStatus evicted failed ops too, hiding them from /api/operations
   and breaking the dismiss-failed-op flow (the panel keeps Error != nil ops so
   the admin can read the error and click Dismiss). Eviction now fires only for
   terminal ops with Error == nil (success/cancelled); failures are retained.

3. MEDIUM - DeleteStalePendingBackendOps missed StatusUnhealthy nodes. A node
   marked unhealthy on a NATS ErrNoResponders never transitions to offline
   (health.go skips re-marking it), so its pending ops leaked exactly as in
   the offline case. Unhealthy nodes are now reaped via the same
   stale-heartbeat grace path; a node with a fresh heartbeat is treated as
   recovering and keeps its op.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-07 23:31:53 +00:00
parent 05c0a08e24
commit e4fec35772
4 changed files with 85 additions and 12 deletions

View File

@@ -43,12 +43,12 @@ var _ = Describe("OpCache.GetStatus eviction", func() {
Expect(cache.Exists("llama-cpp")).To(BeFalse())
})
It("evicts a failed op", func() {
It("keeps a failed op so the operations panel can surface the error and offer Dismiss", func() {
cache.SetBackend("piper", "uuid-2")
svc.UpdateStatus("uuid-2", &galleryop.OpStatus{Processed: true, Error: errors.New("boom")})
processing, _ := cache.GetStatus()
Expect(processing).NotTo(HaveKey("piper"))
Expect(cache.Exists("piper")).To(BeFalse())
Expect(processing).To(HaveKeyWithValue("piper", "uuid-2"))
Expect(cache.Exists("piper")).To(BeTrue())
})
It("evicts a cancelled op", func() {
@@ -64,4 +64,29 @@ var _ = Describe("OpCache.GetStatus eviction", func() {
Expect(processing).To(HaveKeyWithValue("whisper", "uuid-4"))
Expect(taskTypes).To(HaveKeyWithValue("whisper", "Waiting"))
})
// Regression guard: GetStatus is called concurrently by four HTTP handlers
// (~1s poll). An earlier version evicted by deleting from m.Map() — which
// returns the live internal map by reference — causing a fatal
// "concurrent map writes" crash. Run under -race; must not panic or race.
It("is safe under concurrent GetStatus + Set/complete", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
for i := 0; i < 2000; i++ {
_, _ = cache.GetStatus()
}
close(done)
}()
for i := 0; i < 2000; i++ {
id := "uuid-c"
cache.SetBackend("concurrent-backend", id)
// Half the time mark it completed so GetStatus evicts it.
if i%2 == 0 {
svc.UpdateStatus(id, &galleryop.OpStatus{Processed: true, Progress: 100, Message: "completed"})
}
_, _ = cache.GetStatus()
}
<-done
})
})

View File

@@ -408,23 +408,34 @@ func (m *OpCache) Exists(key string) bool {
}
func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
processingModelsData := m.Map()
taskTypes := map[string]string{}
processingModelsData := map[string]string{}
for k, v := range processingModelsData {
// Iterate a snapshot (Keys() copies) and build a fresh result map. We must
// NOT delete from m.Map() during the range: Map() returns the live internal
// map by reference, so a bare delete here would be an unsynchronized write
// to a map four HTTP handlers read every ~1s — a concurrent-map-write crash.
// Collect evictions and apply them via the locked DeleteUUID after the loop.
var evict []string
for _, k := range m.status.Keys() {
v := m.status.Get(k)
if v == "" {
continue // raced with a concurrent Delete
}
status := m.galleryService.GetStatus(v)
// Terminal ops must not keep showing as "processing". Cleanup was
// previously only triggered by a client polling /api/backends/job/:uid,
// but the Manage-page Reinstall/Upgrade buttons never poll, so completed
// ops leaked into processingBackends forever and the card spun
// "reinstalling" indefinitely. Evict here on the list read (the UI always
// calls this). DeleteUUID broadcasts the eviction so peer replicas converge.
if status != nil && status.Processed {
m.DeleteUUID(v)
delete(processingModelsData, k)
// calls this). We only evict SUCCESS/cancelled terminals (Error == nil):
// failed ops are kept so /api/operations can surface the error and offer
// Dismiss. DeleteUUID broadcasts the eviction so peer replicas converge.
if status != nil && status.Processed && status.Error == nil {
evict = append(evict, v)
continue
}
processingModelsData[k] = v
taskTypes[k] = "Installation"
if status != nil && status.Deletion {
taskTypes[k] = "Deletion"
@@ -433,6 +444,10 @@ func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
}
}
for _, v := range evict {
m.DeleteUUID(v)
}
return processingModelsData, taskTypes
}

View File

@@ -85,6 +85,32 @@ var _ = Describe("DeleteStalePendingBackendOps", func() {
Expect(pendingCountFor(drain)).To(Equal(int64(0)))
})
It("clears ops behind an unhealthy node with a stale heartbeat (never ages to offline)", func() {
// A node marked unhealthy on a NATS ErrNoResponders never transitions to
// offline, so its ops must be reaped via the same stale-heartbeat path.
sick := registerBackend("agx-orin-sick", "10.0.0.7:50051")
Expect(registry.UpsertPendingBackendOp(ctx, sick, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed())
Expect(registry.MarkUnhealthy(ctx, sick)).To(Succeed())
setHeartbeat(sick, time.Now().Add(-1*time.Hour))
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(1)))
Expect(pendingCountFor(sick)).To(Equal(int64(0)))
})
It("keeps ops behind an unhealthy node that is still heartbeating (recovering)", func() {
recovering := registerBackend("agx-orin-flap", "10.0.0.8:50051")
Expect(registry.UpsertPendingBackendOp(ctx, recovering, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed())
Expect(registry.MarkUnhealthy(ctx, recovering)).To(Succeed())
setHeartbeat(recovering, time.Now()) // fresh heartbeat → recovering
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(0)))
Expect(pendingCountFor(recovering)).To(Equal(int64(1)))
})
It("keeps ops behind a node that only just went offline (within grace)", func() {
blip := registerBackend("agx-orin", "10.0.0.4:50051")
Expect(registry.UpsertPendingBackendOp(ctx, blip, "parakeet-cpp-development", OpBackendInstall, nil)).To(Succeed())

View File

@@ -1787,10 +1787,17 @@ func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) erro
// number of rows deleted.
func (r *NodeRegistry) DeleteStalePendingBackendOps(ctx context.Context, grace time.Duration) (int64, error) {
cutoff := time.Now().Add(-grace)
// Draining nodes are cleared immediately (admin action; model rows already
// purged). Offline AND unhealthy nodes are cleared only once their heartbeat
// is older than the grace window: a node marked unhealthy on a NATS
// ErrNoResponders never transitions to offline (health.go skips re-marking
// it), so without including unhealthy here its ops would leak exactly like
// the offline case. A node with a fresh heartbeat (last_heartbeat > cutoff)
// is recovering and keeps its op for retry.
res := r.db.WithContext(ctx).
Where(`node_id IN (SELECT id FROM backend_nodes WHERE status = ?)
OR node_id IN (SELECT id FROM backend_nodes WHERE status = ? AND last_heartbeat <= ?)`,
StatusDraining, StatusOffline, cutoff).
OR node_id IN (SELECT id FROM backend_nodes WHERE status IN ? AND last_heartbeat <= ?)`,
StatusDraining, []string{StatusOffline, StatusUnhealthy}, cutoff).
Delete(&PendingBackendOp{})
if res.Error != nil {
return 0, fmt.Errorf("deleting stale pending backend ops: %w", res.Error)