diff --git a/core/services/nodes/interfaces.go b/core/services/nodes/interfaces.go index 086d83142..c1dcd2af9 100644 --- a/core/services/nodes/interfaces.go +++ b/core/services/nodes/interfaces.go @@ -17,6 +17,7 @@ type ModelRouter interface { TouchNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) SetNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int, state, address string, initialInFlight int) error SetNodeModelLoadInfo(ctx context.Context, nodeID, modelName string, replicaIndex int, backendType string, optsBlob []byte) error + UpsertModelLoadInfo(ctx context.Context, modelName, backendType string, optsBlob []byte) error GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error) NextFreeReplicaIndex(ctx context.Context, nodeID, modelName string, maxSlots int) (int, error) CountReplicasOnNode(ctx context.Context, nodeID, modelName string) (int, error) diff --git a/core/services/nodes/model_router_test.go b/core/services/nodes/model_router_test.go index fedeb0e3d..190dac731 100644 --- a/core/services/nodes/model_router_test.go +++ b/core/services/nodes/model_router_test.go @@ -56,6 +56,9 @@ func (f *fakeModelRouterForSmartRouter) SetNodeModel(_ context.Context, _, _ str func (f *fakeModelRouterForSmartRouter) SetNodeModelLoadInfo(_ context.Context, _, _ string, _ int, _ string, _ []byte) error { return nil } +func (f *fakeModelRouterForSmartRouter) UpsertModelLoadInfo(_ context.Context, _, _ string, _ []byte) error { + return nil +} func (f *fakeModelRouterForSmartRouter) GetModelLoadInfo(_ context.Context, _ string) (string, []byte, error) { return "", nil, fmt.Errorf("not found") } diff --git a/core/services/nodes/reconciler_test.go b/core/services/nodes/reconciler_test.go index 133813010..0697374a8 100644 --- a/core/services/nodes/reconciler_test.go +++ b/core/services/nodes/reconciler_test.go @@ -423,6 +423,47 @@ var _ = Describe("ReplicaReconciler", func() { Expect(cap).To(Equal(2)) }) }) + + Context("frontend-restart scenario (Bug-1)", func() { + It("recovers replicas after every NodeModel row has been removed", func() { + ctx := context.Background() + + // One healthy node. UpsertModelLoadInfo records per-model metadata + // independently of any NodeModel row, mirroring what + // scheduleAndLoad does on a successful dispatch. + node := registerNode("restart-node", "10.0.20.1:50051") + setSchedulingConfig("restart-model", 2, 4, "") + Expect(registry.UpsertModelLoadInfo(ctx, "restart-model", "llama-cpp", []byte("opts-from-pre-restart"))).To(Succeed()) + + // Simulate the bug: between frontend instances the NodeModel rows + // are wiped (MarkOffline path, stale-heartbeat reaping). The + // per-model load info row stays because it's not tied to any + // (node, replica) slot. + Expect(registry.RemoveAllNodeModelReplicas(ctx, node.ID, "restart-model")).To(Succeed()) + + // Pre-fix: GetModelLoadInfo returned ErrRecordNotFound here and + // the reconciler logged "no load info" every 30s until a manual + // inference request arrived. + bt, blob, err := registry.GetModelLoadInfo(ctx, "restart-model") + Expect(err).ToNot(HaveOccurred()) + Expect(bt).To(Equal("llama-cpp")) + Expect(blob).To(Equal([]byte("opts-from-pre-restart"))) + + // And the reconciler tick should now call into the scheduler + // for the missing replicas instead of bailing out. + scheduler := &fakeScheduler{scheduleNode: node} + reconciler := NewReplicaReconciler(ReplicaReconcilerOptions{ + Registry: registry, + Scheduler: scheduler, + DB: db, + }) + reconciler.reconcile(ctx) + + Expect(scheduler.scheduleCalls).ToNot(BeEmpty(), + "reconciler must call the scheduler after a frontend restart that wiped NodeModel rows") + Expect(scheduler.scheduleCalls[0].modelName).To(Equal("restart-model")) + }) + }) }) // fakeProber lets tests control whether a model's gRPC address "responds". diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 5dcb48a5c..d27f9f791 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -94,6 +94,31 @@ type NodeModel struct { UpdatedAt time.Time `json:"updated_at"` } +// ModelLoadInfo is per-model load metadata kept independently of NodeModel rows +// so the Replica Reconciler can re-load a model after every replica row has +// been removed (worker death, eviction, MarkOffline reaping, frontend restart +// with stale heartbeats). +// +// Why a separate table when the same blob is also stamped on each NodeModel +// row? NodeModel rows are tied to a live (node, replica) slot and get deleted +// when a backend stops being healthy. Tying the only copy of load info to +// that lifecycle is exactly what caused Bug-1: a frontend restart followed by +// transient worker-row removal left no copy of ModelOptions, so the reconciler +// could not bring `min_replicas` back without a fresh inference request. +// +// Keyed by ModelName (the tracking key used by the router); last-write-wins +// on the opts blob because two concurrent frontends dispatching the same +// model with slightly different opts converge on whichever finished last. +// That is identical to the per-NodeModel-row semantics today; if a stronger +// guarantee is needed in the future, the row carries UpdatedAt for ordering. +type ModelLoadInfo struct { + ModelName string `gorm:"primaryKey;size:255" json:"model_name"` + BackendType string `gorm:"size:128" json:"backend_type"` + ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + // NodeLabel is a key-value label on a node (like K8s labels). type NodeLabel struct { ID string `gorm:"primaryKey;size:36" json:"id"` @@ -178,7 +203,7 @@ type NodeRegistry struct { // when multiple instances (frontend + workers) start at the same time. func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) { if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { - return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{}) + return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{}, &ModelLoadInfo{}) }); err != nil { return nil, fmt.Errorf("migrating node tables: %w", err) } @@ -622,10 +647,54 @@ func (r *NodeRegistry) SetNodeModelLoadInfo(ctx context.Context, nodeID, modelNa Updates(map[string]any{"backend_type": backendType, "model_opts_blob": optsBlob}).Error } +// UpsertModelLoadInfo records or replaces the per-model load info in the +// dedicated ModelLoadInfo table. Unlike SetNodeModelLoadInfo (which writes the +// blob onto a specific replica row and dies with it), this survives every +// NodeModel row being removed and so lets the reconciler recover replicas +// after worker death + frontend restart (Bug-1). +// +// ON CONFLICT updates backend_type, model_opts_blob, and updated_at. Two +// frontends dispatching the same model concurrently with slightly different +// opts converge on whichever transaction committed last; that matches the +// existing per-replica blob semantics today. +func (r *NodeRegistry) UpsertModelLoadInfo(ctx context.Context, modelName, backendType string, optsBlob []byte) error { + if modelName == "" { + return fmt.Errorf("model name is required") + } + now := time.Now() + rec := ModelLoadInfo{ + ModelName: modelName, + BackendType: backendType, + ModelOptsBlob: optsBlob, + CreatedAt: now, + UpdatedAt: now, + } + return r.db.WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "model_name"}}, + DoUpdates: clause.Assignments(map[string]any{ + "backend_type": backendType, + "model_opts_blob": optsBlob, + "updated_at": now, + }), + }).Create(&rec).Error +} + // GetModelLoadInfo retrieves the stored backend type and serialized model -// options from any existing loaded replica. Returns gorm.ErrRecordNotFound -// if no replica has stored options. +// options. Reads from the dedicated ModelLoadInfo table first (survives every +// NodeModel row being deleted); falls back to scanning loaded NodeModel rows +// for the load info stamped before any frontend in this cluster ran an +// UpsertModelLoadInfo (rolling-upgrade transition). Returns +// gorm.ErrRecordNotFound when neither source has an entry. func (r *NodeRegistry) GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error) { + var info ModelLoadInfo + err = r.db.WithContext(ctx).Where("model_name = ?", modelName).First(&info).Error + if err == nil { + return info.BackendType, info.ModelOptsBlob, nil + } + if !errors.Is(err, gorm.ErrRecordNotFound) { + return "", nil, err + } + var nm NodeModel err = r.db.WithContext(ctx). Where("model_name = ? AND state = ? AND model_opts_blob IS NOT NULL", modelName, "loaded"). diff --git a/core/services/nodes/registry_test.go b/core/services/nodes/registry_test.go index f57ca194e..a07398909 100644 --- a/core/services/nodes/registry_test.go +++ b/core/services/nodes/registry_test.go @@ -1100,4 +1100,69 @@ var _ = Describe("NodeRegistry", func() { "reserved capacity must remove a node from VRAM-aware candidates") }) }) + + Describe("ModelLoadInfo persistence (Bug-1)", func() { + It("survives every NodeModel row being removed", func() { + ctx := context.Background() + + // One node with one loaded replica + per-replica blob (the legacy path). + node := makeNode("li-1", "10.0.1.1:50051", 8_000_000_000) + Expect(registry.Register(ctx, node, true)).To(Succeed()) + Expect(registry.SetNodeModel(ctx, node.ID, "load-info-model", 0, "loaded", node.Address, 0)).To(Succeed()) + Expect(registry.SetNodeModelLoadInfo(ctx, node.ID, "load-info-model", 0, "llama-cpp", []byte("opts-v1"))).To(Succeed()) + + // Persist per-model via the new path (the dispatch hook does this). + Expect(registry.UpsertModelLoadInfo(ctx, "load-info-model", "llama-cpp", []byte("opts-v1"))).To(Succeed()) + + // Simulate worker death + MarkOffline reaping: every NodeModel row gone. + Expect(registry.RemoveAllNodeModelReplicas(ctx, node.ID, "load-info-model")).To(Succeed()) + + bt, blob, err := registry.GetModelLoadInfo(ctx, "load-info-model") + Expect(err).ToNot(HaveOccurred(), + "per-model load info must survive every NodeModel row going away") + Expect(bt).To(Equal("llama-cpp")) + Expect(blob).To(Equal([]byte("opts-v1"))) + }) + + It("ON CONFLICT updates backend type and opts (last-write-wins)", func() { + ctx := context.Background() + + Expect(registry.UpsertModelLoadInfo(ctx, "lww", "llama-cpp", []byte("v1"))).To(Succeed()) + Expect(registry.UpsertModelLoadInfo(ctx, "lww", "vllm", []byte("v2"))).To(Succeed()) + + bt, blob, err := registry.GetModelLoadInfo(ctx, "lww") + Expect(err).ToNot(HaveOccurred()) + Expect(bt).To(Equal("vllm")) + Expect(blob).To(Equal([]byte("v2"))) + }) + + It("falls back to legacy NodeModel blob when no per-model row exists", func() { + // Pre-fix rolling-upgrade path: a frontend that ran before the new + // table existed only wrote the per-replica blob. The new + // GetModelLoadInfo must still find it so an upgrade doesn't + // regress the reconciler for already-loaded models. + ctx := context.Background() + + node := makeNode("li-legacy", "10.0.1.2:50051", 8_000_000_000) + Expect(registry.Register(ctx, node, true)).To(Succeed()) + Expect(registry.SetNodeModel(ctx, node.ID, "legacy-model", 0, "loaded", node.Address, 0)).To(Succeed()) + Expect(registry.SetNodeModelLoadInfo(ctx, node.ID, "legacy-model", 0, "llama-cpp", []byte("legacy-opts"))).To(Succeed()) + + bt, blob, err := registry.GetModelLoadInfo(ctx, "legacy-model") + Expect(err).ToNot(HaveOccurred()) + Expect(bt).To(Equal("llama-cpp")) + Expect(blob).To(Equal([]byte("legacy-opts"))) + }) + + It("returns ErrRecordNotFound when neither source has the model", func() { + ctx := context.Background() + _, _, err := registry.GetModelLoadInfo(ctx, "never-loaded") + Expect(err).To(MatchError(gorm.ErrRecordNotFound)) + }) + + It("rejects empty model names", func() { + err := registry.UpsertModelLoadInfo(context.Background(), "", "llama-cpp", []byte("x")) + Expect(err).To(HaveOccurred()) + }) + }) }) diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index c29108846..a9b01e80c 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -156,12 +156,18 @@ func (r *SmartRouter) scheduleAndLoad(ctx context.Context, backendType, tracking xlog.Warn("Failed to record model on node", "node", node.Name, "model", trackingKey, "replica", replicaIndex, "error", err) } - // Store load metadata for future replica scale-ups by the reconciler + // Store load metadata for future replica scale-ups by the reconciler. + // Writes both per-replica (NodeModel.model_opts_blob) for backward compat + // and per-model (ModelLoadInfo table) so the reconciler can recover after + // every replica row has been removed (Bug-1). if modelOpts != nil { if optsBlob, marshalErr := proto.Marshal(modelOpts); marshalErr == nil { if storeErr := r.registry.SetNodeModelLoadInfo(ctx, node.ID, trackingKey, replicaIndex, backendType, optsBlob); storeErr != nil { xlog.Warn("Failed to store model load info", "node", node.Name, "model", trackingKey, "replica", replicaIndex, "error", storeErr) } + if storeErr := r.registry.UpsertModelLoadInfo(ctx, trackingKey, backendType, optsBlob); storeErr != nil { + xlog.Warn("Failed to upsert per-model load info", "model", trackingKey, "error", storeErr) + } } } diff --git a/core/services/nodes/router_test.go b/core/services/nodes/router_test.go index 5f944bce8..26237773e 100644 --- a/core/services/nodes/router_test.go +++ b/core/services/nodes/router_test.go @@ -159,6 +159,10 @@ func (f *fakeModelRouter) SetNodeModelLoadInfo(_ context.Context, _, _ string, _ return nil } +func (f *fakeModelRouter) UpsertModelLoadInfo(_ context.Context, _, _ string, _ []byte) error { + return nil +} + func (f *fakeModelRouter) GetModelLoadInfo(_ context.Context, _ string) (string, []byte, error) { return "", nil, fmt.Errorf("not found") } diff --git a/docs/content/features/distributed-mode.md b/docs/content/features/distributed-mode.md index 03a251b78..f9b09079a 100644 --- a/docs/content/features/distributed-mode.md +++ b/docs/content/features/distributed-mode.md @@ -490,6 +490,7 @@ The **Replica Reconciler** runs as a background process on the frontend: - **Scale down**: Removes idle replicas after 5 minutes of inactivity - **Maintain minimum**: Ensures `min_replicas` are always loaded (recovers from node failures) - **Eviction protection**: Models with auto-scaling enabled are never evicted below `min_replicas` +- **Restart-safe**: Per-model load metadata (backend type + `ModelOptions`) is persisted in the `model_load_infos` PostgreSQL table on the first successful dispatch, so a frontend restart or rolling upgrade does not require a fresh inference request to repopulate state before the reconciler can scale up replacement replicas. All fields are optional and composable: - Node selector only: pin model to matching nodes, single replica