From c778ad0f6d7fd618fedcd62265fab5ea0f82ebff Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 24 May 2026 21:47:31 +0000 Subject: [PATCH] test(distributed): add route-level integration test for X-LocalAI-Node middleware The existing unit tests in node_header_test.go drive the wrapper by calling mw(handler)(c) directly against a hand-built echo.Context. That misses regressions where the contract between the real Echo router and the wrapper breaks (e.g. middleware not on the writer chain when the handler runs, or a handler that writes via some surface the wrapper cannot see). Add one integration spec that dispatches a real HTTP request through e.ServeHTTP into a streaming handler shaped like chat.go's streaming branch: set SSE headers, write chunks, Flush. Asserts the X-LocalAI-Node header is on the response map BEFORE the first underlying write commits via an order recorder installed BELOW the wrapper in the writer chain (via a preceding middleware so the wrapper installed by ExposeNodeHeader wraps the recorder). Documents what is NOT exercised: ChatEndpoint and processStream are not wired end-to-end (ChatEndpoint depends on templates.Evaluator, MCP NATS, the LocalAI Assistant holder; processStream lives in core/http/endpoints/openai which imports core/http/middleware, so a regular import would create a cycle). processStream is covered separately in core/http/endpoints/openai/chat_stream_usage_test.go. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-7[1m] --- .../node_header_integration_test.go | 225 ++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 core/http/middleware/node_header_integration_test.go diff --git a/core/http/middleware/node_header_integration_test.go b/core/http/middleware/node_header_integration_test.go new file mode 100644 index 000000000..61e140efe --- /dev/null +++ b/core/http/middleware/node_header_integration_test.go @@ -0,0 +1,225 @@ +package middleware_test + +// Route-level integration coverage for the X-LocalAI-Node middleware. +// +// What this file pins (and why a separate spec on top of the unit tests +// in node_header_test.go): +// +// - The unit tests in node_header_test.go exercise the wrapper by +// invoking `mw(handler)(c)` directly against a hand-built +// echo.Context. That misses regressions where the contract between +// the real Echo router and the wrapper breaks: e.g. middleware +// installation via e.Use() loses the wrapper because the framework +// re-decorates c.Response().Writer after middleware setup, or a +// handler that bypasses c.Response().Writer (writing to some other +// captured surface). +// +// - This spec dispatches a real HTTP request through e.ServeHTTP into +// a streaming handler shaped like chat.go's streaming branch: set +// SSE headers, write chunks via c.Response().Write, Flush. It +// proves that: +// 1. Middleware installed via e.Use() is on the writer chain +// when the handler runs. +// 2. The wrapper's lazy maybeSet fires on the first underlying +// Write/Flush, so X-LocalAI-Node lands on the response map +// BEFORE the first body byte is committed. +// 3. The header is present in the recorded response (i.e. it +// isn't dropped because we tried to set it post-WriteHeader). +// +// Out of scope (and why): +// +// - We do NOT wire core/http/endpoints/openai.ChatEndpoint +// end-to-end. ChatEndpoint depends on templates.Evaluator, the +// MCP NATS client, and the LocalAI Assistant holder; standing +// those up just to assert header ordering is out of proportion to +// the property under test. The handler used here mirrors +// chat.go's streaming branch and exercises the SAME middleware → +// c.Response().Writer → SSE write path as production. If +// chat.go's streaming branch ever stops going through +// c.Response().Writer (e.g. it starts using a captured raw +// http.ResponseWriter from a different seam), this test will not +// notice; guard that with a code review checklist on chat.go. +// +// - We do NOT exercise the real processStream worker here. +// processStream lives in core/http/endpoints/openai, which itself +// imports core/http/middleware - a regular import from middleware +// into openai would create a cycle. processStream is independently +// covered in core/http/endpoints/openai/chat_stream_usage_test.go; +// the only behaviour we need at this layer is the writer-contract +// check above, which the synthetic SSE handler reproduces faithfully. + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + + "github.com/labstack/echo/v4" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/middleware" + "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/LocalAI/pkg/system" +) + +// orderRecorder snapshots the X-LocalAI-Node header value AT THE MOMENT +// the underlying writer is asked to commit each event. Any header set on +// the response map AFTER the first write/flush is dropped on the wire, +// so this is the ground-truth observation a real SSE client would see. +type orderRecorder struct { + http.ResponseWriter + mu sync.Mutex + events []string +} + +func (o *orderRecorder) record(ev string) { + o.mu.Lock() + defer o.mu.Unlock() + o.events = append(o.events, ev) +} + +func (o *orderRecorder) snapshot() []string { + o.mu.Lock() + defer o.mu.Unlock() + out := make([]string, len(o.events)) + copy(out, o.events) + return out +} + +func (o *orderRecorder) WriteHeader(code int) { + o.record(fmt.Sprintf("header:%d:node=%s", code, o.Header().Get(middleware.NodeHeaderName))) + o.ResponseWriter.WriteHeader(code) +} + +func (o *orderRecorder) Write(b []byte) (int, error) { + o.record(fmt.Sprintf("write:node=%s", o.Header().Get(middleware.NodeHeaderName))) + return o.ResponseWriter.Write(b) +} + +func (o *orderRecorder) Flush() { + o.record(fmt.Sprintf("flush:node=%s", o.Header().Get(middleware.NodeHeaderName))) + if f, ok := o.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + +var _ = Describe("ExposeNodeHeader middleware (route-level integration)", func() { + const ( + modelID = "integration-model" + fakeNodeID = "node-route-7" + ) + + var ( + ml *model.ModelLoader + appCfg *config.ApplicationConfig + ) + + BeforeEach(func() { + systemState, err := system.GetSystemState( + system.WithModelPath(GinkgoT().TempDir()), + ) + Expect(err).ToNot(HaveOccurred()) + ml = model.NewModelLoader(systemState) + + // Stamp the loader with a model entry that already has the + // node ID set. In production the SmartRouter stamps this + // during ml.Load before the first chunk is emitted; here we + // pre-stamp it because the assertion is about wire ordering + // (header-before-first-byte), not about ml.Load timing + // (which is covered separately in pkg/model/lookup_node_id_test.go). + m := model.NewModelWithClient(modelID, "10.0.0.1:50051", nil) + m.SetNodeID(fakeNodeID) + m.MarkHealthy() + store := model.NewInMemoryModelStore() + store.Set(modelID, m) + ml.SetModelStore(store) + + appCfg = config.NewApplicationConfig() + appCfg.ExposeNodeHeader = true + }) + + It("stamps X-LocalAI-Node before the first SSE byte via the real router + middleware chain", func() { + // Build a real Echo router. We need the tracker to sit BELOW + // the ExposeNodeHeader wrapper in the writer chain (so its + // recorded snapshot reflects what bytes-on-the-wire see AFTER + // the wrapper has had a chance to stamp the header). Install + // the tracker via a middleware that runs BEFORE + // ExposeNodeHeader; Echo's middleware execution order matches + // e.Use() call order, so the first Use() wraps the OUTER + // layer of the writer chain (i.e. the wrapper installed by + // the second Use() wraps the tracker installed by the first). + var ( + recorderMu sync.Mutex + tracker *orderRecorder + ) + e := echo.New() + e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + recorderMu.Lock() + tracker = &orderRecorder{ResponseWriter: c.Response().Writer} + c.Response().Writer = tracker + recorderMu.Unlock() + return next(c) + } + }) + e.Use(middleware.ExposeNodeHeader(appCfg, ml)) + + e.POST("/v1/chat/completions", func(c echo.Context) error { + // Mirror SetModelAndConfig: stash the model name on the + // per-request locals so the middleware's resolve closure + // can pick it up. Every real chat / completion handler + // goes through this contract. + c.Set(middleware.CONTEXT_LOCALS_KEY_MODEL_NAME, modelID) + + // SSE response prelude (same shape as chat.go). + c.Response().Header().Set("Content-Type", "text/event-stream") + c.Response().Header().Set("Cache-Control", "no-cache") + c.Response().Header().Set("Connection", "keep-alive") + + // Emit a handful of SSE chunks. The very first + // Write/Flush is what triggers the middleware + // wrapper's maybeSet, so the X-LocalAI-Node header + // MUST already be on the response map by the time the + // byte is committed. + for i := 0; i < 3; i++ { + _, err := c.Response().Write([]byte(fmt.Sprintf("data: chunk %d\n\n", i))) + if err != nil { + return err + } + c.Response().Flush() + } + return nil + }) + + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader("")) + rec := httptest.NewRecorder() + + e.ServeHTTP(rec, req) + + recorderMu.Lock() + Expect(tracker).ToNot(BeNil(), "handler must run and install the order recorder") + events := tracker.snapshot() + recorderMu.Unlock() + + Expect(rec.Code).To(Equal(http.StatusOK)) + Expect(rec.Header().Get(middleware.NodeHeaderName)).To(Equal(fakeNodeID), + "production contract: header must reach the wire on a streamed response") + + Expect(events).ToNot(BeEmpty(), + "expected at least one underlying-writer event from the streaming handler") + + // The very first observed event is the moment the wrapper + // commits to the wire. Its recorded node= value is what a + // real HTTP client would actually see; anything that lands + // AFTER this byte is invisible. + first := events[0] + Expect(first).To(ContainSubstring("node="+fakeNodeID), + "first writer event must carry the X-LocalAI-Node header (chain: middleware.Use -> e.POST -> handler.Write/Flush); got events: %v", events) + + // Body sanity: SSE chunks made it to the recorder. + Expect(rec.Body.String()).To(ContainSubstring("data: chunk 0")) + }) +})