fix(distributed): honor NodeSelector in cached-replica lookup, stop empty-backend reconciler scaleups (#9652)

* fix(distributed): honor NodeSelector in cached-replica lookup, stop empty-backend reconciler scaleups

Two distinct bugs were causing tight retry loops in the distributed scheduler:

1. FindAndLockNodeWithModel ignored the model's NodeSelector. When a model
   was loaded on multiple nodes and only some matched the current selector,
   the function returned the lowest-in_flight node — even one the selector
   excluded. Route()'s post-check then fell through to scheduleNewModel,
   which targeted the matching node where the model was already at
   MaxReplicasPerModel capacity. Eviction couldn't help (the only loaded
   model on that node was the one being requested, and it was busy), so
   every request looped through "evicting LRU" → "all models busy".

   Fix: thread an optional candidateNodeIDs filter through
   FindAndLockNodeWithModel. Route() resolves the selector once via a new
   resolveSelectorCandidates helper and passes the matching IDs to both
   the cached-replica lookup and scheduleNewModel (see the sketch after
   this list). The same helper replaces the inline selector block in
   scheduleNewModel.

2. ScheduleAndLoadModel (reconciler scale-up path) fell back to
   scheduleNewModel with backendType="" when no replica had ever been
   loaded for a model. The worker rejected the resulting backend.install
   ("backend name is empty") on every reconciler tick (~30s).

   Fix: remove the broken fallback. When GetModelLoadInfo has nothing
   stored, return a clear error instead of firing a doomed NATS install.
   The reconciler's existing scale-up failure log surfaces it once per
   tick; the model auto-replicates as soon as Route() serves it once and
   stores load info.
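
For orientation, a minimal sketch of how the two fixes fit together. The
types and signatures below are trimmed stand-ins for illustration, not the
actual LocalAI interfaces; the real changes are in the SmartRouter and
NodeRegistry hunks further down.

    package sketch

    import (
        "context"
        "fmt"
    )

    // Simplified stand-in for the slice of the registry the router needs here.
    type registry interface {
        // candidateNodeIDs == nil means "any node"; non-empty restricts the lookup.
        FindAndLockNodeWithModel(ctx context.Context, model string, candidateNodeIDs []string) (nodeID string, err error)
        // Backend type and serialized options stored by Route() on first load.
        GetModelLoadInfo(ctx context.Context, model string) (backendType string, optsBlob []byte, err error)
    }

    type router struct {
        reg registry
        // Stand-in for the new helper: returns nil when no selector is
        // configured, an error when a non-empty selector matches zero nodes.
        resolveSelectorCandidates func(ctx context.Context, model string) ([]string, error)
        scheduleNewModel          func(ctx context.Context, backendType, model string, candidateNodeIDs []string) (string, error)
    }

    // Fix 1: resolve the NodeSelector once and reuse the same candidate set for
    // the cached-replica lookup and any fresh schedule, so a replica on an
    // excluded node can never win the "already loaded" check.
    func (r *router) route(ctx context.Context, backendType, model string) (string, error) {
        candidates, err := r.resolveSelectorCandidates(ctx, model)
        if err != nil {
            return "", err
        }
        if nodeID, err := r.reg.FindAndLockNodeWithModel(ctx, model, candidates); err == nil {
            return nodeID, nil // cached replica on a selector-matching node
        }
        // Not loaded on any acceptable node: fresh load, same candidate set.
        return r.scheduleNewModel(ctx, backendType, model, candidates)
    }

    // Fix 2: the reconciler scale-up path no longer falls back to an empty
    // backend type; it errors out until Route() has stored load info once.
    func (r *router) scheduleAndLoadModel(ctx context.Context, model string) (string, error) {
        backendType, _, err := r.reg.GetModelLoadInfo(ctx, model)
        if err != nil {
            return "", fmt.Errorf("no load info for model %s: serve at least one request for it first: %w", model, err)
        }
        return r.scheduleNewModel(ctx, backendType, model, nil)
    }

The key property is that the selector is resolved exactly once per request,
so the cached-replica lookup and the fresh-schedule path can never disagree
about which nodes are eligible.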

Also downgrade the post-LoadModel-failure StopGRPC error to Debug — that
cleanup attempt usually hits "model not found" because LoadModel failed
before registering the process, and the outer "Failed to load model"
error already carries the real reason.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: claude-code:claude-opus-4-7 [Read] [Edit] [Bash]

* test(distributed): cover selector-aware FindAndLockNodeWithModel and reconciler scaleup guard

Two regression tests for the bugs fixed in the previous commit:

1. FindAndLockNodeWithModel — registry-level integration tests verify the
   candidateNodeIDs filter:
   - Returns the included node even when an excluded node has lower
     in_flight (the original selector-mismatch loop scenario).
   - Returns not-found when the model is loaded only on excluded nodes,
     forcing Route() to fall through to a fresh schedule instead of
     reusing the excluded replica.

2. ScheduleAndLoadModel — mock-based test verifies the reconciler scale-up
   path returns an error and does NOT fire backend.install when no replica
   has been loaded yet. fakeUnloader gains an installCalls slice so this
   negative assertion is direct (the recording-fake pattern is sketched
   after this list).
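
The negative assertion in test 2 leans on a small recording fake. Below is a
minimal, self-contained sketch of that pattern with hypothetical names; the
real fake is fakeUnloader in the diff further down, and the real test asserts
with Gomega rather than the plain testing package.

    package sketch

    import (
        "errors"
        "testing"
    )

    // recordingInstaller stands in for fakeUnloader: it records every install
    // request so a test can assert that none were sent.
    type installCall struct {
        nodeID, backend, modelID string
    }

    type recordingInstaller struct {
        installCalls []installCall
    }

    func (r *recordingInstaller) InstallBackend(nodeID, backend, modelID string) error {
        r.installCalls = append(r.installCalls, installCall{nodeID, backend, modelID})
        return nil
    }

    // scheduleAndLoad is a stand-in for the code under test: it must bail out
    // before touching the installer when there is no stored load info.
    func scheduleAndLoad(inst *recordingInstaller, haveLoadInfo bool) error {
        if !haveLoadInfo {
            return errors.New("no load info stored")
        }
        return inst.InstallBackend("node-1", "llama-cpp", "my-model")
    }

    func TestNoInstallWithoutLoadInfo(t *testing.T) {
        inst := &recordingInstaller{}
        if err := scheduleAndLoad(inst, false); err == nil {
            t.Fatal("expected an error when no load info is stored")
        }
        // The direct negative assertion: nothing was recorded, so no install
        // message could have been fired.
        if len(inst.installCalls) != 0 {
            t.Fatalf("expected no install calls, got %d", len(inst.installCalls))
        }
    }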

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: claude-code:claude-opus-4-7 [Read] [Edit] [Bash]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
LocalAI [bot], committed by GitHub, 2026-05-04 09:42:14 +02:00
commit 170d55c67d (parent 28b4857bd6)
8 changed files with 150 additions and 33 deletions

View File

@@ -9,7 +9,7 @@ import (
// ModelRouter is used by SmartRouter for routing decisions and model lifecycle.
type ModelRouter interface {
-FindAndLockNodeWithModel(ctx context.Context, modelName string) (*BackendNode, *NodeModel, error)
+FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string) (*BackendNode, *NodeModel, error)
DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error

View File

@@ -27,7 +27,7 @@ func newFakeModelRouterForSmartRouter() *fakeModelRouterForSmartRouter {
}
}
-func (f *fakeModelRouterForSmartRouter) FindAndLockNodeWithModel(_ context.Context, _ string) (*BackendNode, *NodeModel, error) {
+func (f *fakeModelRouterForSmartRouter) FindAndLockNodeWithModel(_ context.Context, _ string, _ []string) (*BackendNode, *NodeModel, error) {
f.mu.Lock()
defer f.mu.Unlock()
return f.node, f.nodeModel, f.findErr

View File

@@ -652,16 +652,26 @@ func (r *NodeRegistry) FindNodesWithModel(ctx context.Context, modelName string)
// model loaded and increments its in-flight counter within a single transaction.
// The SELECT FOR UPDATE row lock prevents concurrent eviction from removing the
// NodeModel row between the find and increment operations.
-func (r *NodeRegistry) FindAndLockNodeWithModel(ctx context.Context, modelName string) (*BackendNode, *NodeModel, error) {
+//
+// When candidateNodeIDs is non-empty, only nodes in that set are considered.
+// Pass nil (or empty) to consider any node. This lets callers pre-filter by
+// NodeSelector so a cached replica on a now-excluded node isn't picked over a
+// matching replica elsewhere — the selector-mismatch fall-through path used to
+// trigger an eviction-busy loop when both sides had the model loaded.
+func (r *NodeRegistry) FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string) (*BackendNode, *NodeModel, error) {
var nm NodeModel
var node BackendNode
err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// Order by in_flight ASC (least busy replica), then by available_vram DESC
// (prefer nodes with more free VRAM to spread load across the cluster).
-if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
+q := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
Where("node_models.model_name = ? AND node_models.state = ?", modelName, "loaded").
Where("node_models.model_name = ? AND node_models.state = ?", modelName, "loaded")
if len(candidateNodeIDs) > 0 {
q = q.Where("node_models.node_id IN ?", candidateNodeIDs)
}
if err := q.
Order("node_models.in_flight ASC, backend_nodes.available_vram DESC").
First(&nm).Error; err != nil {
return err

View File

@@ -244,7 +244,7 @@ var _ = Describe("NodeRegistry", func() {
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
Expect(registry.SetNodeModel(context.Background(), node.ID, "my-model", 0, "loaded", "10.0.0.40:50052", 0)).To(Succeed())
-foundNode, foundNM, err := registry.FindAndLockNodeWithModel(context.Background(), "my-model")
+foundNode, foundNM, err := registry.FindAndLockNodeWithModel(context.Background(), "my-model", nil)
Expect(err).ToNot(HaveOccurred())
Expect(foundNode.ID).To(Equal(node.ID))
Expect(foundNM.ModelName).To(Equal("my-model"))
@@ -256,7 +256,7 @@ var _ = Describe("NodeRegistry", func() {
})
It("returns error when model is not loaded anywhere", func() {
-_, _, err := registry.FindAndLockNodeWithModel(context.Background(), "nonexistent-model")
+_, _, err := registry.FindAndLockNodeWithModel(context.Background(), "nonexistent-model", nil)
Expect(err).To(HaveOccurred())
})
@@ -273,10 +273,52 @@ var _ = Describe("NodeRegistry", func() {
Expect(registry.IncrementInFlight(context.Background(), n1.ID, "shared-model", 0)).To(Succeed())
Expect(registry.IncrementInFlight(context.Background(), n1.ID, "shared-model", 0)).To(Succeed())
-foundNode, _, err := registry.FindAndLockNodeWithModel(context.Background(), "shared-model")
+foundNode, _, err := registry.FindAndLockNodeWithModel(context.Background(), "shared-model", nil)
Expect(err).ToNot(HaveOccurred())
Expect(foundNode.Name).To(Equal("lock-light"))
})
It("filters by candidateNodeIDs even when an excluded node has lower in_flight", func() {
// Reproduces the selector-mismatch loop: a model loaded on a node
// the selector now excludes (excluded) and on a node it includes
// (included). Without the filter the excluded node wins on
// in_flight ASC; with the filter the included node is returned
// directly so Route() can serve from its existing replica.
excluded := makeNode("excluded-node", "10.0.0.43:50051", 8_000_000_000)
included := makeNode("included-node", "10.0.0.44:50051", 8_000_000_000)
Expect(registry.Register(context.Background(), excluded, true)).To(Succeed())
Expect(registry.Register(context.Background(), included, true)).To(Succeed())
Expect(registry.SetNodeModel(context.Background(), excluded.ID, "filtered-model", 0, "loaded", "", 0)).To(Succeed())
Expect(registry.SetNodeModel(context.Background(), included.ID, "filtered-model", 0, "loaded", "", 0)).To(Succeed())
// Make `included` strictly busier than `excluded` so the unfiltered
// query would prefer the excluded one — proving the filter is
// what's steering the result, not the in_flight ordering.
Expect(registry.IncrementInFlight(context.Background(), included.ID, "filtered-model", 0)).To(Succeed())
Expect(registry.IncrementInFlight(context.Background(), included.ID, "filtered-model", 0)).To(Succeed())
foundNode, foundNM, err := registry.FindAndLockNodeWithModel(context.Background(), "filtered-model", []string{included.ID})
Expect(err).ToNot(HaveOccurred())
Expect(foundNode.ID).To(Equal(included.ID))
Expect(foundNM.NodeID).To(Equal(included.ID))
})
It("returns not-found when the model is loaded only on excluded nodes", func() {
loadedExcluded := makeNode("excl-only-node", "10.0.0.45:50051", 8_000_000_000)
emptyIncluded := makeNode("empty-included-node", "10.0.0.46:50051", 8_000_000_000)
Expect(registry.Register(context.Background(), loadedExcluded, true)).To(Succeed())
Expect(registry.Register(context.Background(), emptyIncluded, true)).To(Succeed())
Expect(registry.SetNodeModel(context.Background(), loadedExcluded.ID, "no-match-model", 0, "loaded", "", 0)).To(Succeed())
// Filter restricts to a node that does not have the model — the
// query must return an error so Route() falls through to schedule
// a fresh load on a matching node instead of reusing the excluded
// replica.
_, _, err := registry.FindAndLockNodeWithModel(context.Background(), "no-match-model", []string{emptyIncluded.ID})
Expect(err).To(HaveOccurred())
})
})
Describe("MarkHealthy and MarkUnhealthy round-trip", func() {

View File

@@ -150,12 +150,13 @@ func (r *SmartRouter) ScheduleAndLoadModel(ctx context.Context, modelName string
// Get load info from an existing replica (stored when Route() first loaded the model)
backendType, optsBlob, err := r.registry.GetModelLoadInfo(ctx, modelName)
if err != nil {
-// No existing replica with stored opts — fall back to install-only.
-// This happens on the very first load (before Route() has stored opts).
-xlog.Warn("No stored model load info for reconciler scale-up, falling back to backend install only",
-"model", modelName, "error", err)
-node, _, _, schedErr := r.scheduleNewModel(ctx, "", modelName, nil)
-return node, schedErr
+// No replica has ever been loaded for this model, so we have no
+// backend type or model options to replicate. The previous fallback
+// fired backend.install with backend="" every reconciler tick, which
+// the worker rejected ("backend name is empty"). Skip cleanly: the
+// model needs to be served at least once via Route() so its load
+// info is stored — then the reconciler can replicate it.
+return nil, fmt.Errorf("no load info for model %s: serve at least one request for it before the reconciler can replicate (cause: %w)", modelName, err)
}
// Deserialize the stored model options
@@ -198,8 +199,18 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
trackingKey = modelName
}
+// Resolve the model's NodeSelector once so cached-replica lookup and the
+// new-load scheduler agree on the candidate set. Without this, a cached
+// replica on a node the selector now excludes was picked over a matching
+// replica elsewhere, and the fall-through then tried to load on the
+// matching node where the model was already at capacity (eviction-busy).
+candidateNodeIDs, err := r.resolveSelectorCandidates(ctx, trackingKey)
+if err != nil {
+return nil, err
+}
// Step 1: Find and atomically lock a node with this model loaded
-node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey)
+node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs)
if err == nil && node != nil {
modelAddr := node.Address
if nm.Address != "" {
@@ -246,7 +257,7 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
// Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates
loadModel := func() (*RouteResult, error) {
// Re-check after acquiring lock — another request may have loaded it
-node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey)
+node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs)
if err == nil && node != nil {
modelAddr := node.Address
if nm.Address != "" {
@@ -347,6 +358,30 @@ func extractNodeIDs(nodes []BackendNode) []string {
return ids
}
+// resolveSelectorCandidates returns the node IDs that match the model's
+// NodeSelector. Returns nil when no selector is configured ("any healthy node"
+// — registry helpers treat nil as no filter). Returns an error when a
+// non-empty selector matches zero healthy nodes, since there is nothing to
+// route or schedule on.
+func (r *SmartRouter) resolveSelectorCandidates(ctx context.Context, modelID string) ([]string, error) {
+sched, _ := r.registry.GetModelScheduling(ctx, modelID)
+if sched == nil || sched.NodeSelector == "" {
+return nil, nil
+}
+selector := parseSelectorJSON(sched.NodeSelector)
+if len(selector) == 0 {
+return nil, nil
+}
+candidates, err := r.registry.FindNodesBySelector(ctx, selector)
+if err != nil {
+return nil, fmt.Errorf("looking up nodes for selector %s: %w", sched.NodeSelector, err)
+}
+if len(candidates) == 0 {
+return nil, fmt.Errorf("no healthy nodes match selector for model %s: %s", modelID, sched.NodeSelector)
+}
+return extractNodeIDs(candidates), nil
+}
// nodeMatchesScheduling checks if a node satisfies the scheduling constraints for a model.
// Returns true if no constraints exist or the node matches all selector labels.
func (r *SmartRouter) nodeMatchesScheduling(ctx context.Context, node *BackendNode, modelName string) bool {
@@ -398,18 +433,9 @@ func (r *SmartRouter) scheduleNewModel(ctx context.Context, backendType, modelID
// Check for scheduling constraints (node selector). If a selector is set,
// we restrict the candidate pool to matching nodes; otherwise nil means
// "any healthy node".
-sched, _ := r.registry.GetModelScheduling(ctx, modelID)
-var candidateNodeIDs []string
-if sched != nil && sched.NodeSelector != "" {
-selector := parseSelectorJSON(sched.NodeSelector)
-if len(selector) > 0 {
-candidates, err := r.registry.FindNodesBySelector(ctx, selector)
-if err != nil || len(candidates) == 0 {
-return nil, "", 0, fmt.Errorf("no healthy nodes match selector for model %s: %v", modelID, sched.NodeSelector)
-}
-candidateNodeIDs = extractNodeIDs(candidates)
-}
+candidateNodeIDs, err := r.resolveSelectorCandidates(ctx, modelID)
+if err != nil {
+return nil, "", 0, err
}
// Narrow candidates to nodes that still have a free replica slot for this

View File

@@ -114,7 +114,7 @@ type fakeModelRouter struct {
touchCalls []string
}
-func (f *fakeModelRouter) FindAndLockNodeWithModel(_ context.Context, modelName string) (*BackendNode, *NodeModel, error) {
+func (f *fakeModelRouter) FindAndLockNodeWithModel(_ context.Context, modelName string, _ []string) (*BackendNode, *NodeModel, error) {
return f.findAndLockNode, f.findAndLockNM, f.findAndLockErr
}
@@ -268,13 +268,26 @@ func (f *stubClientFactory) NewClient(_ string, _ bool) grpc.Backend {
type fakeUnloader struct {
installReply *messaging.BackendInstallReply
installErr error
-stopCalls []string // "nodeID:model"
+installCalls []installCall // every InstallBackend invocation, in order
+stopCalls []string // "nodeID:model"
stopErr error
unloadCalls []string
unloadErr error
}
-func (f *fakeUnloader) InstallBackend(_, _, _, _, _, _, _ string, _ int) (*messaging.BackendInstallReply, error) {
+// installCall captures the args we care about when asserting that the
+// reconciler / router did or did not fire a NATS install. The fake records
+// every call so tests can verify both presence and shape (e.g. that backend
+// is non-empty).
+type installCall struct {
+nodeID string
+backend string
+modelID string
+replica int
+}
+func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
+f.installCalls = append(f.installCalls, installCall{nodeID, backend, modelID, replica})
return f.installReply, f.installErr
}
@@ -692,6 +705,28 @@ var _ = Describe("SmartRouter", func() {
})
})
Describe("ScheduleAndLoadModel (mock-based)", func() {
It("returns an error and does not fire a NATS install when no load info is stored", func() {
// Reproduces the reconciler scale-up bug: when GetModelLoadInfo
// returns ErrRecordNotFound (no replica has ever been loaded),
// the previous fallback called scheduleNewModel with an empty
// backend type, which the worker rejected on every reconciler
// tick. The fix bails out cleanly with an explanatory error and
// never sends backend.install.
unloader := &fakeUnloader{}
reg := &fakeModelRouter{}
router := NewSmartRouter(reg, SmartRouterOptions{Unloader: unloader})
node, err := router.ScheduleAndLoadModel(context.Background(), "never-loaded", nil)
Expect(err).To(HaveOccurred())
Expect(node).To(BeNil())
Expect(err.Error()).To(ContainSubstring("never-loaded"))
Expect(unloader.installCalls).To(BeEmpty(),
"reconciler must not fire backend.install when there is no load info to replicate")
})
})
// -----------------------------------------------------------------------
// Integration tests using real PostgreSQL (existing)
// -----------------------------------------------------------------------

View File

@@ -183,8 +183,12 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backend, o))
if err != nil {
+// Defensive cleanup: the model usually wasn't registered yet (LoadModel
+// failed before that), so StopGRPC reporting "model not found" is the
+// expected case, not an error. The outer Failed-to-load log below
+// carries the real reason.
if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
xlog.Debug("cleanup stop after failed load", "error", stopErr, "model", o.modelID)
}
xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString)
return nil, err

View File

@@ -63,7 +63,7 @@ var _ = Describe("Model Routing", Label("Distributed"), func() {
Expect(models[0].InFlight).To(Equal(2))
// FindAndLockNodeWithModel should return this node and atomically increment in-flight
-foundNode, foundModel, err := registry.FindAndLockNodeWithModel(context.Background(), "llama3")
+foundNode, foundModel, err := registry.FindAndLockNodeWithModel(context.Background(), "llama3", nil)
Expect(err).ToNot(HaveOccurred())
Expect(foundNode.ID).To(Equal(node.ID))
Expect(foundModel.ModelName).To(Equal("llama3"))