From f0ab68e352eae6719f585da2c207d02055a55d3f Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 19 Apr 2026 08:34:57 +0000 Subject: [PATCH] feat(distributed): durable backend fan-out + state reconciliation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two connected problems handled together: 1) Backend delete/install/upgrade used to silently skip non-healthy nodes, so a delete during an outage left a zombie on the offline node once it returned. The fan-out now records intent in a new pending_backend_ops table before attempting the NATS round-trip. Currently-healthy nodes get an immediate attempt; everyone else is queued. Unique index on (node_id, backend, op) means reissuing the same operation refreshes next_retry_at instead of stacking duplicates. 2) Loaded-model state could drift from reality: a worker that OOM'd, got killed, or restarted a backend process would leave a node_models row claiming the model was still loaded, feeding ghost entries into the /api/nodes/models listing and the router's scheduling decisions. The existing ReplicaReconciler gains two new passes that run under a fresh KeyStateReconciler advisory lock (non-blocking, so one wedged frontend doesn't freeze the cluster): - drainPendingBackendOps: retries queued ops whose next_retry_at has passed on currently-healthy nodes. Success deletes the row; failure bumps attempts and pushes next_retry_at out with exponential backoff (30s → 15m cap). On the immediate fan-out path, ErrNoResponders also marks the node unhealthy; the drain pass only records the failure. - probeLoadedModels: runs a gRPC HealthCheck against the addresses of models that the DB thinks are loaded but whose rows haven't been touched in the last probeStaleAfter (2m). Models at unreachable addresses are removed from the registry. A pluggable ModelProber lets tests substitute a fake without standing up gRPC. 
DistributedBackendManager exposes DeleteBackendDetailed so the HTTP handler can surface per-node outcomes ("2 succeeded, 1 queued") to the UI in a follow-up commit; the existing DeleteBackend still returns error-only for callers that don't care about node breakdown. Multi-frontend safety: the state pass uses advisorylock.TryWithLockCtx on a new key so N frontends coordinate — the same pattern the health monitor and replica reconciler already rely on. Single-node mode runs both passes inline (adapter is nil, state drain is a no-op). Tests cover the upsert semantics, backoff math, the probe removing an unreachable model but keeping a reachable one, and filtering by probeStaleAfter. --- core/application/distributed.go | 20 +- core/services/advisorylock/keys.go | 1 + core/services/nodes/managers_distributed.go | 221 +++++++++++++------- core/services/nodes/reconciler.go | 188 +++++++++++++++-- core/services/nodes/reconciler_test.go | 135 ++++++++++++ core/services/nodes/registry.go | 143 ++++++++++++- 6 files changed, 614 insertions(+), 94 deletions(-) diff --git a/core/application/distributed.go b/core/application/distributed.go index 31e87fdab..26d56d121 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut DB: authDB, }) - // Create ReplicaReconciler for auto-scaling model replicas + // Create ReplicaReconciler for auto-scaling model replicas. Adapter + + // RegistrationToken feed the state-reconciliation passes: pending op + // drain uses the adapter, and model health probes use the token to auth + // against workers' gRPC HealthCheck. 
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{ - Registry: registry, - Scheduler: router, - Unloader: remoteUnloader, - DB: authDB, - Interval: 30 * time.Second, - ScaleDownDelay: 5 * time.Minute, + Registry: registry, + Scheduler: router, + Unloader: remoteUnloader, + Adapter: remoteUnloader, + RegistrationToken: cfg.Distributed.RegistrationToken, + DB: authDB, + Interval: 30 * time.Second, + ScaleDownDelay: 5 * time.Minute, + ProbeStaleAfter: 2 * time.Minute, }) // Create ModelRouterAdapter to wire into ModelLoader diff --git a/core/services/advisorylock/keys.go b/core/services/advisorylock/keys.go index d5378a5d1..277817229 100644 --- a/core/services/advisorylock/keys.go +++ b/core/services/advisorylock/keys.go @@ -11,4 +11,5 @@ const ( KeyHealthCheck int64 = 104 KeySchemaMigrate int64 = 105 KeyBackendUpgradeCheck int64 = 106 + KeyStateReconciler int64 = 107 ) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 0934a6282..e524756da 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -68,40 +68,147 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model } } +// NodeOpStatus is the per-node outcome of a backend lifecycle operation. +// Returned as part of BackendOpResult so the frontend can surface exactly +// what happened on each worker instead of a single joined error string. +type NodeOpStatus struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + Status string `json:"status"` // "success" | "queued" | "error" + Error string `json:"error,omitempty"` +} + +// BackendOpResult aggregates per-node outcomes. +type BackendOpResult struct { + Nodes []NodeOpStatus `json:"nodes"` +} + +// enqueueAndDrainBackendOp is the shared scaffolding for +// delete/install/upgrade. Every non-pending node gets a pending_backend_ops +// row (intent is durable even if the node is offline). 
Currently-healthy +// nodes get an immediate attempt; success deletes the row, failure records +// the error and leaves the row for the reconciler to retry. +// +// `apply` is the NATS round-trip for one node. Returning an error keeps the +// row in the queue and marks the per-node status as "error"; returning nil +// deletes the row and reports "success". For non-healthy nodes the status +// is "queued" — no attempt is made right now, reconciler will pick it up +// when the node returns. +func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) { + allNodes, err := d.registry.List(ctx) + if err != nil { + return BackendOpResult{}, err + } + + result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))} + for _, node := range allNodes { + // Pending nodes haven't been approved yet — no intent to apply. + if node.Status == StatusPending { + continue + } + if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { + xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "error", + Error: fmt.Sprintf("enqueue failed: %v", err), + }) + continue + } + + if node.Status != StatusHealthy { + // Intent is recorded; reconciler will retry when the node recovers. + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "queued", + Error: fmt.Sprintf("node %s, will retry when healthy", node.Status), + }) + continue + } + + applyErr := apply(node) + if applyErr == nil { + // Find the row we just upserted and delete it; cheap but requires + // a lookup since UpsertPendingBackendOp doesn't return the ID. 
+ if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil { + xlog.Debug("Failed to clear pending backend op after success", "error", err) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "success", + }) + continue + } + + // Record failure for backoff. If it's an ErrNoResponders, the node's + // gone AWOL — mark unhealthy so the router stops picking it too. + errMsg := applyErr.Error() + if errors.Is(applyErr, nats.ErrNoResponders) { + xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) + d.registry.MarkUnhealthy(ctx, node.ID) + } + if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil { + _ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg, + }) + } + return result, nil +} + +// findPendingRow looks up the ID of a pending_backend_ops row by its +// composite key. Used to hand off to RecordPendingBackendOpFailure / +// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same +// composite key. +func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) { + var row PendingBackendOp + if err := d.registry.db.WithContext(ctx). + Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op). + First(&row).Error; err != nil { + return 0, err + } + return row.ID, nil +} + +// deletePendingRow removes the queue row keyed by (nodeID, backend, op). +func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error { + return d.registry.db.WithContext(ctx). + Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op). + Delete(&PendingBackendOp{}).Error +} + +// DeleteBackend fans out backend deletion to every known node. 
The previous +// implementation silently skipped non-healthy nodes, which meant zombies +// reappeared once those nodes returned. Now the intent is durable — see +// enqueueAndDrainBackendOp — and the reconciler catches up later. func (d *DistributedBackendManager) DeleteBackend(name string) error { - // Try local deletion but ignore "not found" errors — in distributed mode - // the frontend node typically doesn't have backends installed locally; - // they only exist on worker nodes. + // Local delete first (frontend rarely has backends installed in + // distributed mode, but the gallery operation still expects it; ignore + // "not found" which is the common case). if err := d.local.DeleteBackend(name); err != nil { if !errors.Is(err, gallery.ErrBackendNotFound) { return err } xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name) } - // Fan out backend.delete to all healthy nodes - allNodes, listErr := d.registry.List(context.Background()) - if listErr != nil { - xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr) - return listErr + + ctx := context.Background() + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + _, err := d.adapter.DeleteBackend(node.ID, name) + return err + }) + return err +} + +// DeleteBackendDetailed is the per-node-result variant called by the HTTP +// handler so the UI can render a per-node status drawer. DeleteBackend still +// returns error-only for callers that don't care about node breakdown. 
+func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) { + if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) { + return BackendOpResult{}, err } - var errs []error - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } - if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil { - if errors.Is(delErr, nats.ErrNoResponders) { - // Node's NATS subscription is gone — likely restarted with a new ID. - // Mark it unhealthy so future fan-outs skip it. - xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr) - errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr)) - } - } - return errors.Join(errs...) + return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + _, err := d.adapter.DeleteBackend(node.ID, name) + return err + }) } // ListBackends aggregates installed backends from all worker nodes, preserving @@ -170,69 +277,43 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro return result, nil } -// InstallBackend fans out backend installation to all healthy worker nodes. +// InstallBackend fans out installation through the pending-ops queue so +// non-healthy nodes get retried when they come back instead of being silently +// skipped. Reply success from the NATS round-trip deletes the queue row; +// reply.Success==false is treated as an error so the row stays for retry. 
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error { - allNodes, err := d.registry.List(context.Background()) - if err != nil { - return err - } - galleriesJSON, _ := json.Marshal(op.Galleries) backendName := op.GalleryElementName - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON)) if err != nil { - if errors.Is(err, nats.ErrNoResponders) { - xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err) - continue + return err } if !reply.Success { - xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error) + return fmt.Errorf("install failed: %s", reply.Error) } - } - return nil + return nil + }) + return err } -// UpgradeBackend fans out a backend upgrade to all healthy worker nodes. -// TODO: Add dedicated NATS subject for upgrade (currently reuses install with force flag) +// UpgradeBackend reuses the install NATS subject (the worker re-downloads +// from the gallery). Same queue semantics as Install/Delete. 
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { - allNodes, err := d.registry.List(context.Background()) - if err != nil { - return err - } - galleriesJSON, _ := json.Marshal(d.backendGalleries) - var errs []error - for _, node := range allNodes { - if node.Status != StatusHealthy { - continue - } - // Reuse install endpoint which will re-download the backend (force mode) + _, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON)) if err != nil { - if errors.Is(err, nats.ErrNoResponders) { - xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID) - d.registry.MarkUnhealthy(context.Background(), node.ID) - continue - } - errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err)) - continue + return err } if !reply.Success { - errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error)) + return fmt.Errorf("upgrade failed: %s", reply.Error) } - } - - return errors.Join(errs...) + return nil + }) + return err } // CheckUpgrades checks for available backend upgrades across the cluster. diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index 92ba76edc..6d87165b4 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -3,26 +3,57 @@ package nodes import ( "context" "encoding/json" + "fmt" "time" "github.com/mudler/LocalAI/core/services/advisorylock" + grpcclient "github.com/mudler/LocalAI/pkg/grpc" "github.com/mudler/xlog" "gorm.io/gorm" ) +// ModelProber checks whether a model's backend process is still reachable. +// Defaulted to a gRPC health probe but overridable for tests so we don't +// need to stand up a real server. 
Returning false without an error means the +// process is reachable but unhealthy (same as a timeout for our purposes). +type ModelProber interface { + IsAlive(ctx context.Context, address string) bool +} + +// grpcModelProber does a 1s HealthCheck on the model's stored gRPC address. +type grpcModelProber struct{ token string } + +func (g grpcModelProber) IsAlive(ctx context.Context, address string) bool { + client := grpcclient.NewClientWithToken(address, false, nil, false, g.token) + probeCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + ok, _ := client.HealthCheck(probeCtx) + return ok +} + // ReplicaReconciler periodically ensures model replica counts match their // scheduling configs. It scales up replicas when below MinReplicas or when // all replicas are busy (up to MaxReplicas), and scales down idle replicas // above MinReplicas. // +// Alongside replica scaling it runs two state-reconciliation passes — draining +// the pending_backend_ops queue and probing loaded models' gRPC addresses to +// orphan ghosts. Both passes are wrapped in the KeyStateReconciler advisory +// lock so N frontends don't stampede. +// // Only processes models with auto-scaling enabled (MinReplicas > 0 or MaxReplicas > 0). type ReplicaReconciler struct { registry *NodeRegistry scheduler ModelScheduler // interface for scheduling new models unloader NodeCommandSender + adapter *RemoteUnloaderAdapter // NATS sender for pending-op drain + prober ModelProber // health probe for model gRPC addrs db *gorm.DB interval time.Duration scaleDownDelay time.Duration + // probeStaleAfter: only probe node_models rows older than this so we + // don't hammer every worker every tick for models we just heard from. + probeStaleAfter time.Duration } // ModelScheduler abstracts the scheduling logic needed by the reconciler. @@ -35,12 +66,21 @@ type ModelScheduler interface { // ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler. 
type ReplicaReconcilerOptions struct { - Registry *NodeRegistry - Scheduler ModelScheduler - Unloader NodeCommandSender - DB *gorm.DB - Interval time.Duration // default 30s - ScaleDownDelay time.Duration // default 5m + Registry *NodeRegistry + Scheduler ModelScheduler + Unloader NodeCommandSender + // Adapter is the NATS sender used to retry pending backend ops. When nil, + // the state-reconciler pending-drain pass is a no-op (single-node mode). + Adapter *RemoteUnloaderAdapter + // RegistrationToken is used by the default gRPC prober when probing model + // addresses. Matches the worker's token so HealthCheck auth succeeds. + RegistrationToken string + // Prober overrides the default gRPC health probe (used by tests). + Prober ModelProber + DB *gorm.DB + Interval time.Duration // default 30s + ScaleDownDelay time.Duration // default 5m + ProbeStaleAfter time.Duration // default 2m } // NewReplicaReconciler creates a new ReplicaReconciler. @@ -53,13 +93,24 @@ func NewReplicaReconciler(opts ReplicaReconcilerOptions) *ReplicaReconciler { if scaleDownDelay == 0 { scaleDownDelay = 5 * time.Minute } + probeStaleAfter := opts.ProbeStaleAfter + if probeStaleAfter == 0 { + probeStaleAfter = 2 * time.Minute + } + prober := opts.Prober + if prober == nil { + prober = grpcModelProber{token: opts.RegistrationToken} + } return &ReplicaReconciler{ - registry: opts.Registry, - scheduler: opts.Scheduler, - unloader: opts.Unloader, - db: opts.DB, - interval: interval, - scaleDownDelay: scaleDownDelay, + registry: opts.Registry, + scheduler: opts.Scheduler, + unloader: opts.Unloader, + adapter: opts.Adapter, + prober: prober, + db: opts.DB, + interval: interval, + scaleDownDelay: scaleDownDelay, + probeStaleAfter: probeStaleAfter, } } @@ -78,17 +129,122 @@ func (rc *ReplicaReconciler) Run(ctx context.Context) { } } -// reconcileOnce performs a single reconciliation pass. -// Uses an advisory lock so only one frontend instance reconciles at a time. 
+// reconcileOnce performs a single reconciliation pass. Replica work and +// state-reconciliation work run under *different* advisory locks so multiple +// frontends can share load across passes, and one long-running pass doesn't +// block the other forever if a frontend wedges. func (rc *ReplicaReconciler) reconcileOnce(ctx context.Context) { if rc.db != nil { - lockKey := advisorylock.KeyFromString("replica-reconciler") - _ = advisorylock.WithLockCtx(ctx, rc.db, lockKey, func() error { + replicaKey := advisorylock.KeyFromString("replica-reconciler") + _ = advisorylock.WithLockCtx(ctx, rc.db, replicaKey, func() error { rc.reconcile(ctx) return nil }) + // Try, don't block: if another frontend is already running the state + // pass, this tick is a no-op. Matches the health monitor pattern. + _, _ = advisorylock.TryWithLockCtx(ctx, rc.db, advisorylock.KeyStateReconciler, func() error { + rc.reconcileState(ctx) + return nil + }) } else { rc.reconcile(ctx) + rc.reconcileState(ctx) + } +} + +// reconcileState runs the state-reconciliation passes: drain pending backend +// ops for freshly-healthy nodes, then probe model gRPC addresses to orphan +// ghosts. Both passes are best-effort: a failure on one node doesn't stop +// the rest. +func (rc *ReplicaReconciler) reconcileState(ctx context.Context) { + if rc.adapter != nil { + rc.drainPendingBackendOps(ctx) + } + rc.probeLoadedModels(ctx) +} + +// drainPendingBackendOps retries queued backend ops whose next_retry_at has +// passed on nodes that are currently healthy. On success the row is deleted; +// on failure attempts++ and next_retry_at moves out via exponential backoff. 
+func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { + ops, err := rc.registry.ListDuePendingBackendOps(ctx) + if err != nil { + xlog.Warn("Reconciler: failed to list pending backend ops", "error", err) + return + } + if len(ops) == 0 { + return + } + xlog.Debug("Reconciler: draining pending backend ops", "count", len(ops)) + + for _, op := range ops { + if err := ctx.Err(); err != nil { + return + } + var applyErr error + switch op.Op { + case OpBackendDelete: + _, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend) + case OpBackendInstall, OpBackendUpgrade: + reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries)) + if err != nil { + applyErr = err + } else if !reply.Success { + applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error) + } + default: + xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID) + continue + } + + if applyErr == nil { + if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil { + xlog.Warn("Reconciler: failed to delete drained op row", "id", op.ID, "error", err) + } else { + xlog.Info("Reconciler: pending backend op applied", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1) + } + continue + } + _ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error()) + xlog.Warn("Reconciler: pending backend op retry failed", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr) + } +} + +// probeLoadedModels gRPC-health-checks model addresses that the DB says are +// loaded. If a model's backend process is gone (OOM, crash, manual restart) +// we remove the row so ghosts don't linger. Only probes rows older than +// probeStaleAfter so we don't hammer every worker every tick for models we +// just heard from. 
+func (rc *ReplicaReconciler) probeLoadedModels(ctx context.Context) { + var stale []NodeModel + cutoff := time.Now().Add(-rc.probeStaleAfter) + err := rc.registry.db.WithContext(ctx). + Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id"). + Where("node_models.state = ? AND backend_nodes.status = ? AND node_models.updated_at < ? AND node_models.address != ''", + "loaded", StatusHealthy, cutoff). + Find(&stale).Error + if err != nil { + xlog.Warn("Reconciler: failed to list loaded models for probe", "error", err) + return + } + for _, m := range stale { + if err := ctx.Err(); err != nil { + return + } + if rc.prober.IsAlive(ctx, m.Address) { + // Bump updated_at so we don't probe this row again immediately. + _ = rc.registry.db.WithContext(ctx).Model(&NodeModel{}). + Where("id = ?", m.ID).Update("updated_at", time.Now()).Error + continue + } + if err := rc.registry.RemoveNodeModel(ctx, m.NodeID, m.ModelName); err != nil { + xlog.Warn("Reconciler: failed to remove unreachable model", "node", m.NodeID, "model", m.ModelName, "error", err) + continue + } + xlog.Warn("Reconciler: model unreachable, removed from registry", + "node", m.NodeID, "model", m.ModelName, "address", m.Address) } } diff --git a/core/services/nodes/reconciler_test.go b/core/services/nodes/reconciler_test.go index e95f8bcea..52a488a2a 100644 --- a/core/services/nodes/reconciler_test.go +++ b/core/services/nodes/reconciler_test.go @@ -239,3 +239,138 @@ var _ = Describe("ReplicaReconciler", func() { }) }) }) + +// fakeProber lets tests control whether a model's gRPC address "responds". 
+type fakeProber struct { + alive map[string]bool + calls int +} + +func (f *fakeProber) IsAlive(_ context.Context, address string) bool { + f.calls++ + if f.alive == nil { + return false + } + return f.alive[address] +} + +var _ = Describe("ReplicaReconciler — state reconciliation", func() { + var ( + db *gorm.DB + registry *NodeRegistry + ) + + BeforeEach(func() { + if runtime.GOOS == "darwin" { + Skip("testcontainers requires Docker, not available on macOS CI") + } + db = testutil.SetupTestDB() + var err error + registry, err = NewNodeRegistry(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("probeLoadedModels", func() { + It("removes loaded models whose gRPC address is unreachable", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + // Two loaded models — one stale (will probe), one fresh (skipped). + stale := &NodeModel{ + ID: "stale-1", + NodeID: node.ID, + ModelName: "stale-model", + Address: "10.0.0.1:12345", + State: "loaded", + UpdatedAt: time.Now().Add(-5 * time.Minute), + } + fresh := &NodeModel{ + ID: "fresh-1", + NodeID: node.ID, + ModelName: "fresh-model", + Address: "10.0.0.1:54321", + State: "loaded", + UpdatedAt: time.Now(), // within probeStaleAfter + } + Expect(db.Create(stale).Error).To(Succeed()) + Expect(db.Create(fresh).Error).To(Succeed()) + + prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": false}} + rc := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + DB: db, + Prober: prober, + ProbeStaleAfter: 2 * time.Minute, + }) + + rc.probeLoadedModels(context.Background()) + + // Stale was unreachable — row removed. + var after []NodeModel + Expect(db.Find(&after).Error).To(Succeed()) + Expect(after).To(HaveLen(1)) + Expect(after[0].ModelName).To(Equal("fresh-model")) + // Prober was only called once (the fresh row was filtered out). 
+ Expect(prober.calls).To(Equal(1)) + }) + + It("keeps reachable models and bumps their updated_at", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + stale := &NodeModel{ + ID: "stale-2", + NodeID: node.ID, + ModelName: "alive-model", + Address: "10.0.0.1:12345", + State: "loaded", + UpdatedAt: time.Now().Add(-5 * time.Minute), + } + Expect(db.Create(stale).Error).To(Succeed()) + + prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": true}} + rc := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + DB: db, + Prober: prober, + ProbeStaleAfter: 2 * time.Minute, + }) + + rc.probeLoadedModels(context.Background()) + + var after NodeModel + Expect(db.First(&after, "id = ?", "stale-2").Error).To(Succeed()) + Expect(after.UpdatedAt).To(BeTemporally("~", time.Now(), time.Second)) + }) + }) + + Describe("UpsertPendingBackendOp + RecordPendingBackendOpFailure", func() { + It("upserts on the composite key rather than duplicating rows", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + + Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + // Second call for the same (node, backend, op) should not create a + // new row — that's how re-issuing a delete works. 
+ Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + + var rows []PendingBackendOp + Expect(db.Find(&rows).Error).To(Succeed()) + Expect(rows).To(HaveLen(1)) + }) + + It("increments attempts and moves next_retry_at out on failure", func() { + node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"} + Expect(registry.Register(context.Background(), node, true)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed()) + + var row PendingBackendOp + Expect(db.First(&row).Error).To(Succeed()) + before := row.NextRetryAt + + Expect(registry.RecordPendingBackendOpFailure(context.Background(), row.ID, "boom")).To(Succeed()) + Expect(db.First(&row, row.ID).Error).To(Succeed()) + Expect(row.Attempts).To(Equal(1)) + Expect(row.LastError).To(Equal("boom")) + Expect(row.NextRetryAt).To(BeTemporally(">", before)) + }) + }) +}) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index f56fcedcc..3894b41c5 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -104,6 +104,36 @@ type NodeWithExtras struct { Labels map[string]string `json:"labels,omitempty"` } +// PendingBackendOp is a durable intent for a backend lifecycle operation +// (delete/install/upgrade) that needs to eventually apply on a specific node. +// +// Without this table, a backend delete against an offline node silently +// dropped: the frontend skipped the node, the node came back later with the +// backend still installed, and the operator saw a zombie. Now the intent is +// recorded regardless of node status; the state reconciler drains the queue +// whenever a node is healthy and removes the row on success. Reissuing the +// same operation while a row exists updates NextRetryAt instead of stacking +// duplicates (see the unique index). 
+type PendingBackendOp struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"` + Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"` + Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"` + Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries + Attempts int `gorm:"default:0" json:"attempts"` + LastError string `gorm:"type:text" json:"last_error,omitempty"` + CreatedAt time.Time `json:"created_at"` + NextRetryAt time.Time `gorm:"index" json:"next_retry_at"` +} + +// Op constants mirror the operation names used by DistributedBackendManager +// so callers don't repeat stringly-typed values. +const ( + OpBackendDelete = "delete" + OpBackendInstall = "install" + OpBackendUpgrade = "upgrade" +) + // NodeRegistry manages backend node registration and lookup in PostgreSQL. type NodeRegistry struct { db *gorm.DB @@ -114,7 +144,7 @@ type NodeRegistry struct { // when multiple instances (frontend + workers) start at the same time. func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) { if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { - return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}) + return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{}) }); err != nil { return nil, fmt.Errorf("migrating node tables: %w", err) } @@ -946,3 +976,114 @@ func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node _ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name) } } + +// UpsertPendingBackendOp records or refreshes a pending backend operation for +// a node. 
If a row already exists for (nodeID, backend, op) we keep its
+// Attempts/LastError but refresh Galleries and reset NextRetryAt to now, so
+// reissuing the same op nudges it to the front of the queue instead of
+// stacking a duplicate intent. Database errors are returned wrapped.
+func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error {
+	row := PendingBackendOp{NodeID: nodeID, Backend: backend, Op: op, Galleries: galleries, NextRetryAt: time.Now()}
+	// Conflict target is the idx_pending_backend_op unique key. Only the
+	// listed columns are overwritten, so attempts/last_error survive a reissue.
+	onConflict := clause.OnConflict{
+		Columns:   []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
+		DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
+	}
+	if err := r.db.WithContext(ctx).Clauses(onConflict).Create(&row).Error; err != nil {
+		return fmt.Errorf("upserting pending backend op %s/%s for node %s: %w", op, backend, nodeID, err)
+	}
+	return nil
+}
+
+// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed
+// AND whose node is currently healthy, earliest NextRetryAt first. The
+// reconciler drains this list; filtering by node status in the query keeps a
+// tick from hammering NATS for nodes that obviously can't answer.
+func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
+	var ops []PendingBackendOp
+	err := r.db.WithContext(ctx).
+		Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id").
+		Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy).
+		Order("pending_backend_ops.next_retry_at ASC").
+		Find(&ops).Error
+	if err != nil {
+		return nil, fmt.Errorf("listing due pending backend ops: %w", err)
+	}
+	return ops, nil
+}
+
+// ListPendingBackendOps returns every queued row (for the UI "pending on N
+// nodes" chip and the pre-delete ConfirmDialog).
+func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
+	var ops []PendingBackendOp
+	if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil {
+		return nil, fmt.Errorf("listing pending backend ops: %w", err)
+	}
+	return ops, nil
+}
+
+// DeletePendingBackendOp removes a queue row — called after the op succeeds.
+func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error {
+	if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil {
+		return fmt.Errorf("deleting pending backend op %d: %w", id, err)
+	}
+	return nil
+}
+
+// RecordPendingBackendOpFailure bumps Attempts, stores errMsg in LastError and
+// pushes NextRetryAt out (exponential backoff, 15m cap). Errors come back wrapped.
+func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {
+	if err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
+		var row PendingBackendOp
+		if err := tx.First(&row, id).Error; err != nil {
+			return err
+		}
+		row.Attempts++
+		return tx.Model(&row).Updates(map[string]interface{}{ // not Save: must not rewrite galleries or re-insert a deleted row
+			"attempts":      row.Attempts,
+			"last_error":    errMsg,
+			"next_retry_at": time.Now().Add(backoffForAttempt(row.Attempts)),
+		}).Error
+	}); err != nil {
+		return fmt.Errorf("recording failure for pending backend op %d: %w", id, err)
+	}
+	return nil
+}
+
+// backoffForAttempt doubles from 30s per recorded attempt up to a 15m cap. The
+// reconciler tick is 30s so anything shorter would just re-fire immediately.
+func backoffForAttempt(attempts int) time.Duration {
+	const maxBackoff = 15 * time.Minute // deliberately not named cap, which would shadow the builtin
+	d := 30 * time.Second
+	for i := 1; i < attempts && d < maxBackoff; i++ {
+		d *= 2
+	}
+	if d > maxBackoff {
+		return maxBackoff
+	}
+	return d
+}
+
+// CountPendingBackendOpsByBackend returns a map of backend name to the count
+// of pending rows. Used to decorate Manage → Backends with a "pending on N
+// nodes" chip without exposing the full queue.
+func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) {
+	// One aggregate row per backend name, as produced by the GROUP BY below.
+	type tally struct {
+		Backend string
+		Count   int
+	}
+	var tallies []tally
+	grouped := r.db.WithContext(ctx).Model(&PendingBackendOp{}).
+		Select("backend, COUNT(*) as count").
+		Group("backend")
+	if err := grouped.Scan(&tallies).Error; err != nil {
+		return nil, fmt.Errorf("counting pending backend ops: %w", err)
+	}
+	counts := make(map[string]int, len(tallies))
+	for i := range tallies {
+		counts[tallies[i].Backend] = tallies[i].Count
+	}
+	return counts, nil
+}