From 1af79c1b0f6e5075703c54fcdefce388629205e8 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 24 May 2026 21:39:44 +0000 Subject: [PATCH] feat(model): add LookupNodeID for pure-store node ID reads Adds ModelLoader.LookupNodeID, a hot-path-safe helper that returns the distributed worker node ID stamped on a loaded model without touching ml.mu or issuing a gRPC HealthCheck. Backed by a new storeMu RWMutex that guards only the store reference, so the read never blocks behind a HealthCheck-holding CheckIsLoaded. Needed by the X-LocalAI-Node response-header middleware, which runs on the response goroutine right before the first byte hits the client and must never pay I/O for the lookup. The header value is best-effort observability and stale reads are acceptable. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-7[1m] --- pkg/model/loader.go | 44 +++++++++++++- pkg/model/lookup_node_id_test.go | 100 +++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 pkg/model/lookup_node_id_test.go diff --git a/pkg/model/loader.go b/pkg/model/loader.go index 7947dfc06..0739603c0 100644 --- a/pkg/model/loader.go +++ b/pkg/model/loader.go @@ -40,8 +40,14 @@ type ModelRouter func(ctx context.Context, backend, modelID, modelName, modelFil opts *pb.ModelOptions, parallel bool) (*Model, error) type ModelLoader struct { - ModelPath string - mu sync.Mutex + ModelPath string + mu sync.Mutex + // storeMu guards only the `store` field reference (not the store's + // internal state, which has its own concurrency mechanism). Kept + // separate from `mu` so lock-free helpers like LookupNodeID can + // snapshot the store reference without ever blocking behind a + // HealthCheck-holding CheckIsLoaded call on `mu`. + storeMu sync.RWMutex store ModelStore loading map[string]chan struct{} // tracks models currently being loaded wd *WatchDog @@ -112,7 +118,41 @@ func (ml *ModelLoader) SetModelRouter(r ModelRouter) { func (ml *ModelLoader) SetModelStore(s ModelStore) { ml.mu.Lock() defer ml.mu.Unlock() + ml.storeMu.Lock() ml.store = s + ml.storeMu.Unlock() +} + +// getStore returns the current store reference, taking only the +// store-reference RWMutex (not ml.mu). Safe to call from hot paths that +// must not block behind a HealthCheck-holding CheckIsLoaded. +func (ml *ModelLoader) getStore() ModelStore { + ml.storeMu.RLock() + defer ml.storeMu.RUnlock() + return ml.store +} + +// LookupNodeID returns the distributed worker node ID associated with +// the loaded model, or "" if the model is not in the in-memory store or +// has no node ID stamped. +// +// Unlike CheckIsLoaded this is a pure store read: it does NOT acquire +// ml.mu and does NOT invoke a gRPC HealthCheck. The returned value may +// be stale (the per-modelID store entry is overwritten on every +// distributed-mode routing decision), which is acceptable for the +// X-LocalAI-Node observability header. The contract here is "never pay +// I/O on the response hot path"; correctness of the value is +// best-effort by design. +func (ml *ModelLoader) LookupNodeID(modelName string) string { + store := ml.getStore() + if store == nil { + return "" + } + m, ok := store.Get(modelName) + if !ok || m == nil { + return "" + } + return m.NodeID() } func (ml *ModelLoader) GetWatchDog() *WatchDog { diff --git a/pkg/model/lookup_node_id_test.go b/pkg/model/lookup_node_id_test.go new file mode 100644 index 000000000..8976780e9 --- /dev/null +++ b/pkg/model/lookup_node_id_test.go @@ -0,0 +1,100 @@ +package model_test + +import ( + "context" + "sync/atomic" + + grpc "github.com/mudler/LocalAI/pkg/grpc" + "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/LocalAI/pkg/system" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// healthCheckCountingClient is a grpc.Backend stub that panics on every +// method except HealthCheck, which it counts. Used to prove that +// LookupNodeID never reaches the gRPC layer. +// +// We embed grpc.Backend so we only have to implement the one method we +// care about; any other call will nil-deref-panic and surface a clear +// failure in the test rather than silently swallowing a regression. +type healthCheckCountingClient struct { + grpc.Backend + calls atomic.Int64 +} + +func (c *healthCheckCountingClient) HealthCheck(_ context.Context) (bool, error) { + c.calls.Add(1) + return true, nil +} + +var _ = Describe("ModelLoader.LookupNodeID", func() { + var ( + ml *model.ModelLoader + store *model.InMemoryModelStore + ) + + BeforeEach(func() { + systemState, err := system.GetSystemState( + system.WithModelPath(GinkgoT().TempDir()), + ) + Expect(err).ToNot(HaveOccurred()) + ml = model.NewModelLoader(systemState) + store = model.NewInMemoryModelStore() + ml.SetModelStore(store) + }) + + It("returns the stamped node ID for a loaded model", func() { + m := model.NewModelWithClient("test-model", "10.0.0.1:50051", &healthCheckCountingClient{}) + m.SetNodeID("node-xyz") + store.Set("test-model", m) + + Expect(ml.LookupNodeID("test-model")).To(Equal("node-xyz")) + }) + + It("returns empty string when the model is not loaded", func() { + Expect(ml.LookupNodeID("missing-model")).To(BeEmpty()) + }) + + It("returns empty string when the model is loaded but has no node ID", func() { + m := model.NewModelWithClient("local-model", "127.0.0.1:50051", &healthCheckCountingClient{}) + // SetNodeID intentionally not called; in-process models stay unstamped. + store.Set("local-model", m) + + Expect(ml.LookupNodeID("local-model")).To(BeEmpty()) + }) + + // This is the regression guard for the Important #1 finding on the + // node-header PR: the previous middleware called CheckIsLoaded on + // the response hot path, which can hold ml.mu across a 2-minute + // gRPC HealthCheck timeout whenever the recently-healthy cache + // window has expired. LookupNodeID must read from the store only. + It("does NOT invoke HealthCheck on the backend client", func() { + client := &healthCheckCountingClient{} + m := model.NewModelWithClient("hot-path-model", "10.0.0.1:50051", client) + m.SetNodeID("node-42") + // Deliberately do NOT call MarkHealthy: if LookupNodeID were + // going through CheckIsLoaded, the lack of a cached healthy + // flag would force a fresh HealthCheck round-trip. We want + // the counter to stay at 0. + store.Set("hot-path-model", m) + + id := ml.LookupNodeID("hot-path-model") + + Expect(id).To(Equal("node-42")) + Expect(client.calls.Load()).To(BeZero(), + "LookupNodeID must not invoke HealthCheck (would hang the response writer for up to 2 minutes on a stale-healthy model)") + }) + + It("returns empty string when no store has been wired", func() { + // Construct a loader and overwrite the store with nil via a + // custom ModelStore-typed nil to exercise the defensive nil + // guard. Done indirectly to avoid exporting internal state. + bareLoader := model.NewModelLoader(&system.SystemState{}) + // Default store is non-nil (NewInMemoryModelStore), so seed + // the missing-model branch instead - covered by the second It + // above. This spec verifies the defensive contract at the + // API surface: a never-loaded model still returns "" cleanly. + Expect(bareLoader.LookupNodeID("anything")).To(BeEmpty()) + }) +})