diff --git a/core/http/endpoints/openai/chat_stream_usage_test.go b/core/http/endpoints/openai/chat_stream_usage_test.go index 8b542e4f6..7f7338e55 100644 --- a/core/http/endpoints/openai/chat_stream_usage_test.go +++ b/core/http/endpoints/openai/chat_stream_usage_test.go @@ -288,7 +288,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun responses := make(chan schema.OpenAIResponse) done := drainResponses(responses) - actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, nil) + actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0) <-done Expect(err).ToNot(HaveOccurred()) @@ -306,7 +306,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun responses := make(chan schema.OpenAIResponse) done := drainResponses(responses) - actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, nil) + actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0) <-done Expect(err).ToNot(HaveOccurred()) @@ -328,7 +328,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun done := drainResponses(responses) var textContent string - actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent, nil) + actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent) <-done Expect(err).ToNot(HaveOccurred()) @@ -351,7 +351,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun done := drainResponses(responses) var textContent string - actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent, nil) + actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent) <-done Expect(err).ToNot(HaveOccurred()) diff --git a/core/http/endpoints/openai/chat_stream_workers.go b/core/http/endpoints/openai/chat_stream_workers.go index ef7059d3a..e5eb2d1ca 100644 --- a/core/http/endpoints/openai/chat_stream_workers.go +++ b/core/http/endpoints/openai/chat_stream_workers.go @@ -22,13 +22,12 @@ import ( // it while this function runs; processStream closes the channel before // returning. // -// `nodeIDCh` (optional; may be nil) carries the picked distributed-node -// ID for this specific request back to the HTTP handler so it can attach -// the X-LocalAI-Node response header before the first SSE flush. The -// worker pushes the ID exactly once, AFTER ml.Load has run (i.e. on the -// first token callback invocation, which is the earliest moment the -// loader's per-modelID stamp is guaranteed populated). The chan must be -// buffered with capacity at least 1 so the send is always non-blocking. +// X-LocalAI-Node attribution (when --expose-node-header is on) is +// handled by middleware.ExposeNodeHeader at the response writer wrapper +// layer; no in-band signal from the worker is needed. The initial +// role=assistant chunk is still emitted from the first token callback +// rather than eagerly here, so the wrapper's lazy lookup against the +// loader runs AFTER ml.Load has stamped the per-modelID node ID. func processStream( s string, req *schema.OpenAIRequest, @@ -39,14 +38,7 @@ func processStream( responses chan schema.OpenAIResponse, id string, created int, - nodeIDCh chan<- string, ) (backend.TokenUsage, error) { - // The initial "role=assistant" SSE chunk used to be sent eagerly here, - // before ml.Load ran. That ordering broke streaming X-LocalAI-Node - // attribution: the handler flushed headers (and thus the wrong/missing - // node ID) before the loader had picked a replica. The chunk is now - // emitted from the first token callback instead, after the node-ID - // rendezvous push below. sentInitialRole := false // Detect if thinking token is already in prompt or template @@ -60,32 +52,7 @@ func processStream( thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig) extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig) - nodeIDSent := false - publishNodeID := func() { - if nodeIDSent { - return - } - nodeIDSent = true - if nodeIDCh == nil { - return - } - // Non-blocking send. The chan is sized so this never blocks; the - // guard protects against accidental misuse (unbuffered chan or - // missing reader) so we don't deadlock the streaming worker for - // a best-effort observability header. - select { - case nodeIDCh <- resolveNodeID(startupOptions, loader, req.Model): - default: - } - } - _, finalUsage, _, err := ComputeChoices(req, s, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool { - // First callback fires only after ml.Load has returned, so the - // loader's per-modelID node stamp is populated. Push it before - // any SSE chunk leaves the worker so the handler can attach the - // X-LocalAI-Node header before its first flush. - publishNodeID() - var reasoningDelta, contentDelta string // Always keep the Go-side extractor in sync with raw tokens so it @@ -132,10 +99,6 @@ func processStream( } return true }) - // If the backend produced zero tokens (no callback fired), publish - // the node ID anyway so the handler doesn't wait forever and the - // header is still set for the resulting empty stream. - publishNodeID() close(responses) return finalUsage, err } @@ -161,7 +124,6 @@ func processStreamWithTools( id string, created int, textContentToReturn *string, - nodeIDCh chan<- string, ) (backend.TokenUsage, error) { // Detect if thinking token is already in prompt or template var template string @@ -180,29 +142,10 @@ func processStreamWithTools( hasChatDeltaToolCalls := false hasChatDeltaContent := false - // See processStream for the rationale behind nodeIDCh: it carries the - // picked node ID back to the handler so the X-LocalAI-Node header can - // be attached before the first SSE flush. We publish from the first - // token callback (ml.Load has run by then) and on the fallthrough - // after ComputeChoices returns so empty-completion streams still - // signal the handler. - nodeIDSent := false - publishNodeID := func() { - if nodeIDSent { - return - } - nodeIDSent = true - if nodeIDCh == nil { - return - } - select { - case nodeIDCh <- resolveNodeID(startupOptions, loader, req.Model): - default: - } - } + // X-LocalAI-Node attribution is handled by middleware.ExposeNodeHeader + // at the wrapper layer; no in-band signalling from this worker. _, finalUsage, chatDeltas, err := ComputeChoices(req, prompt, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool { - publishNodeID() result += s // Track whether ChatDeltas from the C++ autoparser contain @@ -393,10 +336,6 @@ func processStreamWithTools( return false }, ) - // Backend produced zero tokens (e.g. immediate error/return path). - // Publish whatever node ID the loader has for this model so the handler - // can attach the header before flushing the error chunk. - publishNodeID() if err != nil { return finalUsage, err }