feat(distributed): durable backend fan-out + state reconciliation

Two connected problems handled together:

1) Backend delete/install/upgrade used to silently skip non-healthy nodes,
   so a delete during an outage left a zombie on the offline node once it
   returned. The fan-out now records intent in a new pending_backend_ops
   table before attempting the NATS round-trip. Currently-healthy nodes
   get an immediate attempt; everyone else is queued. Unique index on
   (node_id, backend, op) means reissuing the same operation refreshes
   next_retry_at instead of stacking duplicates.

2) Loaded-model state could drift from reality: a worker OOM'd, got
   killed, or restarted a backend process would leave a node_models row
   claiming the model was still loaded, feeding ghost entries into the
   /api/nodes/models listing and the router's scheduling decisions.

The existing ReplicaReconciler gains two new passes that run under a
fresh KeyStateReconciler advisory lock (non-blocking, so one wedged
frontend doesn't freeze the cluster):

  - drainPendingBackendOps: retries queued ops whose next_retry_at has
    passed on currently-healthy nodes. Success deletes the row; failure
    bumps attempts and pushes next_retry_at out with exponential backoff
    (30s → 15m cap). ErrNoResponders also marks the node unhealthy.

  - probeLoadedModels: gRPC-HealthChecks addresses the DB thinks are
    loaded but hasn't seen touched in the last probeStaleAfter (2m).
    Unreachable addresses are removed from the registry. A pluggable
    ModelProber lets tests substitute a fake without standing up gRPC.

DistributedBackendManager exposes DeleteBackendDetailed so the HTTP
handler can surface per-node outcomes ("2 succeeded, 1 queued") to the
UI in a follow-up commit; the existing DeleteBackend still returns
error-only for callers that don't care about node breakdown.

Multi-frontend safety: the state pass uses advisorylock.TryWithLockCtx
on a new key so N frontends coordinate — the same pattern the health
monitor and replica reconciler already rely on. Single-node mode runs
both passes inline (adapter is nil, state drain is a no-op).

Tests cover the upsert semantics, backoff math, the probe removing an
unreachable model but keeping a reachable one, and filtering by
probeStaleAfter.
This commit is contained in:
Ettore Di Giacinto
2026-04-19 08:34:57 +00:00
parent 9373de9f9b
commit f0ab68e352
6 changed files with 614 additions and 94 deletions

View File

@@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut
DB: authDB,
})
// Create ReplicaReconciler for auto-scaling model replicas
// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
// RegistrationToken feed the state-reconciliation passes: pending op
// drain uses the adapter, and model health probes use the token to auth
// against workers' gRPC HealthCheck.
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
Registry: registry,
Scheduler: router,
Unloader: remoteUnloader,
DB: authDB,
Interval: 30 * time.Second,
ScaleDownDelay: 5 * time.Minute,
Registry: registry,
Scheduler: router,
Unloader: remoteUnloader,
Adapter: remoteUnloader,
RegistrationToken: cfg.Distributed.RegistrationToken,
DB: authDB,
Interval: 30 * time.Second,
ScaleDownDelay: 5 * time.Minute,
ProbeStaleAfter: 2 * time.Minute,
})
// Create ModelRouterAdapter to wire into ModelLoader

View File

@@ -11,4 +11,5 @@ const (
KeyHealthCheck int64 = 104
KeySchemaMigrate int64 = 105
KeyBackendUpgradeCheck int64 = 106
KeyStateReconciler int64 = 107
)

View File

@@ -68,40 +68,147 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model
}
}
// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
type NodeOpStatus struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"` // "success" | "queued" | "error"
Error string `json:"error,omitempty"`
}
// BackendOpResult aggregates per-node outcomes.
type BackendOpResult struct {
Nodes []NodeOpStatus `json:"nodes"`
}
// enqueueAndDrainBackendOp is the shared scaffolding for
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
// row (intent is durable even if the node is offline). Currently-healthy
// nodes get an immediate attempt; success deletes the row, failure records
// the error and leaves the row for the reconciler to retry.
//
// `apply` is the NATS round-trip for one node. Returning an error keeps the
// row in the queue and marks the per-node status as "error"; returning nil
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued" — no attempt is made right now, reconciler will pick it up
// when the node returns.
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet — no intent to apply.
if node.Status == StatusPending {
continue
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
})
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
})
continue
}
applyErr := apply(node)
if applyErr == nil {
// Find the row we just upserted and delete it; cheap but requires
// a lookup since UpsertPendingBackendOp doesn't return the ID.
if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
xlog.Debug("Failed to clear pending backend op after success", "error", err)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "success",
})
continue
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL — mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
}
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
})
}
return result, nil
}
// findPendingRow looks up the ID of a pending_backend_ops row by its
// composite key. Used to hand off to RecordPendingBackendOpFailure /
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
// composite key.
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
var row PendingBackendOp
if err := d.registry.db.WithContext(ctx).
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
First(&row).Error; err != nil {
return 0, err
}
return row.ID, nil
}
// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
return d.registry.db.WithContext(ctx).
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
Delete(&PendingBackendOp{}).Error
}
// DeleteBackend fans out backend deletion to every known node. The previous
// implementation silently skipped non-healthy nodes, which meant zombies
// reappeared once those nodes returned. Now the intent is durable — see
// enqueueAndDrainBackendOp — and the reconciler catches up later.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
// Try local deletion but ignore "not found" errors — in distributed mode
// the frontend node typically doesn't have backends installed locally;
// they only exist on worker nodes.
// Local delete first (frontend rarely has backends installed in
// distributed mode, but the gallery operation still expects it; ignore
// "not found" which is the common case).
if err := d.local.DeleteBackend(name); err != nil {
if !errors.Is(err, gallery.ErrBackendNotFound) {
return err
}
xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
}
// Fan out backend.delete to all healthy nodes
allNodes, listErr := d.registry.List(context.Background())
if listErr != nil {
xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr)
return listErr
ctx := context.Background()
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
_, err := d.adapter.DeleteBackend(node.ID, name)
return err
})
return err
}
// DeleteBackendDetailed is the per-node-result variant called by the HTTP
// handler so the UI can render a per-node status drawer. DeleteBackend still
// returns error-only for callers that don't care about node breakdown.
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
var errs []error
for _, node := range allNodes {
if node.Status != StatusHealthy {
continue
}
if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil {
if errors.Is(delErr, nats.ErrNoResponders) {
// Node's NATS subscription is gone — likely restarted with a new ID.
// Mark it unhealthy so future fan-outs skip it.
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
continue
}
xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr)
errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr))
}
}
return errors.Join(errs...)
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
_, err := d.adapter.DeleteBackend(node.ID, name)
return err
})
}
// ListBackends aggregates installed backends from all worker nodes, preserving
@@ -170,69 +277,43 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
return result, nil
}
// InstallBackend fans out backend installation to all healthy worker nodes.
// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
allNodes, err := d.registry.List(context.Background())
if err != nil {
return err
}
galleriesJSON, _ := json.Marshal(op.Galleries)
backendName := op.GalleryElementName
for _, node := range allNodes {
if node.Status != StatusHealthy {
continue
}
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON))
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
continue
}
xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err)
continue
return err
}
if !reply.Success {
xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error)
return fmt.Errorf("install failed: %s", reply.Error)
}
}
return nil
return nil
})
return err
}
// UpgradeBackend fans out a backend upgrade to all healthy worker nodes.
// TODO: Add dedicated NATS subject for upgrade (currently reuses install with force flag)
// UpgradeBackend reuses the install NATS subject (the worker re-downloads
// from the gallery). Same queue semantics as Install/Delete.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
allNodes, err := d.registry.List(context.Background())
if err != nil {
return err
}
galleriesJSON, _ := json.Marshal(d.backendGalleries)
var errs []error
for _, node := range allNodes {
if node.Status != StatusHealthy {
continue
}
// Reuse install endpoint which will re-download the backend (force mode)
_, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON))
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
continue
}
errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err))
continue
return err
}
if !reply.Success {
errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error))
return fmt.Errorf("upgrade failed: %s", reply.Error)
}
}
return errors.Join(errs...)
return nil
})
return err
}
// CheckUpgrades checks for available backend upgrades across the cluster.

View File

@@ -3,26 +3,57 @@ package nodes
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/advisorylock"
grpcclient "github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/xlog"
"gorm.io/gorm"
)
// ModelProber checks whether a model's backend process is still reachable.
// Defaulted to a gRPC health probe but overridable for tests so we don't
// need to stand up a real server. Returning false without an error means the
// process is reachable but unhealthy (same as a timeout for our purposes).
type ModelProber interface {
IsAlive(ctx context.Context, address string) bool
}
// grpcModelProber does a 1s HealthCheck on the model's stored gRPC address.
type grpcModelProber struct{ token string }
func (g grpcModelProber) IsAlive(ctx context.Context, address string) bool {
client := grpcclient.NewClientWithToken(address, false, nil, false, g.token)
probeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
ok, _ := client.HealthCheck(probeCtx)
return ok
}
// ReplicaReconciler periodically ensures model replica counts match their
// scheduling configs. It scales up replicas when below MinReplicas or when
// all replicas are busy (up to MaxReplicas), and scales down idle replicas
// above MinReplicas.
//
// Alongside replica scaling it runs two state-reconciliation passes — draining
// the pending_backend_ops queue and probing loaded models' gRPC addresses to
// orphan ghosts. Both passes are wrapped in the KeyStateReconciler advisory
// lock so N frontends don't stampede.
//
// Only processes models with auto-scaling enabled (MinReplicas > 0 or MaxReplicas > 0).
type ReplicaReconciler struct {
registry *NodeRegistry
scheduler ModelScheduler // interface for scheduling new models
unloader NodeCommandSender
adapter *RemoteUnloaderAdapter // NATS sender for pending-op drain
prober ModelProber // health probe for model gRPC addrs
db *gorm.DB
interval time.Duration
scaleDownDelay time.Duration
// probeStaleAfter: only probe node_models rows older than this so we
// don't hammer every worker every tick for models we just heard from.
probeStaleAfter time.Duration
}
// ModelScheduler abstracts the scheduling logic needed by the reconciler.
@@ -35,12 +66,21 @@ type ModelScheduler interface {
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
type ReplicaReconcilerOptions struct {
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
// Adapter is the NATS sender used to retry pending backend ops. When nil,
// the state-reconciler pending-drain pass is a no-op (single-node mode).
Adapter *RemoteUnloaderAdapter
// RegistrationToken is used by the default gRPC prober when probing model
// addresses. Matches the worker's token so HealthCheck auth succeeds.
RegistrationToken string
// Prober overrides the default gRPC health probe (used by tests).
Prober ModelProber
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
ProbeStaleAfter time.Duration // default 2m
}
// NewReplicaReconciler creates a new ReplicaReconciler.
@@ -53,13 +93,24 @@ func NewReplicaReconciler(opts ReplicaReconcilerOptions) *ReplicaReconciler {
if scaleDownDelay == 0 {
scaleDownDelay = 5 * time.Minute
}
probeStaleAfter := opts.ProbeStaleAfter
if probeStaleAfter == 0 {
probeStaleAfter = 2 * time.Minute
}
prober := opts.Prober
if prober == nil {
prober = grpcModelProber{token: opts.RegistrationToken}
}
return &ReplicaReconciler{
registry: opts.Registry,
scheduler: opts.Scheduler,
unloader: opts.Unloader,
db: opts.DB,
interval: interval,
scaleDownDelay: scaleDownDelay,
registry: opts.Registry,
scheduler: opts.Scheduler,
unloader: opts.Unloader,
adapter: opts.Adapter,
prober: prober,
db: opts.DB,
interval: interval,
scaleDownDelay: scaleDownDelay,
probeStaleAfter: probeStaleAfter,
}
}
@@ -78,17 +129,122 @@ func (rc *ReplicaReconciler) Run(ctx context.Context) {
}
}
// reconcileOnce performs a single reconciliation pass.
// Uses an advisory lock so only one frontend instance reconciles at a time.
// reconcileOnce performs a single reconciliation pass. Replica work and
// state-reconciliation work run under *different* advisory locks so multiple
// frontends can share load across passes, and one long-running pass doesn't
// block the other forever if a frontend wedges.
func (rc *ReplicaReconciler) reconcileOnce(ctx context.Context) {
if rc.db != nil {
lockKey := advisorylock.KeyFromString("replica-reconciler")
_ = advisorylock.WithLockCtx(ctx, rc.db, lockKey, func() error {
replicaKey := advisorylock.KeyFromString("replica-reconciler")
_ = advisorylock.WithLockCtx(ctx, rc.db, replicaKey, func() error {
rc.reconcile(ctx)
return nil
})
// Try, don't block: if another frontend is already running the state
// pass, this tick is a no-op. Matches the health monitor pattern.
_, _ = advisorylock.TryWithLockCtx(ctx, rc.db, advisorylock.KeyStateReconciler, func() error {
rc.reconcileState(ctx)
return nil
})
} else {
rc.reconcile(ctx)
rc.reconcileState(ctx)
}
}
// reconcileState runs the state-reconciliation passes: drain pending backend
// ops for freshly-healthy nodes, then probe model gRPC addresses to orphan
// ghosts. Both passes are best-effort: a failure on one node doesn't stop
// the rest.
func (rc *ReplicaReconciler) reconcileState(ctx context.Context) {
if rc.adapter != nil {
rc.drainPendingBackendOps(ctx)
}
rc.probeLoadedModels(ctx)
}
// drainPendingBackendOps retries queued backend ops whose next_retry_at has
// passed on nodes that are currently healthy. On success the row is deleted;
// on failure attempts++ and next_retry_at moves out via exponential backoff.
func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
ops, err := rc.registry.ListDuePendingBackendOps(ctx)
if err != nil {
xlog.Warn("Reconciler: failed to list pending backend ops", "error", err)
return
}
if len(ops) == 0 {
return
}
xlog.Debug("Reconciler: draining pending backend ops", "count", len(ops))
for _, op := range ops {
if err := ctx.Err(); err != nil {
return
}
var applyErr error
switch op.Op {
case OpBackendDelete:
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
case OpBackendInstall, OpBackendUpgrade:
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries))
if err != nil {
applyErr = err
} else if !reply.Success {
applyErr = fmt.Errorf("%s failed: %s", op.Op, reply.Error)
}
default:
xlog.Warn("Reconciler: unknown pending op", "op", op.Op, "id", op.ID)
continue
}
if applyErr == nil {
if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil {
xlog.Warn("Reconciler: failed to delete drained op row", "id", op.ID, "error", err)
} else {
xlog.Info("Reconciler: pending backend op applied",
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1)
}
continue
}
_ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error())
xlog.Warn("Reconciler: pending backend op retry failed",
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr)
}
}
// probeLoadedModels gRPC-health-checks model addresses that the DB says are
// loaded. If a model's backend process is gone (OOM, crash, manual restart)
// we remove the row so ghosts don't linger. Only probes rows older than
// probeStaleAfter so we don't hammer every worker every tick for models we
// just heard from.
func (rc *ReplicaReconciler) probeLoadedModels(ctx context.Context) {
var stale []NodeModel
cutoff := time.Now().Add(-rc.probeStaleAfter)
err := rc.registry.db.WithContext(ctx).
Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
Where("node_models.state = ? AND backend_nodes.status = ? AND node_models.updated_at < ? AND node_models.address != ''",
"loaded", StatusHealthy, cutoff).
Find(&stale).Error
if err != nil {
xlog.Warn("Reconciler: failed to list loaded models for probe", "error", err)
return
}
for _, m := range stale {
if err := ctx.Err(); err != nil {
return
}
if rc.prober.IsAlive(ctx, m.Address) {
// Bump updated_at so we don't probe this row again immediately.
_ = rc.registry.db.WithContext(ctx).Model(&NodeModel{}).
Where("id = ?", m.ID).Update("updated_at", time.Now()).Error
continue
}
if err := rc.registry.RemoveNodeModel(ctx, m.NodeID, m.ModelName); err != nil {
xlog.Warn("Reconciler: failed to remove unreachable model", "node", m.NodeID, "model", m.ModelName, "error", err)
continue
}
xlog.Warn("Reconciler: model unreachable, removed from registry",
"node", m.NodeID, "model", m.ModelName, "address", m.Address)
}
}

View File

@@ -239,3 +239,138 @@ var _ = Describe("ReplicaReconciler", func() {
})
})
})
// fakeProber lets tests control whether a model's gRPC address "responds".
type fakeProber struct {
alive map[string]bool
calls int
}
func (f *fakeProber) IsAlive(_ context.Context, address string) bool {
f.calls++
if f.alive == nil {
return false
}
return f.alive[address]
}
var _ = Describe("ReplicaReconciler — state reconciliation", func() {
var (
db *gorm.DB
registry *NodeRegistry
)
BeforeEach(func() {
if runtime.GOOS == "darwin" {
Skip("testcontainers requires Docker, not available on macOS CI")
}
db = testutil.SetupTestDB()
var err error
registry, err = NewNodeRegistry(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("probeLoadedModels", func() {
It("removes loaded models whose gRPC address is unreachable", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
// Two loaded models — one stale (will probe), one fresh (skipped).
stale := &NodeModel{
ID: "stale-1",
NodeID: node.ID,
ModelName: "stale-model",
Address: "10.0.0.1:12345",
State: "loaded",
UpdatedAt: time.Now().Add(-5 * time.Minute),
}
fresh := &NodeModel{
ID: "fresh-1",
NodeID: node.ID,
ModelName: "fresh-model",
Address: "10.0.0.1:54321",
State: "loaded",
UpdatedAt: time.Now(), // within probeStaleAfter
}
Expect(db.Create(stale).Error).To(Succeed())
Expect(db.Create(fresh).Error).To(Succeed())
prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": false}}
rc := NewReplicaReconciler(ReplicaReconcilerOptions{
Registry: registry,
DB: db,
Prober: prober,
ProbeStaleAfter: 2 * time.Minute,
})
rc.probeLoadedModels(context.Background())
// Stale was unreachable — row removed.
var after []NodeModel
Expect(db.Find(&after).Error).To(Succeed())
Expect(after).To(HaveLen(1))
Expect(after[0].ModelName).To(Equal("fresh-model"))
// Prober was only called once (the fresh row was filtered out).
Expect(prober.calls).To(Equal(1))
})
It("keeps reachable models and bumps their updated_at", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
stale := &NodeModel{
ID: "stale-2",
NodeID: node.ID,
ModelName: "alive-model",
Address: "10.0.0.1:12345",
State: "loaded",
UpdatedAt: time.Now().Add(-5 * time.Minute),
}
Expect(db.Create(stale).Error).To(Succeed())
prober := &fakeProber{alive: map[string]bool{"10.0.0.1:12345": true}}
rc := NewReplicaReconciler(ReplicaReconcilerOptions{
Registry: registry,
DB: db,
Prober: prober,
ProbeStaleAfter: 2 * time.Minute,
})
rc.probeLoadedModels(context.Background())
var after NodeModel
Expect(db.First(&after, "id = ?", "stale-2").Error).To(Succeed())
Expect(after.UpdatedAt).To(BeTemporally("~", time.Now(), time.Second))
})
})
Describe("UpsertPendingBackendOp + RecordPendingBackendOpFailure", func() {
It("upserts on the composite key rather than duplicating rows", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
// Second call for the same (node, backend, op) should not create a
// new row — that's how re-issuing a delete works.
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
var rows []PendingBackendOp
Expect(db.Find(&rows).Error).To(Succeed())
Expect(rows).To(HaveLen(1))
})
It("increments attempts and moves next_retry_at out on failure", func() {
node := &BackendNode{Name: "n1", NodeType: NodeTypeBackend, Address: "10.0.0.1:50051"}
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
Expect(registry.UpsertPendingBackendOp(context.Background(), node.ID, "foo", OpBackendDelete, nil)).To(Succeed())
var row PendingBackendOp
Expect(db.First(&row).Error).To(Succeed())
before := row.NextRetryAt
Expect(registry.RecordPendingBackendOpFailure(context.Background(), row.ID, "boom")).To(Succeed())
Expect(db.First(&row, row.ID).Error).To(Succeed())
Expect(row.Attempts).To(Equal(1))
Expect(row.LastError).To(Equal("boom"))
Expect(row.NextRetryAt).To(BeTemporally(">", before))
})
})
})

View File

@@ -104,6 +104,36 @@ type NodeWithExtras struct {
Labels map[string]string `json:"labels,omitempty"`
}
// PendingBackendOp is a durable intent for a backend lifecycle operation
// (delete/install/upgrade) that needs to eventually apply on a specific node.
//
// Without this table, a backend delete against an offline node silently
// dropped: the frontend skipped the node, the node came back later with the
// backend still installed, and the operator saw a zombie. Now the intent is
// recorded regardless of node status; the state reconciler drains the queue
// whenever a node is healthy and removes the row on success. Reissuing the
// same operation while a row exists updates NextRetryAt instead of stacking
// duplicates (see the unique index).
type PendingBackendOp struct {
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"`
Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"`
Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"`
Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries
Attempts int `gorm:"default:0" json:"attempts"`
LastError string `gorm:"type:text" json:"last_error,omitempty"`
CreatedAt time.Time `json:"created_at"`
NextRetryAt time.Time `gorm:"index" json:"next_retry_at"`
}
// Op constants mirror the operation names used by DistributedBackendManager
// so callers don't repeat stringly-typed values.
const (
OpBackendDelete = "delete"
OpBackendInstall = "install"
OpBackendUpgrade = "upgrade"
)
// NodeRegistry manages backend node registration and lookup in PostgreSQL.
type NodeRegistry struct {
db *gorm.DB
@@ -114,7 +144,7 @@ type NodeRegistry struct {
// when multiple instances (frontend + workers) start at the same time.
func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) {
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{})
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{})
}); err != nil {
return nil, fmt.Errorf("migrating node tables: %w", err)
}
@@ -946,3 +976,114 @@ func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node
_ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name)
}
}
// UpsertPendingBackendOp records or refreshes a pending backend operation for
// a node. If a row already exists for (nodeID, backend, op) we keep its
// Attempts/LastError but reset NextRetryAt to now, so reissuing the same
// delete/upgrade nudges it to the front of the queue instead of stacking a
// duplicate intent.
func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error {
row := PendingBackendOp{
NodeID: nodeID,
Backend: backend,
Op: op,
Galleries: galleries,
NextRetryAt: time.Now(),
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
}).Create(&row).Error
}
// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed
// AND whose node is currently healthy. The reconciler drains this list; we
// filter by node status in the query so a tick doesn't hammer NATS for
// nodes that obviously can't answer.
func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
var ops []PendingBackendOp
err := r.db.WithContext(ctx).
Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id").
Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy).
Order("pending_backend_ops.next_retry_at ASC").
Find(&ops).Error
if err != nil {
return nil, fmt.Errorf("listing due pending backend ops: %w", err)
}
return ops, nil
}
// ListPendingBackendOps returns every queued row (for the UI "pending on N
// nodes" chip and the pre-delete ConfirmDialog).
func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
var ops []PendingBackendOp
if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil {
return nil, fmt.Errorf("listing pending backend ops: %w", err)
}
return ops, nil
}
// DeletePendingBackendOp removes a queue row — called after the op succeeds.
func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error {
if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil {
return fmt.Errorf("deleting pending backend op %d: %w", id, err)
}
return nil
}
// RecordPendingBackendOpFailure bumps Attempts, captures the error, and
// pushes NextRetryAt out with exponential backoff capped at 15 minutes.
func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {
return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var row PendingBackendOp
if err := tx.First(&row, id).Error; err != nil {
return err
}
row.Attempts++
row.LastError = errMsg
row.NextRetryAt = time.Now().Add(backoffForAttempt(row.Attempts))
return tx.Save(&row).Error
})
}
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
// reconciler tick is 30s so anything shorter would just re-fire immediately.
func backoffForAttempt(attempts int) time.Duration {
const cap = 15 * time.Minute
base := 30 * time.Second
shift := attempts - 1
if shift < 0 {
shift = 0
}
if shift > 10 { // 2^10 * 30s already exceeds the cap
shift = 10
}
d := base << shift
if d > cap {
return cap
}
return d
}
// CountPendingBackendOpsByBackend returns a map of backend name to the count
// of pending rows. Used to decorate Manage → Backends with a "pending on N
// nodes" chip without exposing the full queue.
func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) {
type row struct {
Backend string
Count int
}
var rows []row
err := r.db.WithContext(ctx).Model(&PendingBackendOp{}).
Select("backend, COUNT(*) as count").
Group("backend").
Scan(&rows).Error
if err != nil {
return nil, fmt.Errorf("counting pending backend ops: %w", err)
}
out := make(map[string]int, len(rows))
for _, r := range rows {
out[r.Backend] = r.Count
}
return out, nil
}