fix(nodes): clear pending backend ops behind offline/draining nodes

ListDuePendingBackendOps filters status=healthy, so a backend op queued against
a node that went offline (stale heartbeat) or draining (admin action) was never
retried, aged out, or deleted - it leaked forever and kept the UI operation
spinning. Add DeleteStalePendingBackendOps and run it each reconcile pass:
draining nodes are cleared immediately (model rows already purged), offline
nodes once their heartbeat is older than a grace window (blip protection).

Reproduced on a live cluster: orphaned llama-cpp install rows targeting an
offline (nvidia-thor) and a draining (mac-mini-m4) node sat at attempts=0
indefinitely.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-07 22:37:24 +00:00
parent 9a7ebc1151
commit 47fa847d55
3 changed files with 148 additions and 0 deletions

View File

@@ -0,0 +1,109 @@
package nodes
import (
"context"
"runtime"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/testutil"
)
// These specs reproduce the distributed "pending ops behind dead nodes leak
// forever" bug. ListDuePendingBackendOps only returns rows whose node is
// StatusHealthy, so an op queued against a node that goes offline (heartbeat
// stale) or draining (admin action) is never retried, never aged out, and
// never deleted. On a live cluster these rows sat at attempts=0 indefinitely
// and kept the UI operation alive. DeleteStalePendingBackendOps garbage-collects
// them: draining nodes immediately (models already purged), offline nodes only
// after a grace window so a brief heartbeat blip does not nuke in-flight work.
var _ = Describe("DeleteStalePendingBackendOps", func() {
var (
registry *NodeRegistry
ctx context.Context
)
BeforeEach(func() {
if runtime.GOOS == "darwin" {
Skip("testcontainers requires Docker, not available on macOS CI")
}
db := testutil.SetupTestDB()
var err error
registry, err = NewNodeRegistry(db)
Expect(err).ToNot(HaveOccurred())
ctx = context.Background()
})
// registerBackend registers an auto-approved backend node and returns its ID.
registerBackend := func(name, address string) string {
node := &BackendNode{Name: name, NodeType: NodeTypeBackend, Address: address}
Expect(registry.Register(ctx, node, true)).To(Succeed())
fetched, err := registry.GetByName(ctx, name)
Expect(err).ToNot(HaveOccurred())
return fetched.ID
}
// setHeartbeat forces a node's last_heartbeat (Register/MarkOffline leave it
// at "now"; we age it to simulate a node that went silent a while ago).
setHeartbeat := func(nodeID string, t time.Time) {
Expect(registry.db.WithContext(ctx).Model(&BackendNode{}).
Where("id = ?", nodeID).
Update("last_heartbeat", t).Error).To(Succeed())
}
pendingCountFor := func(nodeID string) int64 {
var n int64
Expect(registry.db.WithContext(ctx).Model(&PendingBackendOp{}).
Where("node_id = ?", nodeID).Count(&n).Error).To(Succeed())
return n
}
It("clears ops behind an offline node whose heartbeat is past the grace window", func() {
dead := registerBackend("nvidia-thor", "10.0.0.9:50051")
Expect(registry.UpsertPendingBackendOp(ctx, dead, "llama-cpp-development", OpBackendInstall, nil)).To(Succeed())
Expect(registry.MarkOffline(ctx, dead)).To(Succeed())
setHeartbeat(dead, time.Now().Add(-1*time.Hour))
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(1)))
Expect(pendingCountFor(dead)).To(Equal(int64(0)))
})
It("clears ops behind a draining node immediately, even with a fresh heartbeat", func() {
// Mirrors the live mac-mini-m4 case: draining but still heartbeating.
drain := registerBackend("mac-mini-m4", "10.0.0.3:50051")
Expect(registry.UpsertPendingBackendOp(ctx, drain, "llama-cpp-development", OpBackendInstall, nil)).To(Succeed())
Expect(registry.MarkDraining(ctx, drain)).To(Succeed())
setHeartbeat(drain, time.Now()) // fresh heartbeat
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(1)))
Expect(pendingCountFor(drain)).To(Equal(int64(0)))
})
It("keeps ops behind a node that only just went offline (within grace)", func() {
blip := registerBackend("agx-orin", "10.0.0.4:50051")
Expect(registry.UpsertPendingBackendOp(ctx, blip, "parakeet-cpp-development", OpBackendInstall, nil)).To(Succeed())
Expect(registry.MarkOffline(ctx, blip)).To(Succeed())
setHeartbeat(blip, time.Now().Add(-1*time.Minute)) // gone only 1m, grace 10m
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(0)))
Expect(pendingCountFor(blip)).To(Equal(int64(1)))
})
It("keeps ops behind a healthy node", func() {
healthy := registerBackend("dgx-spark", "10.0.0.1:50051")
Expect(registry.UpsertPendingBackendOp(ctx, healthy, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed())
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(0)))
Expect(pendingCountFor(healthy)).To(Equal(int64(1)))
})
})

View File

@@ -189,6 +189,13 @@ func (rc *ReplicaReconciler) reconcileState(ctx context.Context) {
// passed on nodes that are currently healthy. On success the row is deleted;
// on failure attempts++ and next_retry_at moves out via exponential backoff.
func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// Garbage-collect ops behind nodes that went offline/draining. These are
// invisible to ListDuePendingBackendOps (which filters status=healthy), so
// without this sweep they leak forever and keep the UI operation spinning.
if _, err := rc.registry.DeleteStalePendingBackendOps(ctx, stalePendingBackendOpGrace); err != nil {
xlog.Warn("Reconciler: failed to clear stale pending backend ops", "error", err)
}
ops, err := rc.registry.ListDuePendingBackendOps(ctx)
if err != nil {
xlog.Warn("Reconciler: failed to list pending backend ops", "error", err)
@@ -293,6 +300,13 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// amount of further retrying will help.
const maxPendingBackendOpAttempts = 10
// stalePendingBackendOpGrace is how long a node may be offline before its
// pending backend ops are garbage-collected. Draining nodes are cleared
// immediately regardless of this window (see DeleteStalePendingBackendOps).
// ListDuePendingBackendOps never surfaces ops behind non-healthy nodes, so
// without this sweep they would leak forever and keep the UI op spinning.
const stalePendingBackendOpGrace = 15 * time.Minute
// probeLoadedModels gRPC-health-checks model addresses that the DB says are
// loaded. If a model's backend process is gone (OOM, crash, manual restart)
// we remove the row so ghosts don't linger. Only probes rows older than

View File

@@ -1776,6 +1776,31 @@ func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) erro
return nil
}
// DeleteStalePendingBackendOps garbage-collects pending backend ops whose target
// node can never drain them. ListDuePendingBackendOps only returns rows behind a
// StatusHealthy node, so ops behind a node that went offline or draining are
// otherwise never retried, aged out, or deleted — they leak forever and keep the
// UI operation spinning. Draining nodes are cleared immediately (an explicit
// admin action; their model rows are already purged). Offline nodes are cleared
// only once their last heartbeat is older than `grace`, so a brief heartbeat blip
// does not nuke an install that is still legitimately in flight. Returns the
// number of rows deleted.
func (r *NodeRegistry) DeleteStalePendingBackendOps(ctx context.Context, grace time.Duration) (int64, error) {
cutoff := time.Now().Add(-grace)
res := r.db.WithContext(ctx).
Where(`node_id IN (SELECT id FROM backend_nodes WHERE status = ?)
OR node_id IN (SELECT id FROM backend_nodes WHERE status = ? AND last_heartbeat <= ?)`,
StatusDraining, StatusOffline, cutoff).
Delete(&PendingBackendOp{})
if res.Error != nil {
return 0, fmt.Errorf("deleting stale pending backend ops: %w", res.Error)
}
if res.RowsAffected > 0 {
xlog.Info("Cleared pending backend ops behind non-healthy nodes", "deleted", res.RowsAffected)
}
return res.RowsAffected, nil
}
// RecordPendingBackendOpFailure bumps Attempts, captures the error, and
// pushes NextRetryAt out with exponential backoff capped at 15 minutes.
func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {