diff --git a/core/services/nodes/pending_op_cleanup_test.go b/core/services/nodes/pending_op_cleanup_test.go new file mode 100644 index 000000000..e70337874 --- /dev/null +++ b/core/services/nodes/pending_op_cleanup_test.go @@ -0,0 +1,109 @@ +package nodes + +import ( + "context" + "runtime" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/testutil" +) + +// These specs reproduce the distributed "pending ops behind dead nodes leak +// forever" bug. ListDuePendingBackendOps only returns rows whose node is +// StatusHealthy, so an op queued against a node that goes offline (heartbeat +// stale) or draining (admin action) is never retried, never aged out, and +// never deleted. On a live cluster these rows sat at attempts=0 indefinitely +// and kept the UI operation alive. DeleteStalePendingBackendOps garbage-collects +// them: draining nodes immediately (models already purged), offline nodes only +// after a grace window so a brief heartbeat blip does not nuke in-flight work. +var _ = Describe("DeleteStalePendingBackendOps", func() { + var ( + registry *NodeRegistry + ctx context.Context + ) + + BeforeEach(func() { + if runtime.GOOS == "darwin" { + Skip("testcontainers requires Docker, not available on macOS CI") + } + db := testutil.SetupTestDB() + var err error + registry, err = NewNodeRegistry(db) + Expect(err).ToNot(HaveOccurred()) + ctx = context.Background() + }) + + // registerBackend registers an auto-approved backend node and returns its ID. + registerBackend := func(name, address string) string { + node := &BackendNode{Name: name, NodeType: NodeTypeBackend, Address: address} + Expect(registry.Register(ctx, node, true)).To(Succeed()) + fetched, err := registry.GetByName(ctx, name) + Expect(err).ToNot(HaveOccurred()) + return fetched.ID + } + + // setHeartbeat forces a node's last_heartbeat (Register/MarkOffline leave it + // at "now"; we age it to simulate a node that went silent a while ago). + setHeartbeat := func(nodeID string, t time.Time) { + Expect(registry.db.WithContext(ctx).Model(&BackendNode{}). + Where("id = ?", nodeID). + Update("last_heartbeat", t).Error).To(Succeed()) + } + + pendingCountFor := func(nodeID string) int64 { + var n int64 + Expect(registry.db.WithContext(ctx).Model(&PendingBackendOp{}). + Where("node_id = ?", nodeID).Count(&n).Error).To(Succeed()) + return n + } + + It("clears ops behind an offline node whose heartbeat is past the grace window", func() { + dead := registerBackend("nvidia-thor", "10.0.0.9:50051") + Expect(registry.UpsertPendingBackendOp(ctx, dead, "llama-cpp-development", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.MarkOffline(ctx, dead)).To(Succeed()) + setHeartbeat(dead, time.Now().Add(-1*time.Hour)) + + removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(removed).To(Equal(int64(1))) + Expect(pendingCountFor(dead)).To(Equal(int64(0))) + }) + + It("clears ops behind a draining node immediately, even with a fresh heartbeat", func() { + // Mirrors the live mac-mini-m4 case: draining but still heartbeating. + drain := registerBackend("mac-mini-m4", "10.0.0.3:50051") + Expect(registry.UpsertPendingBackendOp(ctx, drain, "llama-cpp-development", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.MarkDraining(ctx, drain)).To(Succeed()) + setHeartbeat(drain, time.Now()) // fresh heartbeat + + removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(removed).To(Equal(int64(1))) + Expect(pendingCountFor(drain)).To(Equal(int64(0))) + }) + + It("keeps ops behind a node that only just went offline (within grace)", func() { + blip := registerBackend("agx-orin", "10.0.0.4:50051") + Expect(registry.UpsertPendingBackendOp(ctx, blip, "parakeet-cpp-development", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.MarkOffline(ctx, blip)).To(Succeed()) + setHeartbeat(blip, time.Now().Add(-1*time.Minute)) // gone only 1m, grace 10m + + removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(removed).To(Equal(int64(0))) + Expect(pendingCountFor(blip)).To(Equal(int64(1))) + }) + + It("keeps ops behind a healthy node", func() { + healthy := registerBackend("dgx-spark", "10.0.0.1:50051") + Expect(registry.UpsertPendingBackendOp(ctx, healthy, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed()) + + removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(removed).To(Equal(int64(0))) + Expect(pendingCountFor(healthy)).To(Equal(int64(1))) + }) +}) diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index bf3cfd18f..8d583176e 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -189,6 +189,13 @@ func (rc *ReplicaReconciler) reconcileState(ctx context.Context) { // passed on nodes that are currently healthy. On success the row is deleted; // on failure attempts++ and next_retry_at moves out via exponential backoff. func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { + // Garbage-collect ops behind nodes that went offline/draining. These are + // invisible to ListDuePendingBackendOps (which filters status=healthy), so + // without this sweep they leak forever and keep the UI operation spinning. + if _, err := rc.registry.DeleteStalePendingBackendOps(ctx, stalePendingBackendOpGrace); err != nil { + xlog.Warn("Reconciler: failed to clear stale pending backend ops", "error", err) + } + ops, err := rc.registry.ListDuePendingBackendOps(ctx) if err != nil { xlog.Warn("Reconciler: failed to list pending backend ops", "error", err) @@ -293,6 +300,13 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { // amount of further retrying will help. const maxPendingBackendOpAttempts = 10 +// stalePendingBackendOpGrace is how long a node may be offline before its +// pending backend ops are garbage-collected. Draining nodes are cleared +// immediately regardless of this window (see DeleteStalePendingBackendOps). +// ListDuePendingBackendOps never surfaces ops behind non-healthy nodes, so +// without this sweep they would leak forever and keep the UI op spinning. +const stalePendingBackendOpGrace = 15 * time.Minute + // probeLoadedModels gRPC-health-checks model addresses that the DB says are // loaded. If a model's backend process is gone (OOM, crash, manual restart) // we remove the row so ghosts don't linger. Only probes rows older than diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 88e71b20e..6c7c97f12 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -1776,6 +1776,31 @@ func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) erro return nil } +// DeleteStalePendingBackendOps garbage-collects pending backend ops whose target +// node can never drain them. ListDuePendingBackendOps only returns rows behind a +// StatusHealthy node, so ops behind a node that went offline or draining are +// otherwise never retried, aged out, or deleted — they leak forever and keep the +// UI operation spinning. Draining nodes are cleared immediately (an explicit +// admin action; their model rows are already purged). Offline nodes are cleared +// only once their last heartbeat is older than `grace`, so a brief heartbeat blip +// does not nuke an install that is still legitimately in flight. Returns the +// number of rows deleted. +func (r *NodeRegistry) DeleteStalePendingBackendOps(ctx context.Context, grace time.Duration) (int64, error) { + cutoff := time.Now().Add(-grace) + res := r.db.WithContext(ctx). + Where(`node_id IN (SELECT id FROM backend_nodes WHERE status = ?) + OR node_id IN (SELECT id FROM backend_nodes WHERE status = ? AND last_heartbeat <= ?)`, + StatusDraining, StatusOffline, cutoff). + Delete(&PendingBackendOp{}) + if res.Error != nil { + return 0, fmt.Errorf("deleting stale pending backend ops: %w", res.Error) + } + if res.RowsAffected > 0 { + xlog.Info("Cleared pending backend ops behind non-healthy nodes", "deleted", res.RowsAffected) + } + return res.RowsAffected, nil +} + // RecordPendingBackendOpFailure bumps Attempts, captures the error, and // pushes NextRetryAt out with exponential backoff capped at 15 minutes. func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {