diff --git a/core/services/nodes/interfaces.go b/core/services/nodes/interfaces.go
index de7ebb8d2..c5f09105c 100644
--- a/core/services/nodes/interfaces.go
+++ b/core/services/nodes/interfaces.go
@@ -9,7 +9,7 @@ import (
 // ModelRouter is used by SmartRouter for routing decisions and model lifecycle.
 type ModelRouter interface {
-    FindAndLockNodeWithModel(ctx context.Context, modelName string) (*BackendNode, *NodeModel, error)
+    FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string) (*BackendNode, *NodeModel, error)
     DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
     IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
     RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
diff --git a/core/services/nodes/model_router_test.go b/core/services/nodes/model_router_test.go
index e369473a4..4c0844158 100644
--- a/core/services/nodes/model_router_test.go
+++ b/core/services/nodes/model_router_test.go
@@ -27,7 +27,7 @@ func newFakeModelRouterForSmartRouter() *fakeModelRouterForSmartRouter {
     }
 }
 
-func (f *fakeModelRouterForSmartRouter) FindAndLockNodeWithModel(_ context.Context, _ string) (*BackendNode, *NodeModel, error) {
+func (f *fakeModelRouterForSmartRouter) FindAndLockNodeWithModel(_ context.Context, _ string, _ []string) (*BackendNode, *NodeModel, error) {
     f.mu.Lock()
     defer f.mu.Unlock()
     return f.node, f.nodeModel, f.findErr
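Note on the widened `ModelRouter` contract above: `nil` preserves the old behaviour, while a non-empty slice restricts the lookup. A call-site sketch (the wrapper function and node IDs are hypothetical, for illustration only):

```go
// routeExample is a hypothetical caller; only ModelRouter and
// FindAndLockNodeWithModel come from the interface above.
func routeExample(ctx context.Context, router ModelRouter) error {
	// nil: consider every node that has the model loaded (old behaviour).
	if _, _, err := router.FindAndLockNodeWithModel(ctx, "llama3", nil); err != nil {
		return err
	}
	// Non-empty: only these nodes are candidates, e.g. the set a
	// NodeSelector resolved to.
	_, _, err := router.FindAndLockNodeWithModel(ctx, "llama3", []string{"node-a", "node-b"})
	return err
}
```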
diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go
index f7c9dca23..3d4111217 100644
--- a/core/services/nodes/registry.go
+++ b/core/services/nodes/registry.go
@@ -652,16 +652,26 @@ func (r *NodeRegistry) FindNodesWithModel(ctx context.Context, modelName string)
 // model loaded and increments its in-flight counter within a single transaction.
 // The SELECT FOR UPDATE row lock prevents concurrent eviction from removing the
 // NodeModel row between the find and increment operations.
-func (r *NodeRegistry) FindAndLockNodeWithModel(ctx context.Context, modelName string) (*BackendNode, *NodeModel, error) {
+//
+// When candidateNodeIDs is non-empty, only nodes in that set are considered.
+// Pass nil (or empty) to consider any node. This lets callers pre-filter by
+// NodeSelector so a cached replica on a now-excluded node isn't picked over a
+// matching replica elsewhere — the selector-mismatch fall-through path used to
+// trigger an eviction-busy loop when both sides had the model loaded.
+func (r *NodeRegistry) FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string) (*BackendNode, *NodeModel, error) {
     var nm NodeModel
     var node BackendNode
     err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
         // Order by in_flight ASC (least busy replica), then by available_vram DESC
         // (prefer nodes with more free VRAM to spread load across the cluster).
-        if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
+        q := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
             Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
-            Where("node_models.model_name = ? AND node_models.state = ?", modelName, "loaded").
+            Where("node_models.model_name = ? AND node_models.state = ?", modelName, "loaded")
+        if len(candidateNodeIDs) > 0 {
+            q = q.Where("node_models.node_id IN ?", candidateNodeIDs)
+        }
+        if err := q.
             Order("node_models.in_flight ASC, backend_nodes.available_vram DESC").
             First(&nm).Error; err != nil {
             return err
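The `len(candidateNodeIDs) > 0` guard in the hunk above is load-bearing, not just an optimization. A sketch of the pattern as a standalone helper (the helper name is hypothetical; the `IN (NULL)` rendering is GORM v2 behaviour as I understand it, worth verifying against the pinned version):

```go
package nodes

import "gorm.io/gorm"

// applyCandidateFilter names the pattern the diff inlines: nil/empty must
// mean "any node". Passing an empty slice straight into `IN ?` would
// (in GORM v2, as far as I know) render as IN (NULL), which matches no
// rows and would silently turn "no filter" into "match nothing".
func applyCandidateFilter(q *gorm.DB, candidateNodeIDs []string) *gorm.DB {
	if len(candidateNodeIDs) == 0 {
		return q
	}
	return q.Where("node_models.node_id IN ?", candidateNodeIDs)
}
```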
diff --git a/core/services/nodes/registry_test.go b/core/services/nodes/registry_test.go
index df323bc29..dce310ff0 100644
--- a/core/services/nodes/registry_test.go
+++ b/core/services/nodes/registry_test.go
@@ -244,7 +244,7 @@ var _ = Describe("NodeRegistry", func() {
             Expect(registry.Register(context.Background(), node, true)).To(Succeed())
             Expect(registry.SetNodeModel(context.Background(), node.ID, "my-model", 0, "loaded", "10.0.0.40:50052", 0)).To(Succeed())
 
-            foundNode, foundNM, err := registry.FindAndLockNodeWithModel(context.Background(), "my-model")
+            foundNode, foundNM, err := registry.FindAndLockNodeWithModel(context.Background(), "my-model", nil)
             Expect(err).ToNot(HaveOccurred())
             Expect(foundNode.ID).To(Equal(node.ID))
             Expect(foundNM.ModelName).To(Equal("my-model"))
@@ -256,7 +256,7 @@
         })
 
         It("returns error when model is not loaded anywhere", func() {
-            _, _, err := registry.FindAndLockNodeWithModel(context.Background(), "nonexistent-model")
+            _, _, err := registry.FindAndLockNodeWithModel(context.Background(), "nonexistent-model", nil)
             Expect(err).To(HaveOccurred())
         })
 
@@ -273,10 +273,52 @@
             Expect(registry.IncrementInFlight(context.Background(), n1.ID, "shared-model", 0)).To(Succeed())
             Expect(registry.IncrementInFlight(context.Background(), n1.ID, "shared-model", 0)).To(Succeed())
 
-            foundNode, _, err := registry.FindAndLockNodeWithModel(context.Background(), "shared-model")
+            foundNode, _, err := registry.FindAndLockNodeWithModel(context.Background(), "shared-model", nil)
             Expect(err).ToNot(HaveOccurred())
             Expect(foundNode.Name).To(Equal("lock-light"))
         })
+
+        It("filters by candidateNodeIDs even when an excluded node has lower in_flight", func() {
+            // Reproduces the selector-mismatch loop: a model loaded on a node
+            // the selector now excludes (excluded) and on a node it includes
+            // (included). Without the filter the excluded node wins on
+            // in_flight ASC; with the filter the included node is returned
+            // directly so Route() can serve from its existing replica.
+            excluded := makeNode("excluded-node", "10.0.0.43:50051", 8_000_000_000)
+            included := makeNode("included-node", "10.0.0.44:50051", 8_000_000_000)
+            Expect(registry.Register(context.Background(), excluded, true)).To(Succeed())
+            Expect(registry.Register(context.Background(), included, true)).To(Succeed())
+
+            Expect(registry.SetNodeModel(context.Background(), excluded.ID, "filtered-model", 0, "loaded", "", 0)).To(Succeed())
+            Expect(registry.SetNodeModel(context.Background(), included.ID, "filtered-model", 0, "loaded", "", 0)).To(Succeed())
+
+            // Make `included` strictly busier than `excluded` so the unfiltered
+            // query would prefer the excluded one — proving the filter is
+            // what's steering the result, not the in_flight ordering.
+            Expect(registry.IncrementInFlight(context.Background(), included.ID, "filtered-model", 0)).To(Succeed())
+            Expect(registry.IncrementInFlight(context.Background(), included.ID, "filtered-model", 0)).To(Succeed())
+
+            foundNode, foundNM, err := registry.FindAndLockNodeWithModel(context.Background(), "filtered-model", []string{included.ID})
+            Expect(err).ToNot(HaveOccurred())
+            Expect(foundNode.ID).To(Equal(included.ID))
+            Expect(foundNM.NodeID).To(Equal(included.ID))
+        })
+
+        It("returns not-found when the model is loaded only on excluded nodes", func() {
+            loadedExcluded := makeNode("excl-only-node", "10.0.0.45:50051", 8_000_000_000)
+            emptyIncluded := makeNode("empty-included-node", "10.0.0.46:50051", 8_000_000_000)
+            Expect(registry.Register(context.Background(), loadedExcluded, true)).To(Succeed())
+            Expect(registry.Register(context.Background(), emptyIncluded, true)).To(Succeed())
+
+            Expect(registry.SetNodeModel(context.Background(), loadedExcluded.ID, "no-match-model", 0, "loaded", "", 0)).To(Succeed())
+
+            // Filter restricts to a node that does not have the model — the
+            // query must return an error so Route() falls through to schedule
+            // a fresh load on a matching node instead of reusing the excluded
+            // replica.
+            _, _, err := registry.FindAndLockNodeWithModel(context.Background(), "no-match-model", []string{emptyIncluded.ID})
+            Expect(err).To(HaveOccurred())
+        })
     })
 
     Describe("MarkHealthy and MarkUnhealthy round-trip", func() {
diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go
index abae8f3f5..ea49255d8 100644
--- a/core/services/nodes/router.go
+++ b/core/services/nodes/router.go
@@ -150,12 +150,13 @@ func (r *SmartRouter) ScheduleAndLoadModel(ctx context.Context, modelName string
     // Get load info from an existing replica (stored when Route() first loaded the model)
     backendType, optsBlob, err := r.registry.GetModelLoadInfo(ctx, modelName)
     if err != nil {
-        // No existing replica with stored opts — fall back to install-only.
-        // This happens on the very first load (before Route() has stored opts).
-        xlog.Warn("No stored model load info for reconciler scale-up, falling back to backend install only",
-            "model", modelName, "error", err)
-        node, _, _, schedErr := r.scheduleNewModel(ctx, "", modelName, nil)
-        return node, schedErr
+        // No replica has ever been loaded for this model, so we have no
+        // backend type or model options to replicate. The previous fallback
+        // fired backend.install with backend="" every reconciler tick, which
+        // the worker rejected ("backend name is empty"). Skip cleanly: the
+        // model needs to be served at least once via Route() so its load
+        // info is stored — then the reconciler can replicate it.
+        return nil, fmt.Errorf("no load info for model %s: serve at least one request for it before the reconciler can replicate (cause: %w)", modelName, err)
     }
 
     // Deserialize the stored model options
@@ -198,8 +199,18 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
         trackingKey = modelName
     }
 
+    // Resolve the model's NodeSelector once so cached-replica lookup and the
+    // new-load scheduler agree on the candidate set. Without this, a cached
+    // replica on a node the selector now excludes was picked over a matching
+    // replica elsewhere, and the fall-through then tried to load on the
+    // matching node where the model was already at capacity (eviction-busy).
+    candidateNodeIDs, err := r.resolveSelectorCandidates(ctx, trackingKey)
+    if err != nil {
+        return nil, err
+    }
+
     // Step 1: Find and atomically lock a node with this model loaded
-    node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey)
+    node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs)
     if err == nil && node != nil {
         modelAddr := node.Address
         if nm.Address != "" {
@@ -246,7 +257,7 @@
     // Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates
     loadModel := func() (*RouteResult, error) {
         // Re-check after acquiring lock — another request may have loaded it
-        node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey)
+        node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs)
         if err == nil && node != nil {
             modelAddr := node.Address
             if nm.Address != "" {
@@ -347,6 +358,30 @@ func extractNodeIDs(nodes []BackendNode) []string {
     return ids
 }
 
+// resolveSelectorCandidates returns the node IDs that match the model's
+// NodeSelector. Returns nil when no selector is configured ("any healthy node"
+// — registry helpers treat nil as no filter). Returns an error when a
+// non-empty selector matches zero healthy nodes, since there is nothing to
+// route or schedule on.
+func (r *SmartRouter) resolveSelectorCandidates(ctx context.Context, modelID string) ([]string, error) {
+    sched, _ := r.registry.GetModelScheduling(ctx, modelID)
+    if sched == nil || sched.NodeSelector == "" {
+        return nil, nil
+    }
+    selector := parseSelectorJSON(sched.NodeSelector)
+    if len(selector) == 0 {
+        return nil, nil
+    }
+    candidates, err := r.registry.FindNodesBySelector(ctx, selector)
+    if err != nil {
+        return nil, fmt.Errorf("looking up nodes for selector %s: %w", sched.NodeSelector, err)
+    }
+    if len(candidates) == 0 {
+        return nil, fmt.Errorf("no healthy nodes match selector for model %s: %s", modelID, sched.NodeSelector)
+    }
+    return extractNodeIDs(candidates), nil
+}
+
 // nodeMatchesScheduling checks if a node satisfies the scheduling constraints for a model.
 // Returns true if no constraints exist or the node matches all selector labels.
 func (r *SmartRouter) nodeMatchesScheduling(ctx context.Context, node *BackendNode, modelName string) bool {
@@ -398,18 +433,9 @@ func (r *SmartRouter) scheduleNewModel(ctx context.Context, backendType, modelID
     // Check for scheduling constraints (node selector). If a selector is set,
     // we restrict the candidate pool to matching nodes; otherwise nil means
     // "any healthy node".
-    sched, _ := r.registry.GetModelScheduling(ctx, modelID)
-    var candidateNodeIDs []string
-
-    if sched != nil && sched.NodeSelector != "" {
-        selector := parseSelectorJSON(sched.NodeSelector)
-        if len(selector) > 0 {
-            candidates, err := r.registry.FindNodesBySelector(ctx, selector)
-            if err != nil || len(candidates) == 0 {
-                return nil, "", 0, fmt.Errorf("no healthy nodes match selector for model %s: %v", modelID, sched.NodeSelector)
-            }
-            candidateNodeIDs = extractNodeIDs(candidates)
-        }
+    candidateNodeIDs, err := r.resolveSelectorCandidates(ctx, modelID)
+    if err != nil {
+        return nil, "", 0, err
     }
 
     // Narrow candidates to nodes that still have a free replica slot for this
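The value of `resolveSelectorCandidates` is that `Route()` and `scheduleNewModel` now derive the candidate set from one place. A sketch of the three outcomes a caller handles (the wrapper function is hypothetical; it only exercises the contract documented above):

```go
// selectorOutcome is illustrative only.
func selectorOutcome(ctx context.Context, r *SmartRouter, modelID string) (string, error) {
	candidates, err := r.resolveSelectorCandidates(ctx, modelID)
	switch {
	case err != nil:
		// A non-empty selector matched zero healthy nodes (or the lookup
		// failed): nothing to route to and nothing to schedule on.
		return "", err
	case candidates == nil:
		// No selector configured: registry helpers treat nil as "any node".
		return "unfiltered", nil
	default:
		// Cached-replica lookup and new-load scheduling both restrict to
		// exactly this set, so they can never disagree.
		return fmt.Sprintf("restricted to %d nodes", len(candidates)), nil
	}
}
```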
diff --git a/core/services/nodes/router_test.go b/core/services/nodes/router_test.go
index 028aa0dc1..2e5d3ce9c 100644
--- a/core/services/nodes/router_test.go
+++ b/core/services/nodes/router_test.go
@@ -114,7 +114,7 @@ type fakeModelRouter struct {
     touchCalls []string
 }
 
-func (f *fakeModelRouter) FindAndLockNodeWithModel(_ context.Context, modelName string) (*BackendNode, *NodeModel, error) {
+func (f *fakeModelRouter) FindAndLockNodeWithModel(_ context.Context, modelName string, _ []string) (*BackendNode, *NodeModel, error) {
     return f.findAndLockNode, f.findAndLockNM, f.findAndLockErr
 }
 
@@ -268,13 +268,26 @@ func (f *stubClientFactory) NewClient(_ string, _ bool) grpc.Backend {
 type fakeUnloader struct {
     installReply *messaging.BackendInstallReply
     installErr   error
-    stopCalls    []string // "nodeID:model"
+    installCalls []installCall // every InstallBackend invocation, in order
+    stopCalls    []string      // "nodeID:model"
     stopErr      error
     unloadCalls  []string
     unloadErr    error
 }
 
-func (f *fakeUnloader) InstallBackend(_, _, _, _, _, _, _ string, _ int) (*messaging.BackendInstallReply, error) {
+// installCall captures the args we care about when asserting that the
+// reconciler / router did or did not fire a NATS install. The fake records
+// every call so tests can verify both presence and shape (e.g. that backend
+// is non-empty).
+type installCall struct {
+    nodeID  string
+    backend string
+    modelID string
+    replica int
+}
+
+func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
+    f.installCalls = append(f.installCalls, installCall{nodeID, backend, modelID, replica})
     return f.installReply, f.installErr
 }
@@ -692,6 +705,28 @@ var _ = Describe("SmartRouter", func() {
         })
     })
 
+    Describe("ScheduleAndLoadModel (mock-based)", func() {
+        It("returns an error and does not fire a NATS install when no load info is stored", func() {
+            // Reproduces the reconciler scale-up bug: when GetModelLoadInfo
+            // returns ErrRecordNotFound (no replica has ever been loaded),
+            // the previous fallback called scheduleNewModel with an empty
+            // backend type, which the worker rejected on every reconciler
+            // tick. The fix bails out cleanly with an explanatory error and
+            // never sends backend.install.
+            unloader := &fakeUnloader{}
+            reg := &fakeModelRouter{}
+            router := NewSmartRouter(reg, SmartRouterOptions{Unloader: unloader})
+
+            node, err := router.ScheduleAndLoadModel(context.Background(), "never-loaded", nil)
+
+            Expect(err).To(HaveOccurred())
+            Expect(node).To(BeNil())
+            Expect(err.Error()).To(ContainSubstring("never-loaded"))
+            Expect(unloader.installCalls).To(BeEmpty(),
+                "reconciler must not fire backend.install when there is no load info to replicate")
+        })
+    })
+
     // -----------------------------------------------------------------------
     // Integration tests using real PostgreSQL (existing)
     // -----------------------------------------------------------------------
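Because the fake now records every invocation, tests can assert on call shape as well as on absence. A hypothetical follow-on assertion for the positive case (not part of this diff):

```go
// When an install *is* expected, check its shape, not just that it happened.
Expect(unloader.installCalls).To(HaveLen(1))
Expect(unloader.installCalls[0].backend).ToNot(BeEmpty(),
	"backend.install must always carry a backend type")
```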
diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go
index a3de78dd4..ad9a88c06 100644
--- a/pkg/model/initializers.go
+++ b/pkg/model/initializers.go
@@ -183,8 +183,12 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
     model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backend, o))
     if err != nil {
+        // Defensive cleanup: the model usually wasn't registered yet (LoadModel
+        // failed before that), so StopGRPC reporting "model not found" is the
+        // expected case, not an error. The outer Failed-to-load log below
+        // carries the real reason.
         if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
-            xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
+            xlog.Debug("cleanup stop after failed load", "error", stopErr, "model", o.modelID)
         }
         xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString)
         return nil, err
diff --git a/tests/e2e/distributed/model_routing_test.go b/tests/e2e/distributed/model_routing_test.go
index cb2b06e7a..d8e3127f0 100644
--- a/tests/e2e/distributed/model_routing_test.go
+++ b/tests/e2e/distributed/model_routing_test.go
@@ -63,7 +63,7 @@ var _ = Describe("Model Routing", Label("Distributed"), func() {
         Expect(models[0].InFlight).To(Equal(2))
 
         // FindAndLockNodeWithModel should return this node and atomically increment in-flight
-        foundNode, foundModel, err := registry.FindAndLockNodeWithModel(context.Background(), "llama3")
+        foundNode, foundModel, err := registry.FindAndLockNodeWithModel(context.Background(), "llama3", nil)
         Expect(err).ToNot(HaveOccurred())
         Expect(foundNode.ID).To(Equal(node.ID))
         Expect(foundModel.ModelName).To(Equal("llama3"))
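Design note on the `initializers.go` hunk: demoting every cleanup-stop failure to Debug is the simplest fix, at the cost of also hiding genuinely unexpected stop errors. A more surgical variant would branch on the error, sketched here under the assumption of a sentinel like `ErrModelNotFound`, which this codebase may not define:

```go
if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
	if errors.Is(stopErr, ErrModelNotFound) { // hypothetical sentinel
		// Expected: LoadModel failed before the model was registered.
		xlog.Debug("cleanup stop after failed load", "error", stopErr, "model", o.modelID)
	} else {
		xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
	}
}
```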