From e4fec357720a9a300b173a0775ef5fd8a3e75752 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 7 Jun 2026 23:31:53 +0000 Subject: [PATCH] fix(review): address self-review findings on the distributed install fixes Three findings from an adversarial review of this branch: 1. CRITICAL - OpCache.GetStatus crashed under concurrent load. m.Map() returns the live internal map by reference, so deleting from it on the read path was an unsynchronized write to a map four HTTP handlers poll every ~1s -> a 'concurrent map writes' fatal. Rewritten to iterate a Keys() snapshot, build a fresh result map, and apply evictions via the locked DeleteUUID after the loop. Added a -race concurrency regression guard. 2. HIGH - GetStatus evicted failed ops too, hiding them from /api/operations and breaking the dismiss-failed-op flow (the panel keeps Error != nil ops so the admin can read the error and click Dismiss). Eviction now fires only for terminal ops with Error == nil (success/cancelled); failures are retained. 3. MEDIUM - DeleteStalePendingBackendOps missed StatusUnhealthy nodes. A node marked unhealthy on a NATS ErrNoResponders never transitions to offline (health.go skips re-marking it), so its pending ops leaked exactly like the offline case. Unhealthy is now reaped via the same stale-heartbeat grace path (a fresh-heartbeat node is recovering and keeps its op). Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/services/galleryop/opcache_evict_test.go | 31 +++++++++++++++++-- core/services/galleryop/operation.go | 29 ++++++++++++----- .../services/nodes/pending_op_cleanup_test.go | 26 ++++++++++++++++ core/services/nodes/registry.go | 11 +++++-- 4 files changed, 85 insertions(+), 12 deletions(-) diff --git a/core/services/galleryop/opcache_evict_test.go b/core/services/galleryop/opcache_evict_test.go index 568695b52..299868080 100644 --- a/core/services/galleryop/opcache_evict_test.go +++ b/core/services/galleryop/opcache_evict_test.go @@ -43,12 +43,12 @@ var _ = Describe("OpCache.GetStatus eviction", func() { Expect(cache.Exists("llama-cpp")).To(BeFalse()) }) - It("evicts a failed op", func() { + It("keeps a failed op so the operations panel can surface the error and offer Dismiss", func() { cache.SetBackend("piper", "uuid-2") svc.UpdateStatus("uuid-2", &galleryop.OpStatus{Processed: true, Error: errors.New("boom")}) processing, _ := cache.GetStatus() - Expect(processing).NotTo(HaveKey("piper")) - Expect(cache.Exists("piper")).To(BeFalse()) + Expect(processing).To(HaveKeyWithValue("piper", "uuid-2")) + Expect(cache.Exists("piper")).To(BeTrue()) }) It("evicts a cancelled op", func() { @@ -64,4 +64,29 @@ var _ = Describe("OpCache.GetStatus eviction", func() { Expect(processing).To(HaveKeyWithValue("whisper", "uuid-4")) Expect(taskTypes).To(HaveKeyWithValue("whisper", "Waiting")) }) + + // Regression guard: GetStatus is called concurrently by four HTTP handlers + // (~1s poll). An earlier version evicted by deleting from m.Map() — which + // returns the live internal map by reference — causing a fatal + // "concurrent map writes" crash. Run under -race; must not panic or race. + It("is safe under concurrent GetStatus + Set/complete", func() { + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + for i := 0; i < 2000; i++ { + _, _ = cache.GetStatus() + } + close(done) + }() + for i := 0; i < 2000; i++ { + id := "uuid-c" + cache.SetBackend("concurrent-backend", id) + // Half the time mark it completed so GetStatus evicts it. + if i%2 == 0 { + svc.UpdateStatus(id, &galleryop.OpStatus{Processed: true, Progress: 100, Message: "completed"}) + } + _, _ = cache.GetStatus() + } + <-done + }) }) diff --git a/core/services/galleryop/operation.go b/core/services/galleryop/operation.go index ee9407ed3..5b5a8a7cc 100644 --- a/core/services/galleryop/operation.go +++ b/core/services/galleryop/operation.go @@ -408,23 +408,34 @@ func (m *OpCache) Exists(key string) bool { } func (m *OpCache) GetStatus() (map[string]string, map[string]string) { - processingModelsData := m.Map() - taskTypes := map[string]string{} + processingModelsData := map[string]string{} - for k, v := range processingModelsData { + // Iterate a snapshot (Keys() copies) and build a fresh result map. We must + // NOT delete from m.Map() during the range: Map() returns the live internal + // map by reference, so a bare delete here would be an unsynchronized write + // to a map four HTTP handlers read every ~1s — a concurrent-map-write crash. + // Collect evictions and apply them via the locked DeleteUUID after the loop. + var evict []string + for _, k := range m.status.Keys() { + v := m.status.Get(k) + if v == "" { + continue // raced with a concurrent Delete + } status := m.galleryService.GetStatus(v) // Terminal ops must not keep showing as "processing". Cleanup was // previously only triggered by a client polling /api/backends/job/:uid, // but the Manage-page Reinstall/Upgrade buttons never poll, so completed // ops leaked into processingBackends forever and the card spun // "reinstalling" indefinitely. Evict here on the list read (the UI always - // calls this). DeleteUUID broadcasts the eviction so peer replicas converge. - if status != nil && status.Processed { - m.DeleteUUID(v) - delete(processingModelsData, k) + // calls this). We only evict SUCCESS/cancelled terminals (Error == nil): + // failed ops are kept so /api/operations can surface the error and offer + // Dismiss. DeleteUUID broadcasts the eviction so peer replicas converge. + if status != nil && status.Processed && status.Error == nil { + evict = append(evict, v) continue } + processingModelsData[k] = v taskTypes[k] = "Installation" if status != nil && status.Deletion { taskTypes[k] = "Deletion" @@ -433,6 +444,10 @@ func (m *OpCache) GetStatus() (map[string]string, map[string]string) { } } + for _, v := range evict { + m.DeleteUUID(v) + } + return processingModelsData, taskTypes } diff --git a/core/services/nodes/pending_op_cleanup_test.go b/core/services/nodes/pending_op_cleanup_test.go index e70337874..ad8610cc4 100644 --- a/core/services/nodes/pending_op_cleanup_test.go +++ b/core/services/nodes/pending_op_cleanup_test.go @@ -85,6 +85,32 @@ var _ = Describe("DeleteStalePendingBackendOps", func() { Expect(pendingCountFor(drain)).To(Equal(int64(0))) }) + It("clears ops behind an unhealthy node with a stale heartbeat (never ages to offline)", func() { + // A node marked unhealthy on a NATS ErrNoResponders never transitions to + // offline, so its ops must be reaped via the same stale-heartbeat path. + sick := registerBackend("agx-orin-sick", "10.0.0.7:50051") + Expect(registry.UpsertPendingBackendOp(ctx, sick, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed()) + Expect(registry.MarkUnhealthy(ctx, sick)).To(Succeed()) + setHeartbeat(sick, time.Now().Add(-1*time.Hour)) + + removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(removed).To(Equal(int64(1))) + Expect(pendingCountFor(sick)).To(Equal(int64(0))) + }) + + It("keeps ops behind an unhealthy node that is still heartbeating (recovering)", func() { + recovering := registerBackend("agx-orin-flap", "10.0.0.8:50051") + Expect(registry.UpsertPendingBackendOp(ctx, recovering, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed()) + Expect(registry.MarkUnhealthy(ctx, recovering)).To(Succeed()) + setHeartbeat(recovering, time.Now()) // fresh heartbeat → recovering + + removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(removed).To(Equal(int64(0))) + Expect(pendingCountFor(recovering)).To(Equal(int64(1))) + }) + It("keeps ops behind a node that only just went offline (within grace)", func() { blip := registerBackend("agx-orin", "10.0.0.4:50051") Expect(registry.UpsertPendingBackendOp(ctx, blip, "parakeet-cpp-development", OpBackendInstall, nil)).To(Succeed()) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 6c7c97f12..822efeb89 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -1787,10 +1787,17 @@ func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) erro // number of rows deleted. func (r *NodeRegistry) DeleteStalePendingBackendOps(ctx context.Context, grace time.Duration) (int64, error) { cutoff := time.Now().Add(-grace) + // Draining nodes are cleared immediately (admin action; model rows already + // purged). Offline AND unhealthy nodes are cleared only once their heartbeat + // is older than the grace window: a node marked unhealthy on a NATS + // ErrNoResponders never transitions to offline (health.go skips re-marking + // it), so without including unhealthy here its ops would leak exactly like + // the offline case. A node with a fresh heartbeat (last_heartbeat > cutoff) + // is recovering and keeps its op for retry. res := r.db.WithContext(ctx). Where(`node_id IN (SELECT id FROM backend_nodes WHERE status = ?) - OR node_id IN (SELECT id FROM backend_nodes WHERE status = ? AND last_heartbeat <= ?)`, - StatusDraining, StatusOffline, cutoff). + OR node_id IN (SELECT id FROM backend_nodes WHERE status IN ? AND last_heartbeat <= ?)`, + StatusDraining, []string{StatusOffline, StatusUnhealthy}, cutoff). Delete(&PendingBackendOp{}) if res.Error != nil { return 0, fmt.Errorf("deleting stale pending backend ops: %w", res.Error)