From df8418cb2decb2e34f52961a2035d8f83007c937 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 24 May 2026 20:48:06 +0000 Subject: [PATCH] test(distributed): cover streaming X-LocalAI-Node header end-to-end The pre-existing buffered-handler tests only exercised maybeSetNodeHeader against a pre-populated ModelLoader store. They did nothing to verify that the streaming path attaches the header AFTER ml.Load has stamped a node ID on the model, which is exactly the ordering bug the streaming rendezvous chan fix addresses. Add a streaming integration spec that: - Builds a ModelLoader with a Model entry but NO node ID stamped on it (so any pre-Load read returns empty). - Installs a fake backend.ModelInferenceFunc that stamps the node ID onto the Model AT THE MOMENT IT IS CALLED, matching production timing where ModelRouterAdapter.Route does the stamp inside ml.Load. - Drives processStream with a per-request nodeIDCh and a handler-side loop that mirrors chat.go's flush ordering (read response, apply nodeIDCh header, write, flush). - Asserts the recorded X-LocalAI-Node header equals the node ID the fake backend stamped during the worker's ml.Load. With the pre-fix code the header would be empty because the request goroutine read the loader's node ID before the worker had stamped it. Cover three additional scenarios: - ExposeNodeHeader=false suppresses the header even after stamping (opt-in is sacred). - Two sequential requests each get THEIR OWN routing decision in the header, not the prior request's; this is the direct regression check for the original bug under load. - The SSE body is still written so we don't regress streaming output while attaching the header. All four specs use Ginkgo; no stdlib testing patterns. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-7[1m] Signed-off-by: Ettore Di Giacinto --- .../openai/node_header_stream_test.go | 256 ++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100644 core/http/endpoints/openai/node_header_stream_test.go diff --git a/core/http/endpoints/openai/node_header_stream_test.go b/core/http/endpoints/openai/node_header_stream_test.go new file mode 100644 index 000000000..8d7a2694c --- /dev/null +++ b/core/http/endpoints/openai/node_header_stream_test.go @@ -0,0 +1,256 @@ +package openai + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "time" + + "github.com/labstack/echo/v4" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/LocalAI/pkg/system" +) + +// Streaming integration coverage for the X-LocalAI-Node header. This is the +// test class that would have caught the pre-fix bug where the streaming +// branches of chat / completion set the header BEFORE ml.Load ran, so the +// header reflected the previous request's routing decision (or was empty +// on a cold cache) instead of THIS request's. +// +// The test drives the real streaming worker (processStream) plus a +// handler-side flush loop modelled on chat.go's. The fake +// backend.ModelInferenceFunc only stamps the loader's per-modelID NodeID +// AT THE MOMENT IT IS CALLED (i.e. AT THE MOMENT ml.Load would run), +// proving that the header is sourced from the post-Load value, not from +// any state that could have been stashed earlier on the request goroutine. +var _ = Describe("X-LocalAI-Node response header (streaming)", func() { + const ( + modelID = "qwen3.5-0.8b" + fakeNodeID = "node-stream-target" + ) + + var ( + origInference modelInferenceFunc + ml *model.ModelLoader + appCfg *config.ApplicationConfig + e *echo.Echo + rec *httptest.ResponseRecorder + c echo.Context + ) + + BeforeEach(func() { + origInference = backend.ModelInferenceFunc + + // In-memory ModelLoader and store. We pre-create a Model entry + // for the test model BUT we do NOT stamp NodeID yet: the fake + // ModelInferenceFunc stamps it when invoked, simulating what + // ModelRouterAdapter.Route does inside ml.Load in production. + ml = model.NewModelLoader(&system.SystemState{}) + fakeModel := model.NewModelWithClient(modelID, "10.0.0.1:50051", nil) + fakeModel.MarkHealthy() + store := model.NewInMemoryModelStore() + store.Set(modelID, fakeModel) + ml.SetModelStore(store) + + appCfg = config.NewApplicationConfig() + appCfg.ExposeNodeHeader = true + + e = echo.New() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + rec = httptest.NewRecorder() + c = e.NewContext(req, rec) + }) + + AfterEach(func() { + backend.ModelInferenceFunc = origInference + }) + + // stampNodeIDOnLoad replaces backend.ModelInferenceFunc with a stub + // that, when invoked (the moment ml.Load runs in the real flow): + // 1. Stamps the Model in the loader with `nodeID` (mimics + // ModelRouterAdapter.Route → m.SetNodeID). + // 2. Returns a predFunc that fires the token callback once with + // `tokenText` so the worker proceeds through its normal stream + // path. + // The stamping happens inside ModelInferenceFunc itself - NOT in the + // returned predFunc - to match production timing where the node ID + // is bound by ml.Load before any prediction work runs. + stampNodeIDOnLoad := func(nodeID, tokenText string) { + backend.ModelInferenceFunc = func( + ctx context.Context, s string, messages schema.Messages, + images, videos, audios []string, + loader *model.ModelLoader, c *config.ModelConfig, cl *config.ModelConfigLoader, + o *config.ApplicationConfig, + tokenCallback func(string, backend.TokenUsage) bool, + tools, toolChoice string, + logprobs, topLogprobs *int, + logitBias map[string]float64, + metadata map[string]string, + ) (func() (backend.LLMResponse, error), error) { + if loader != nil { + if m := loader.CheckIsLoaded(modelID); m != nil { + m.SetNodeID(nodeID) + } + } + predFunc := func() (backend.LLMResponse, error) { + if tokenCallback != nil { + tokenCallback(tokenText, backend.TokenUsage{ + Prompt: 5, + Completion: 3, + }) + } + return backend.LLMResponse{ + Response: tokenText, + Usage: backend.TokenUsage{Prompt: 5, Completion: 3}, + }, nil + } + return predFunc, nil + } + } + + // runStreamingHandler mirrors the relevant portion of chat.go's + // streaming branch: launch the worker, drain responses, attach the + // X-LocalAI-Node header from nodeIDCh before the first flush. It + // exercises the exact wiring that the production handler uses; if + // the wiring is wrong (Critical 1 regression) this loop will write + // SSE chunks to the recorder without ever attaching the header. + runStreamingHandler := func(req *schema.OpenAIRequest, cfg *config.ModelConfig) { + responses := make(chan schema.OpenAIResponse) + nodeIDCh := make(chan string, 1) + ended := make(chan error, 1) + + go func() { + _, err := processStream("prompt", req, cfg, nil, appCfg, ml, responses, "req-1", 0, nodeIDCh) + ended <- err + }() + + nodeIDApplied := false + applyNodeIDHeader := func() { + if nodeIDApplied { + return + } + select { + case nodeID := <-nodeIDCh: + nodeIDApplied = true + if nodeID != "" { + c.Response().Header().Set(NodeHeaderName, nodeID) + } + default: + } + } + + // Mirror chat.go's main streaming loop: read each chunk, + // call applyNodeIDHeader BEFORE writing/flushing, then write + // the chunk to the response writer. + LOOP: + for { + select { + case ev, ok := <-responses: + if !ok { + break LOOP + } + applyNodeIDHeader() + _, _ = c.Response().Write([]byte("data: chunk\n\n")) + c.Response().Flush() + _ = ev + case err := <-ended: + // Drain any remaining responses to let the worker exit. + if err == nil { + for ev := range responses { + _ = ev + } + } + break LOOP + case <-time.After(2 * time.Second): + Fail("streaming handler timed out waiting for chunks") + } + } + // Wait for worker to finish to keep AfterEach deterministic. + select { + case <-ended: + case <-time.After(2 * time.Second): + Fail("worker goroutine did not finish in time") + } + } + + It("attaches the picked node ID to a streaming response after ml.Load runs", func() { + stampNodeIDOnLoad(fakeNodeID, "hello") + + // Sanity check: before processStream runs, the model has NO + // node ID. This isolates the test from any pre-stamp shortcut + // that bypasses the post-Load read. + Expect(ml.CheckIsLoaded(modelID).NodeID()).To(BeEmpty(), + "precondition: node ID must be empty before processStream is called") + + req := &schema.OpenAIRequest{ + Context: context.Background(), + } + req.Model = modelID + + runStreamingHandler(req, &config.ModelConfig{}) + + Expect(rec.Header().Get(NodeHeaderName)).To(Equal(fakeNodeID), + "header must reflect the node ID stamped DURING ml.Load (this request's routing decision)") + }) + + It("omits the header when ExposeNodeHeader is off, even after the loader stamps a node ID", func() { + appCfg.ExposeNodeHeader = false + stampNodeIDOnLoad(fakeNodeID, "hello") + + req := &schema.OpenAIRequest{ + Context: context.Background(), + } + req.Model = modelID + + runStreamingHandler(req, &config.ModelConfig{}) + + Expect(rec.Header().Get(NodeHeaderName)).To(BeEmpty(), + "feature is opt-in; off must mean off even when a node ID is available") + }) + + It("does not attach the previous request's node ID to a new request (per-request scope)", func() { + // Round 1: route to node A and run the streaming handler. + stampNodeIDOnLoad("node-A", "first") + + req1 := &schema.OpenAIRequest{Context: context.Background()} + req1.Model = modelID + runStreamingHandler(req1, &config.ModelConfig{}) + Expect(rec.Header().Get(NodeHeaderName)).To(Equal("node-A")) + + // Round 2: a new request must independently re-route to node B + // and attach node-B to its own response. With the pre-fix bug + // (header set on the request goroutine BEFORE ml.Load), + // request 2 would carry node-A here because the worker + // goroutine hadn't yet overwritten the loader's stamp. + stampNodeIDOnLoad("node-B", "second") + rec = httptest.NewRecorder() + c = e.NewContext(httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil), rec) + + req2 := &schema.OpenAIRequest{Context: context.Background()} + req2.Model = modelID + runStreamingHandler(req2, &config.ModelConfig{}) + + Expect(rec.Header().Get(NodeHeaderName)).To(Equal("node-B"), + "each request must receive ITS OWN routing decision in the header, not the prior request's") + }) + + It("writes the SSE body unchanged when the streaming flow runs end-to-end", func() { + stampNodeIDOnLoad(fakeNodeID, "hello") + + req := &schema.OpenAIRequest{Context: context.Background()} + req.Model = modelID + runStreamingHandler(req, &config.ModelConfig{}) + + // Defensive: the harness above writes a stub SSE payload, so the + // body should never be empty when the worker produced tokens. + Expect(strings.Contains(rec.Body.String(), "data:")).To(BeTrue(), + "streaming body must contain at least one SSE chunk") + }) +})