refactor(openai): drop nodeIDCh from streaming workers

processStream and processStreamWithTools no longer accept a node-ID
rendezvous chan: X-LocalAI-Node attribution is now handled by
middleware.ExposeNodeHeader at the response writer wrapper layer.
Drop the publishNodeID closure and the unused chan parameter. Update
the corresponding workers tests to match the new signatures.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7[1m]
This commit is contained in:
Ettore Di Giacinto
2026-05-24 21:23:20 +00:00
parent 1c4bdfd1d6
commit 04407d24f3
2 changed files with 12 additions and 73 deletions

View File

@@ -288,7 +288,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun
responses := make(chan schema.OpenAIResponse)
done := drainResponses(responses)
actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, nil)
actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0)
<-done
Expect(err).ToNot(HaveOccurred())
@@ -306,7 +306,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun
responses := make(chan schema.OpenAIResponse)
done := drainResponses(responses)
actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, nil)
actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0)
<-done
Expect(err).ToNot(HaveOccurred())
@@ -328,7 +328,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun
done := drainResponses(responses)
var textContent string
actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent, nil)
actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent)
<-done
Expect(err).ToNot(HaveOccurred())
@@ -351,7 +351,7 @@ var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", fun
done := drainResponses(responses)
var textContent string
actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent, nil)
actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent)
<-done
Expect(err).ToNot(HaveOccurred())

View File

@@ -22,13 +22,12 @@ import (
// it while this function runs; processStream closes the channel before
// returning.
//
// `nodeIDCh` (optional; may be nil) carries the picked distributed-node
// ID for this specific request back to the HTTP handler so it can attach
// the X-LocalAI-Node response header before the first SSE flush. The
// worker pushes the ID exactly once, AFTER ml.Load has run (i.e. on the
// first token callback invocation, which is the earliest moment the
// loader's per-modelID stamp is guaranteed populated). The chan must be
// buffered with capacity at least 1 so the send is always non-blocking.
// X-LocalAI-Node attribution (when --expose-node-header is on) is
// handled by middleware.ExposeNodeHeader at the response writer wrapper
// layer; no in-band signal from the worker is needed. The initial
// role=assistant chunk is still emitted from the first token callback
// rather than eagerly here, so the wrapper's lazy lookup against the
// loader runs AFTER ml.Load has stamped the per-modelID node ID.
func processStream(
s string,
req *schema.OpenAIRequest,
@@ -39,14 +38,7 @@ func processStream(
responses chan schema.OpenAIResponse,
id string,
created int,
nodeIDCh chan<- string,
) (backend.TokenUsage, error) {
// The initial "role=assistant" SSE chunk used to be sent eagerly here,
// before ml.Load ran. That ordering broke streaming X-LocalAI-Node
// attribution: the handler flushed headers (and thus the wrong/missing
// node ID) before the loader had picked a replica. The chunk is now
// emitted from the first token callback instead, after the node-ID
// rendezvous push below.
sentInitialRole := false
// Detect if thinking token is already in prompt or template
@@ -60,32 +52,7 @@ func processStream(
thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig)
extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig)
nodeIDSent := false
publishNodeID := func() {
if nodeIDSent {
return
}
nodeIDSent = true
if nodeIDCh == nil {
return
}
// Non-blocking send. The chan is sized so this never blocks; the
// guard protects against accidental misuse (unbuffered chan or
// missing reader) so we don't deadlock the streaming worker for
// a best-effort observability header.
select {
case nodeIDCh <- resolveNodeID(startupOptions, loader, req.Model):
default:
}
}
_, finalUsage, _, err := ComputeChoices(req, s, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool {
// First callback fires only after ml.Load has returned, so the
// loader's per-modelID node stamp is populated. Push it before
// any SSE chunk leaves the worker so the handler can attach the
// X-LocalAI-Node header before its first flush.
publishNodeID()
var reasoningDelta, contentDelta string
// Always keep the Go-side extractor in sync with raw tokens so it
@@ -132,10 +99,6 @@ func processStream(
}
return true
})
// If the backend produced zero tokens (no callback fired), publish
// the node ID anyway so the handler doesn't wait forever and the
// header is still set for the resulting empty stream.
publishNodeID()
close(responses)
return finalUsage, err
}
@@ -161,7 +124,6 @@ func processStreamWithTools(
id string,
created int,
textContentToReturn *string,
nodeIDCh chan<- string,
) (backend.TokenUsage, error) {
// Detect if thinking token is already in prompt or template
var template string
@@ -180,29 +142,10 @@ func processStreamWithTools(
hasChatDeltaToolCalls := false
hasChatDeltaContent := false
// See processStream for the rationale behind nodeIDCh: it carries the
// picked node ID back to the handler so the X-LocalAI-Node header can
// be attached before the first SSE flush. We publish from the first
// token callback (ml.Load has run by then) and on the fallthrough
// after ComputeChoices returns so empty-completion streams still
// signal the handler.
nodeIDSent := false
publishNodeID := func() {
if nodeIDSent {
return
}
nodeIDSent = true
if nodeIDCh == nil {
return
}
select {
case nodeIDCh <- resolveNodeID(startupOptions, loader, req.Model):
default:
}
}
// X-LocalAI-Node attribution is handled by middleware.ExposeNodeHeader
// at the wrapper layer; no in-band signalling from this worker.
_, finalUsage, chatDeltas, err := ComputeChoices(req, prompt, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool {
publishNodeID()
result += s
// Track whether ChatDeltas from the C++ autoparser contain
@@ -393,10 +336,6 @@ func processStreamWithTools(
return false
},
)
// Backend produced zero tokens (e.g. immediate error/return path).
// Publish whatever node ID the loader has for this model so the handler
// can attach the header before flushing the error chunk.
publishNodeID()
if err != nil {
return finalUsage, err
}