fix(distributed): persist per-model load info so reconciler survives frontend restart (#9981)

* feat(distributed): add per-model ModelLoadInfo persistence

Adds a dedicated ModelLoadInfo table keyed by model name, decoupled from
the per-replica NodeModel rows. The reconciler can now recover model load
metadata after every NodeModel row has been removed (worker death,
eviction, MarkOffline reaping, frontend restart with stale heartbeats),
which is the read side of Bug-1 from the distributed mode bug hunt.

Registry exposes:
  - UpsertModelLoadInfo: ON CONFLICT (model_name) update; last-write-wins,
    matching the existing per-replica blob semantics under concurrent
    multi-frontend dispatch.
  - GetModelLoadInfo: read from the new table first; fall back to the
    legacy NodeModel-blob scan for rows written before any frontend in
    the cluster ran an UpsertModelLoadInfo (rolling-upgrade transition).

SetNodeModelLoadInfo (per-replica blob) is preserved for backward
compatibility and per-replica diagnostics; the dispatch-path hook in the
next commit calls both.

The new table joins the existing nodes AutoMigrate set under the same
schema-migration advisory lock.

Refs: Bug-1, docs/superpowers/specs/2026-05-24-distributed-mode-bug-hunt-findings.md

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7[1m]

* fix(distributed): persist per-model load info on dispatch

scheduleAndLoad now writes the (backendType, ModelOptions blob) pair to
the new ModelLoadInfo table in addition to the existing per-replica
NodeModel.model_opts_blob field. The per-replica blob continues to serve
the hot path; the per-model row survives the removal of every NodeModel
row, which is what unblocks the reconciler on the read side.

Both writes are best-effort with warn-level logging on failure: a failed
write only means the reconciler may need a fresh inference request to
repopulate the row, which matches the pre-fix behavior.

Concurrency: two frontends loading the same model at the same time both
fire UpsertModelLoadInfo; ON CONFLICT (model_name) makes the row
converge to whichever commits last. Matches the existing per-replica
blob semantics.
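
The convergence behavior can be illustrated with an in-memory table guarded by
a mutex. This is a sketch under stated assumptions: the `table` type and its
`upsert` method are hypothetical, and the mutex stands in for the row-level
serialization that PostgreSQL provides for ON CONFLICT.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type modelLoadInfo struct {
	ModelName   string
	BackendType string
	OptsBlob    []byte
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

// table is a hypothetical in-memory stand-in for the ModelLoadInfo table.
type table struct {
	mu   sync.Mutex
	rows map[string]modelLoadInfo
}

// upsert inserts a row or, on a key conflict, overwrites backend type,
// opts blob, and UpdatedAt while keeping the original CreatedAt.
func (t *table) upsert(model, backend string, opts []byte) error {
	if model == "" {
		return errors.New("model name is required")
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	now := time.Now()
	row, exists := t.rows[model]
	if !exists {
		row = modelLoadInfo{ModelName: model, CreatedAt: now}
	}
	row.BackendType = backend
	row.OptsBlob = opts
	row.UpdatedAt = now
	t.rows[model] = row
	return nil
}

func main() {
	tbl := &table{rows: map[string]modelLoadInfo{}}
	var wg sync.WaitGroup
	// Two "frontends" dispatch the same model at the same time.
	for _, opts := range []string{"opts-from-frontend-a", "opts-from-frontend-b"} {
		wg.Add(1)
		go func(o string) {
			defer wg.Done()
			if err := tbl.upsert("shared-model", "llama-cpp", []byte(o)); err != nil {
				fmt.Println("upsert failed:", err)
			}
		}(opts)
	}
	wg.Wait()
	row := tbl.rows["shared-model"]
	// Exactly one row remains; which opts survive depends on scheduling.
	fmt.Printf("rows=%d surviving opts=%q\n", len(tbl.rows), row.OptsBlob)
}
```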

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7[1m]

* test(distributed): cover load info persistence and Bug-1 recovery

Adds Ginkgo specs that prove the persistence layer behaves correctly and
that the reconciler actually recovers from the frontend-restart scenario
that was failing in production:

registry_test.go:
  - per-model row survives RemoveAllNodeModelReplicas (the bug repro)
  - ON CONFLICT (model_name) updates backend type + blob, last-write-wins
  - legacy NodeModel-blob fallback still works (rolling-upgrade transition)
  - GetModelLoadInfo returns ErrRecordNotFound when both sources are empty
  - UpsertModelLoadInfo rejects empty model names

reconciler_test.go:
  - Bug-1 end-to-end: with min_replicas=2, no NodeModel rows, and a
    ModelLoadInfo row present, one reconcile tick calls the scheduler
    for the missing replicas (the spec asserts at least one call, for
    the right model). Pre-fix, the lookup returned "no load info" and
    the scheduler was never called until a fresh inference request
    arrived.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7[1m]

* docs(distributed): note restart-safe reconciler behavior

Adds a bullet to the Replica Reconciler section explaining that per-model
load metadata is persisted across frontend restarts via the new
model_load_infos PostgreSQL table, so a rolling upgrade no longer needs a
fresh inference request per model before the reconciler can replace dead
replicas.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7[1m]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
LocalAI [bot]
2026-05-25 13:00:06 +02:00
committed by GitHub
parent 06e777b75e
commit a891eedd08
8 changed files with 194 additions and 4 deletions

@@ -17,6 +17,7 @@ type ModelRouter interface {
TouchNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int)
SetNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int, state, address string, initialInFlight int) error
SetNodeModelLoadInfo(ctx context.Context, nodeID, modelName string, replicaIndex int, backendType string, optsBlob []byte) error
UpsertModelLoadInfo(ctx context.Context, modelName, backendType string, optsBlob []byte) error
GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error)
NextFreeReplicaIndex(ctx context.Context, nodeID, modelName string, maxSlots int) (int, error)
CountReplicasOnNode(ctx context.Context, nodeID, modelName string) (int, error)

@@ -56,6 +56,9 @@ func (f *fakeModelRouterForSmartRouter) SetNodeModel(_ context.Context, _, _ str
func (f *fakeModelRouterForSmartRouter) SetNodeModelLoadInfo(_ context.Context, _, _ string, _ int, _ string, _ []byte) error {
return nil
}
func (f *fakeModelRouterForSmartRouter) UpsertModelLoadInfo(_ context.Context, _, _ string, _ []byte) error {
return nil
}
func (f *fakeModelRouterForSmartRouter) GetModelLoadInfo(_ context.Context, _ string) (string, []byte, error) {
return "", nil, fmt.Errorf("not found")
}

@@ -423,6 +423,47 @@ var _ = Describe("ReplicaReconciler", func() {
Expect(cap).To(Equal(2))
})
})
Context("frontend-restart scenario (Bug-1)", func() {
It("recovers replicas after every NodeModel row has been removed", func() {
ctx := context.Background()
// One healthy node. UpsertModelLoadInfo records per-model metadata
// independently of any NodeModel row, mirroring what
// scheduleAndLoad does on a successful dispatch.
node := registerNode("restart-node", "10.0.20.1:50051")
setSchedulingConfig("restart-model", 2, 4, "")
Expect(registry.UpsertModelLoadInfo(ctx, "restart-model", "llama-cpp", []byte("opts-from-pre-restart"))).To(Succeed())
// Simulate the bug: between frontend instances the NodeModel rows
// are wiped (MarkOffline path, stale-heartbeat reaping). The
// per-model load info row stays because it's not tied to any
// (node, replica) slot.
Expect(registry.RemoveAllNodeModelReplicas(ctx, node.ID, "restart-model")).To(Succeed())
// Pre-fix: GetModelLoadInfo returned ErrRecordNotFound here and
// the reconciler logged "no load info" every 30s until a manual
// inference request arrived.
bt, blob, err := registry.GetModelLoadInfo(ctx, "restart-model")
Expect(err).ToNot(HaveOccurred())
Expect(bt).To(Equal("llama-cpp"))
Expect(blob).To(Equal([]byte("opts-from-pre-restart")))
// And the reconciler tick should now call into the scheduler
// for the missing replicas instead of bailing out.
scheduler := &fakeScheduler{scheduleNode: node}
reconciler := NewReplicaReconciler(ReplicaReconcilerOptions{
Registry: registry,
Scheduler: scheduler,
DB: db,
})
reconciler.reconcile(ctx)
Expect(scheduler.scheduleCalls).ToNot(BeEmpty(),
"reconciler must call the scheduler after a frontend restart that wiped NodeModel rows")
Expect(scheduler.scheduleCalls[0].modelName).To(Equal("restart-model"))
})
})
})
// fakeProber lets tests control whether a model's gRPC address "responds".

@@ -94,6 +94,31 @@ type NodeModel struct {
UpdatedAt time.Time `json:"updated_at"`
}
// ModelLoadInfo is per-model load metadata kept independently of NodeModel rows
// so the Replica Reconciler can re-load a model after every replica row has
// been removed (worker death, eviction, MarkOffline reaping, frontend restart
// with stale heartbeats).
//
// Why a separate table when the same blob is also stamped on each NodeModel
// row? NodeModel rows are tied to a live (node, replica) slot and get deleted
// when a backend stops being healthy. Tying the only copy of load info to
// that lifecycle is exactly what caused Bug-1: a frontend restart followed by
// transient worker-row removal left no copy of ModelOptions, so the reconciler
// could not bring `min_replicas` back without a fresh inference request.
//
// Keyed by ModelName (the tracking key used by the router); last-write-wins
// on the opts blob because two concurrent frontends dispatching the same
// model with slightly different opts converge on whichever finished last.
// That is identical to the per-NodeModel-row semantics today; if a stronger
// guarantee is needed in the future, the row carries UpdatedAt for ordering.
type ModelLoadInfo struct {
ModelName string `gorm:"primaryKey;size:255" json:"model_name"`
BackendType string `gorm:"size:128" json:"backend_type"`
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// NodeLabel is a key-value label on a node (like K8s labels).
type NodeLabel struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
@@ -178,7 +203,7 @@ type NodeRegistry struct {
// when multiple instances (frontend + workers) start at the same time.
func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) {
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{}, &ModelLoadInfo{})
}); err != nil {
return nil, fmt.Errorf("migrating node tables: %w", err)
}
@@ -622,10 +647,54 @@ func (r *NodeRegistry) SetNodeModelLoadInfo(ctx context.Context, nodeID, modelNa
Updates(map[string]any{"backend_type": backendType, "model_opts_blob": optsBlob}).Error
}
// UpsertModelLoadInfo records or replaces the per-model load info in the
// dedicated ModelLoadInfo table. Unlike SetNodeModelLoadInfo (which writes the
// blob onto a specific replica row and dies with it), this survives every
// NodeModel row being removed and so lets the reconciler recover replicas
// after worker death + frontend restart (Bug-1).
//
// ON CONFLICT updates backend_type, model_opts_blob, and updated_at. Two
// frontends dispatching the same model concurrently with slightly different
// opts converge on whichever transaction committed last; that matches the
// existing per-replica blob semantics today.
func (r *NodeRegistry) UpsertModelLoadInfo(ctx context.Context, modelName, backendType string, optsBlob []byte) error {
if modelName == "" {
return fmt.Errorf("model name is required")
}
now := time.Now()
rec := ModelLoadInfo{
ModelName: modelName,
BackendType: backendType,
ModelOptsBlob: optsBlob,
CreatedAt: now,
UpdatedAt: now,
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "model_name"}},
DoUpdates: clause.Assignments(map[string]any{
"backend_type": backendType,
"model_opts_blob": optsBlob,
"updated_at": now,
}),
}).Create(&rec).Error
}
// GetModelLoadInfo retrieves the stored backend type and serialized model
// options. Reads from the dedicated ModelLoadInfo table first (survives every
// NodeModel row being deleted); falls back to scanning loaded NodeModel rows
// for the load info stamped before any frontend in this cluster ran an
// UpsertModelLoadInfo (rolling-upgrade transition). Returns
// gorm.ErrRecordNotFound when neither source has an entry.
func (r *NodeRegistry) GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error) {
var info ModelLoadInfo
err = r.db.WithContext(ctx).Where("model_name = ?", modelName).First(&info).Error
if err == nil {
return info.BackendType, info.ModelOptsBlob, nil
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return "", nil, err
}
var nm NodeModel
err = r.db.WithContext(ctx).
Where("model_name = ? AND state = ? AND model_opts_blob IS NOT NULL", modelName, "loaded").

@@ -1100,4 +1100,69 @@ var _ = Describe("NodeRegistry", func() {
"reserved capacity must remove a node from VRAM-aware candidates")
})
})
Describe("ModelLoadInfo persistence (Bug-1)", func() {
It("survives every NodeModel row being removed", func() {
ctx := context.Background()
// One node with one loaded replica + per-replica blob (the legacy path).
node := makeNode("li-1", "10.0.1.1:50051", 8_000_000_000)
Expect(registry.Register(ctx, node, true)).To(Succeed())
Expect(registry.SetNodeModel(ctx, node.ID, "load-info-model", 0, "loaded", node.Address, 0)).To(Succeed())
Expect(registry.SetNodeModelLoadInfo(ctx, node.ID, "load-info-model", 0, "llama-cpp", []byte("opts-v1"))).To(Succeed())
// Persist per-model via the new path (the dispatch hook does this).
Expect(registry.UpsertModelLoadInfo(ctx, "load-info-model", "llama-cpp", []byte("opts-v1"))).To(Succeed())
// Simulate worker death + MarkOffline reaping: every NodeModel row gone.
Expect(registry.RemoveAllNodeModelReplicas(ctx, node.ID, "load-info-model")).To(Succeed())
bt, blob, err := registry.GetModelLoadInfo(ctx, "load-info-model")
Expect(err).ToNot(HaveOccurred(),
"per-model load info must survive every NodeModel row going away")
Expect(bt).To(Equal("llama-cpp"))
Expect(blob).To(Equal([]byte("opts-v1")))
})
It("ON CONFLICT updates backend type and opts (last-write-wins)", func() {
ctx := context.Background()
Expect(registry.UpsertModelLoadInfo(ctx, "lww", "llama-cpp", []byte("v1"))).To(Succeed())
Expect(registry.UpsertModelLoadInfo(ctx, "lww", "vllm", []byte("v2"))).To(Succeed())
bt, blob, err := registry.GetModelLoadInfo(ctx, "lww")
Expect(err).ToNot(HaveOccurred())
Expect(bt).To(Equal("vllm"))
Expect(blob).To(Equal([]byte("v2")))
})
It("falls back to legacy NodeModel blob when no per-model row exists", func() {
// Pre-fix rolling-upgrade path: a frontend that ran before the new
// table existed only wrote the per-replica blob. The new
// GetModelLoadInfo must still find it so an upgrade doesn't
// regress the reconciler for already-loaded models.
ctx := context.Background()
node := makeNode("li-legacy", "10.0.1.2:50051", 8_000_000_000)
Expect(registry.Register(ctx, node, true)).To(Succeed())
Expect(registry.SetNodeModel(ctx, node.ID, "legacy-model", 0, "loaded", node.Address, 0)).To(Succeed())
Expect(registry.SetNodeModelLoadInfo(ctx, node.ID, "legacy-model", 0, "llama-cpp", []byte("legacy-opts"))).To(Succeed())
bt, blob, err := registry.GetModelLoadInfo(ctx, "legacy-model")
Expect(err).ToNot(HaveOccurred())
Expect(bt).To(Equal("llama-cpp"))
Expect(blob).To(Equal([]byte("legacy-opts")))
})
It("returns ErrRecordNotFound when neither source has the model", func() {
ctx := context.Background()
_, _, err := registry.GetModelLoadInfo(ctx, "never-loaded")
Expect(err).To(MatchError(gorm.ErrRecordNotFound))
})
It("rejects empty model names", func() {
err := registry.UpsertModelLoadInfo(context.Background(), "", "llama-cpp", []byte("x"))
Expect(err).To(HaveOccurred())
})
})
})

@@ -156,12 +156,18 @@ func (r *SmartRouter) scheduleAndLoad(ctx context.Context, backendType, tracking
xlog.Warn("Failed to record model on node", "node", node.Name, "model", trackingKey, "replica", replicaIndex, "error", err)
}
// Store load metadata for future replica scale-ups by the reconciler.
// Writes both per-replica (NodeModel.model_opts_blob) for backward compat
// and per-model (ModelLoadInfo table) so the reconciler can recover after
// every replica row has been removed (Bug-1).
if modelOpts != nil {
if optsBlob, marshalErr := proto.Marshal(modelOpts); marshalErr == nil {
if storeErr := r.registry.SetNodeModelLoadInfo(ctx, node.ID, trackingKey, replicaIndex, backendType, optsBlob); storeErr != nil {
xlog.Warn("Failed to store model load info", "node", node.Name, "model", trackingKey, "replica", replicaIndex, "error", storeErr)
}
if storeErr := r.registry.UpsertModelLoadInfo(ctx, trackingKey, backendType, optsBlob); storeErr != nil {
xlog.Warn("Failed to upsert per-model load info", "model", trackingKey, "error", storeErr)
}
}
}

@@ -159,6 +159,10 @@ func (f *fakeModelRouter) SetNodeModelLoadInfo(_ context.Context, _, _ string, _
return nil
}
func (f *fakeModelRouter) UpsertModelLoadInfo(_ context.Context, _, _ string, _ []byte) error {
return nil
}
func (f *fakeModelRouter) GetModelLoadInfo(_ context.Context, _ string) (string, []byte, error) {
return "", nil, fmt.Errorf("not found")
}

@@ -490,6 +490,7 @@ The **Replica Reconciler** runs as a background process on the frontend:
- **Scale down**: Removes idle replicas after 5 minutes of inactivity
- **Maintain minimum**: Ensures `min_replicas` are always loaded (recovers from node failures)
- **Eviction protection**: Models with auto-scaling enabled are never evicted below `min_replicas`
- **Restart-safe**: Per-model load metadata (backend type + `ModelOptions`) is written to the `model_load_infos` PostgreSQL table on every successful dispatch (the first one creates the row, later ones overwrite it), so a frontend restart or rolling upgrade does not require a fresh inference request to repopulate state before the reconciler can scale up replacement replicas.
All fields are optional and composable:
- Node selector only: pin model to matching nodes, single replica