mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-25 01:02:05 -04:00
feat(model): add LookupNodeID for pure-store node ID reads
Adds ModelLoader.LookupNodeID, a hot-path-safe helper that returns the distributed worker node ID stamped on a loaded model without touching ml.mu or issuing a gRPC HealthCheck. Backed by a new storeMu RWMutex that guards only the store reference, so the read never blocks behind a HealthCheck-holding CheckIsLoaded. Needed by the X-LocalAI-Node response-header middleware, which runs on the response goroutine right before the first byte hits the client and must never pay I/O for the lookup. The header value is best-effort observability and stale reads are acceptable. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-7[1m]
This commit is contained in:
@@ -40,8 +40,14 @@ type ModelRouter func(ctx context.Context, backend, modelID, modelName, modelFil
|
||||
opts *pb.ModelOptions, parallel bool) (*Model, error)
|
||||
|
||||
type ModelLoader struct {
|
||||
ModelPath string
|
||||
mu sync.Mutex
|
||||
ModelPath string
|
||||
mu sync.Mutex
|
||||
// storeMu guards only the `store` field reference (not the store's
|
||||
// internal state, which has its own concurrency mechanism). Kept
|
||||
// separate from `mu` so lock-free helpers like LookupNodeID can
|
||||
// snapshot the store reference without ever blocking behind a
|
||||
// HealthCheck-holding CheckIsLoaded call on `mu`.
|
||||
storeMu sync.RWMutex
|
||||
store ModelStore
|
||||
loading map[string]chan struct{} // tracks models currently being loaded
|
||||
wd *WatchDog
|
||||
@@ -112,7 +118,41 @@ func (ml *ModelLoader) SetModelRouter(r ModelRouter) {
|
||||
func (ml *ModelLoader) SetModelStore(s ModelStore) {
|
||||
ml.mu.Lock()
|
||||
defer ml.mu.Unlock()
|
||||
ml.storeMu.Lock()
|
||||
ml.store = s
|
||||
ml.storeMu.Unlock()
|
||||
}
|
||||
|
||||
// getStore returns the current store reference, taking only the
|
||||
// store-reference RWMutex (not ml.mu). Safe to call from hot paths that
|
||||
// must not block behind a HealthCheck-holding CheckIsLoaded.
|
||||
func (ml *ModelLoader) getStore() ModelStore {
|
||||
ml.storeMu.RLock()
|
||||
defer ml.storeMu.RUnlock()
|
||||
return ml.store
|
||||
}
|
||||
|
||||
// LookupNodeID returns the distributed worker node ID associated with
|
||||
// the loaded model, or "" if the model is not in the in-memory store or
|
||||
// has no node ID stamped.
|
||||
//
|
||||
// Unlike CheckIsLoaded this is a pure store read: it does NOT acquire
|
||||
// ml.mu and does NOT invoke a gRPC HealthCheck. The returned value may
|
||||
// be stale (the per-modelID store entry is overwritten on every
|
||||
// distributed-mode routing decision), which is acceptable for the
|
||||
// X-LocalAI-Node observability header. The contract here is "never pay
|
||||
// I/O on the response hot path"; correctness of the value is
|
||||
// best-effort by design.
|
||||
func (ml *ModelLoader) LookupNodeID(modelName string) string {
|
||||
store := ml.getStore()
|
||||
if store == nil {
|
||||
return ""
|
||||
}
|
||||
m, ok := store.Get(modelName)
|
||||
if !ok || m == nil {
|
||||
return ""
|
||||
}
|
||||
return m.NodeID()
|
||||
}
|
||||
|
||||
func (ml *ModelLoader) GetWatchDog() *WatchDog {
|
||||
|
||||
100
pkg/model/lookup_node_id_test.go
Normal file
100
pkg/model/lookup_node_id_test.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package model_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
grpc "github.com/mudler/LocalAI/pkg/grpc"
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// healthCheckCountingClient is a grpc.Backend stub that panics on every
|
||||
// method except HealthCheck, which it counts. Used to prove that
|
||||
// LookupNodeID never reaches the gRPC layer.
|
||||
//
|
||||
// We embed grpc.Backend so we only have to implement the one method we
|
||||
// care about; any other call will nil-deref-panic and surface a clear
|
||||
// failure in the test rather than silently swallowing a regression.
|
||||
type healthCheckCountingClient struct {
|
||||
grpc.Backend
|
||||
calls atomic.Int64
|
||||
}
|
||||
|
||||
func (c *healthCheckCountingClient) HealthCheck(_ context.Context) (bool, error) {
|
||||
c.calls.Add(1)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var _ = Describe("ModelLoader.LookupNodeID", func() {
|
||||
var (
|
||||
ml *model.ModelLoader
|
||||
store *model.InMemoryModelStore
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
systemState, err := system.GetSystemState(
|
||||
system.WithModelPath(GinkgoT().TempDir()),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
ml = model.NewModelLoader(systemState)
|
||||
store = model.NewInMemoryModelStore()
|
||||
ml.SetModelStore(store)
|
||||
})
|
||||
|
||||
It("returns the stamped node ID for a loaded model", func() {
|
||||
m := model.NewModelWithClient("test-model", "10.0.0.1:50051", &healthCheckCountingClient{})
|
||||
m.SetNodeID("node-xyz")
|
||||
store.Set("test-model", m)
|
||||
|
||||
Expect(ml.LookupNodeID("test-model")).To(Equal("node-xyz"))
|
||||
})
|
||||
|
||||
It("returns empty string when the model is not loaded", func() {
|
||||
Expect(ml.LookupNodeID("missing-model")).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("returns empty string when the model is loaded but has no node ID", func() {
|
||||
m := model.NewModelWithClient("local-model", "127.0.0.1:50051", &healthCheckCountingClient{})
|
||||
// SetNodeID intentionally not called; in-process models stay unstamped.
|
||||
store.Set("local-model", m)
|
||||
|
||||
Expect(ml.LookupNodeID("local-model")).To(BeEmpty())
|
||||
})
|
||||
|
||||
// This is the regression guard for the Important #1 finding on the
|
||||
// node-header PR: the previous middleware called CheckIsLoaded on
|
||||
// the response hot path, which can hold ml.mu across a 2-minute
|
||||
// gRPC HealthCheck timeout whenever the recently-healthy cache
|
||||
// window has expired. LookupNodeID must read from the store only.
|
||||
It("does NOT invoke HealthCheck on the backend client", func() {
|
||||
client := &healthCheckCountingClient{}
|
||||
m := model.NewModelWithClient("hot-path-model", "10.0.0.1:50051", client)
|
||||
m.SetNodeID("node-42")
|
||||
// Deliberately do NOT call MarkHealthy: if LookupNodeID were
|
||||
// going through CheckIsLoaded, the lack of a cached healthy
|
||||
// flag would force a fresh HealthCheck round-trip. We want
|
||||
// the counter to stay at 0.
|
||||
store.Set("hot-path-model", m)
|
||||
|
||||
id := ml.LookupNodeID("hot-path-model")
|
||||
|
||||
Expect(id).To(Equal("node-42"))
|
||||
Expect(client.calls.Load()).To(BeZero(),
|
||||
"LookupNodeID must not invoke HealthCheck (would hang the response writer for up to 2 minutes on a stale-healthy model)")
|
||||
})
|
||||
|
||||
It("returns empty string when no store has been wired", func() {
|
||||
// Construct a loader and overwrite the store with nil via a
|
||||
// custom ModelStore-typed nil to exercise the defensive nil
|
||||
// guard. Done indirectly to avoid exporting internal state.
|
||||
bareLoader := model.NewModelLoader(&system.SystemState{})
|
||||
// Default store is non-nil (NewInMemoryModelStore), so seed
|
||||
// the missing-model branch instead - covered by the second It
|
||||
// above. This spec verifies the defensive contract at the
|
||||
// API surface: a never-loaded model still returns "" cleanly.
|
||||
Expect(bareLoader.LookupNodeID("anything")).To(BeEmpty())
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user