Compare commits

...

1 Commits

Author SHA1 Message Date
Ettore Di Giacinto
390664ff72 fix(responses): classify streamed reasoning as a reasoning item live (#9658)
In the /v1/responses streaming handler a reasoning model's thinking
monologue was streamed to the client as normal message text (a msg_
output item with output_text.delta) and only reclassified into a
reasoning item after the stream completed. Subsequent output_text.delta
events also kept referencing the old msg_ item id instead of the
reasoning_ id.

Root causes:

1. The live reasoning item was gated on extractor.Reasoning(), which is
   only updated by the Go-side raw-tag parser (ProcessToken). When the
   C++ autoparser drives reasoning through reasoning_content ChatDeltas,
   the reasoning delta is computed via ProcessChatDeltaReasoning into a
   separate accumulator, so extractor.Reasoning() stays empty and the
   gate never fired. The reasoning item was thus only reconstructed at
   end-of-stream.

2. The non-tool-call path created the message/msg_ output item eagerly
   before any token, forcing reasoning to a higher output index and
   making mis-split <think> text land on the pre-existing message item.

3. Neither path carried the sticky preferAutoparser flag, so a
   content-only autoparser (the non-jinja pure-content fallback, #9985)
   could leak <think>...</think> tokens into content.

Extract the per-token reasoning-vs-message classification into a pure,
unit-tested streamReasoningRouter (mirroring chooseDeferredReasoning and
processStream in the chat streaming worker): it gates the reasoning item
on the reasoning delta, opens the message item lazily on the first
content delta, and keeps a sticky preferAutoparser fallback. Both
streaming paths now route reasoning deltas to the reasoning_ id and order
the reasoning item ahead of the message at completion.

Assisted-by: claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-12 21:39:42 +00:00
3 changed files with 471 additions and 227 deletions

View File

@@ -1648,6 +1648,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
var currentReasoningContentIndex int
var reasoningTokens int
extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig)
// router classifies each streamed token into reasoning vs message deltas
// and decides which output item they target. It encapsulates the
// sticky-preferAutoparser fallback and the reasoningDelta-based gate that
// fix issue #9658 (live reasoning was mis-routed onto the msg_ item and
// only re-classified as a reasoning item after the stream completed).
router := newStreamReasoningRouter(extractor)
// Collect all output items for storage
var collectedOutputItems []schema.ORItemField
@@ -1671,7 +1677,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// Reset reasoning and tool-call state for re-inference so reasoning
// extraction runs again on subsequent iterations
inToolCallMode = false
extractor.Reset()
router.resetForIteration()
currentMessageID = ""
lastEmittedToolCallCount = 0
currentReasoningID = ""
@@ -1832,110 +1838,101 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// If no tool calls detected yet, handle reasoning and text
if !inToolCallMode {
var reasoningDelta, contentDelta string
goReasoning, goContent := extractor.ProcessToken(token)
routing := router.route(token, tokenUsage)
if tokenUsage.HasChatDeltaContent() {
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
// Handle reasoning item. The reasoning item is opened lazily
// on the first reasoning delta - gating on routing, not
// extractor.Reasoning() (issue #9658): when the C++
// autoparser drives reasoning via reasoning_content,
// extractor.Reasoning() stays empty and the old gate dropped
// the live reasoning item.
if routing.OpenReasoningItem {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Handle reasoning item
if extractor.Reasoning() != "" {
// Check if we need to create reasoning item
if currentReasoningID == "" {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Emit reasoning delta if there's new content
if reasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(reasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Emit reasoning delta against the reasoning_ item id.
if routing.ReasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(routing.ReasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Only emit message content if there's actual content (not just reasoning)
if contentDelta != "" {
if currentMessageID == "" {
// Emit output_item.added for message
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem := &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.added
currentContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyPart,
})
sequenceNumber++
// Open the message item lazily on the first content delta.
if routing.OpenMessageItem {
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem := &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit text delta
// Emit content_part.added
currentContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Emit text delta against the msg_ item id.
if routing.ContentDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Delta: strPtr(contentDelta),
Delta: strPtr(routing.ContentDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
@@ -2331,112 +2328,109 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
return nil
}
// Non-tool-call streaming path
// Emit output_item.added for message
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem := &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.added
currentContentIndex = 0
emptyTextPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyTextPart,
})
sequenceNumber++
// Non-tool-call streaming path.
//
// The message output item is created LAZILY on the first content delta
// (mirroring the tool-call path), not eagerly before the first token.
// Issue #9658: an eager msg_ item forced reasoning to a higher output
// index and made mis-split <think> text land on the pre-existing message,
// so the thinking monologue streamed as message text instead of reasoning.
var messageItem *schema.ORItemField
// Stream text deltas with reasoning extraction
tokenCallback := func(token string, tokenUsage backend.TokenUsage) bool {
accumulatedText += token
var reasoningDelta, contentDelta string
goReasoning, goContent := extractor.ProcessToken(token)
routing := router.route(token, tokenUsage)
if tokenUsage.HasChatDeltaContent() {
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
// Open the reasoning item lazily on the first reasoning delta.
if routing.OpenReasoningItem {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Handle reasoning item
if extractor.Reasoning() != "" {
// Check if we need to create reasoning item
if currentReasoningID == "" {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Emit reasoning delta if there's new content
if reasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(reasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Emit reasoning delta against the reasoning_ item id.
if routing.ReasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(routing.ReasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Only emit message content if there's actual content (not just reasoning)
if contentDelta != "" {
// Emit text delta
// Open the message item lazily on the first content delta.
if routing.OpenMessageItem {
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem = &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.added
currentContentIndex = 0
emptyTextPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyTextPart,
})
sequenceNumber++
}
// Emit text delta against the msg_ item id.
if routing.ContentDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Delta: strPtr(contentDelta),
Delta: strPtr(routing.ContentDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
@@ -2561,40 +2555,78 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// Convert logprobs for streaming events
mcpStreamLogprobs := convertLogprobsForStreaming(noToolLogprobs)
// Emit output_text.done
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Text: strPtr(result),
Logprobs: logprobsPtr(mcpStreamLogprobs),
})
sequenceNumber++
// The message item is created lazily on the first content delta (issue
// #9658). If no content streamed but final extraction produced text (e.g.
// the autoparser delivered everything at once), open the message item now
// so the closing events below are valid. A pure-reasoning turn (no content
// at all) leaves messageItem nil and emits no message item.
if messageItem == nil && result != "" {
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem = &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.done (with actual logprobs)
resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs)
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &resultPart,
})
sequenceNumber++
currentContentIndex = 0
emptyTextPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyTextPart,
})
sequenceNumber++
}
// Emit output_item.done (with actual logprobs)
messageItem.Status = "completed"
messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.done",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
if messageItem != nil {
// Emit output_text.done
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Text: strPtr(result),
Logprobs: logprobsPtr(mcpStreamLogprobs),
})
sequenceNumber++
// Emit content_part.done (with actual logprobs)
resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs)
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &resultPart,
})
sequenceNumber++
// Emit output_item.done (with actual logprobs)
messageItem.Status = "completed"
messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.done",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
}
// Emit function_call items from automatic tool parsing fallback
for _, fc := range streamFallbackToolCalls {
@@ -2631,10 +2663,13 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// Emit response.completed
now := time.Now().Unix()
// Collect final output items (reasoning first, then messages, then tool calls)
// Collect final output items, ordered reasoning -> message -> tool calls.
// Issue #9658: reasoning is emitted as its own item ahead of the message,
// matching the streamed order (reasoning item is opened before the message
// item when the model thinks first).
var finalOutputItems []schema.ORItemField
// Add reasoning item if it exists
if currentReasoningID != "" && finalReasoning != "" {
// Add reasoning item if one was streamed.
if router.ReasoningStreamed() && finalReasoning != "" {
finalOutputItems = append(finalOutputItems, schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
@@ -2642,18 +2677,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
Content: []schema.ORContentPart{makeOutputTextPart(finalReasoning)},
})
}
// Add message item
if len(collectedOutputItems) > 0 {
// Use collected items (may include reasoning already)
for _, item := range collectedOutputItems {
if item.Type == "message" {
finalOutputItems = append(finalOutputItems, item)
}
}
} else {
// Add the message item if one was produced (created lazily, so it may be
// nil for a pure-reasoning turn).
if messageItem != nil {
finalOutputItems = append(finalOutputItems, *messageItem)
}
// Add function_call items from fallback
// Add function_call items from fallback parsing.
for _, item := range collectedOutputItems {
if item.Type == "function_call" {
finalOutputItems = append(finalOutputItems, item)

View File

@@ -0,0 +1,114 @@
package openresponses
import (
"github.com/mudler/LocalAI/core/backend"
reason "github.com/mudler/LocalAI/pkg/reasoning"
)
// streamTokenRouting describes how a single streamed token's deltas should be
// routed to Open Responses output items: the reasoning/content split and
// whether a new reasoning or message output item must be opened before the
// corresponding delta can be emitted.
type streamTokenRouting struct {
ReasoningDelta string
ContentDelta string
// OpenReasoningItem is true when a reasoning output item must be created
// before emitting ReasoningDelta (the first reasoning delta of the stream).
OpenReasoningItem bool
// OpenMessageItem is true when a message output item must be created before
// emitting ContentDelta (the first content delta of the stream).
OpenMessageItem bool
}
// streamReasoningRouter classifies streamed tokens into reasoning vs message
// deltas and tracks which output items have been opened, so the SSE-emitting
// code in handleOpenResponsesStream becomes a thin shell over a unit-testable
// decision.
//
// It mirrors the sticky-preferAutoparser logic in the OpenAI chat streaming
// worker (core/http/endpoints/openai/chat_stream_workers.go, processStream):
// once the C++ autoparser has surfaced reasoning_content, we trust its
// classification for the rest of the stream; until then we fall back to the
// Go-side reasoning extractor so a pure-content autoparser (the non-jinja PEG
// fallback, issue #9985) does not leak <think>...</think> tokens into content.
//
// Crucially, the decision to open and target a reasoning item keys off the
// per-token reasoningDelta, NOT extractor.Reasoning(): the autoparser path
// computes reasoning through ProcessChatDeltaReasoning, which updates a
// separate accumulator that extractor.Reasoning() never exposes. Gating on
// extractor.Reasoning() (issue #9658) dropped live reasoning whenever the
// autoparser drove it via reasoning_content, surfacing it only after the
// stream completed and mis-routing earlier deltas onto the msg_ item.
type streamReasoningRouter struct {
extractor *reason.ReasoningExtractor
preferAutoparser bool
reasoningOpened bool
messageOpened bool
}
func newStreamReasoningRouter(extractor *reason.ReasoningExtractor) *streamReasoningRouter {
return &streamReasoningRouter{extractor: extractor}
}
// classify splits a token into reasoning/content deltas using the sticky
// preferAutoparser preference. Once the C++ autoparser has surfaced
// reasoning_content we trust it for the rest of the stream; until then we fall
// back to the Go-side extractor so a pure-content autoparser (zero
// reasoning_content, issue #9985) does not leak <think>...</think> tokens into
// content.
func (r *streamReasoningRouter) classify(token string, usage backend.TokenUsage) (reasoningDelta, contentDelta string) {
goReasoning, goContent := r.extractor.ProcessToken(token)
if usage.HasChatDeltaContent() {
rawReasoning, cd := usage.ChatDeltaReasoningAndContent()
if rawReasoning != "" {
r.preferAutoparser = true
}
if r.preferAutoparser {
contentDelta = cd
reasoningDelta = r.extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
}
} else {
reasoningDelta = goReasoning
contentDelta = goContent
}
return reasoningDelta, contentDelta
}
// route classifies a token and decides which output items its deltas target,
// flipping the opened-flags as items are created.
//
// The reasoning gate keys off reasoningDelta, NOT extractor.Reasoning(): the
// autoparser path computes reasoning via ProcessChatDeltaReasoning into a
// separate accumulator that extractor.Reasoning() never reflects (issue #9658).
func (r *streamReasoningRouter) route(token string, usage backend.TokenUsage) streamTokenRouting {
reasoningDelta, contentDelta := r.classify(token, usage)
out := streamTokenRouting{ReasoningDelta: reasoningDelta, ContentDelta: contentDelta}
if reasoningDelta != "" && !r.reasoningOpened {
out.OpenReasoningItem = true
r.reasoningOpened = true
}
if contentDelta != "" && !r.messageOpened {
out.OpenMessageItem = true
r.messageOpened = true
}
return out
}
// resetForIteration clears the per-stream routing state for an MCP re-inference
// iteration, mirroring extractor.Reset() on the underlying extractor.
func (r *streamReasoningRouter) resetForIteration() {
r.preferAutoparser = false
r.reasoningOpened = false
r.messageOpened = false
r.extractor.Reset()
}
// ReasoningStreamed reports whether a reasoning output item was opened during
// the stream. The end-of-stream closing blocks key off this rather than a
// reasoning-id string so the ordering (reasoning before message) is explicit.
func (r *streamReasoningRouter) ReasoningStreamed() bool {
return r.reasoningOpened
}

View File

@@ -0,0 +1,101 @@
package openresponses
import (
"github.com/mudler/LocalAI/core/backend"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
reason "github.com/mudler/LocalAI/pkg/reasoning"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// usageWithChatDeltas builds a TokenUsage carrying a single C++ autoparser
// ChatDelta with the given content / reasoning_content split.
func usageWithChatDeltas(content, reasoningContent string) backend.TokenUsage {
return backend.TokenUsage{
ChatDeltas: []*pb.ChatDelta{
{Content: content, ReasoningContent: reasoningContent},
},
}
}
// Regression tests for issue #9658: in the /v1/responses streaming handler the
// thinking monologue from a reasoning model was streamed to the client as a
// normal message (msg_ item, output_text.delta) instead of as a reasoning
// item, and was only re-classified into a reasoning item AFTER the stream
// completed.
//
// Root cause: the live reasoning item was gated on extractor.Reasoning(),
// which is only updated by the Go-side raw-tag parser (ProcessToken). When the
// C++ autoparser drives reasoning through reasoning_content ChatDeltas, the
// reasoning is computed via ProcessChatDeltaReasoning into a SEPARATE
// accumulator, so extractor.Reasoning() stays empty and the gate never fires.
var _ = Describe("streamReasoningRouter", func() {
Context("autoparser drives reasoning via reasoning_content (issue #9658)", func() {
It("opens a reasoning item during streaming and targets it (not the message)", func() {
extractor := reason.NewReasoningExtractor("", reason.Config{})
router := newStreamReasoningRouter(extractor)
// The raw token is empty: the autoparser carries the reasoning in
// ChatDelta.ReasoningContent, so the Go-side extractor's
// Reasoning() stays "" — exactly the state in which the buggy
// extractor.Reasoning() gate failed to open a reasoning item.
routing := router.route("", usageWithChatDeltas("", "Let me think about this"))
Expect(routing.ReasoningDelta).To(Equal("Let me think about this"),
"the autoparser's reasoning_content must surface as a reasoning delta during streaming")
Expect(routing.OpenReasoningItem).To(BeTrue(),
"a reasoning output item must be opened live, not deferred to end-of-stream (#9658)")
Expect(routing.ContentDelta).To(BeEmpty())
Expect(routing.OpenMessageItem).To(BeFalse(),
"reasoning deltas must target the reasoning_ item, never open/route to a msg_ item")
})
It("does not re-open the reasoning item on subsequent reasoning deltas", func() {
extractor := reason.NewReasoningExtractor("", reason.Config{})
router := newStreamReasoningRouter(extractor)
_ = router.route("", usageWithChatDeltas("", "first "))
routing := router.route("", usageWithChatDeltas("", "second"))
Expect(routing.ReasoningDelta).To(Equal("second"))
Expect(routing.OpenReasoningItem).To(BeFalse())
})
})
Context("pure content stream", func() {
It("never opens a reasoning item", func() {
extractor := reason.NewReasoningExtractor("", reason.Config{})
router := newStreamReasoningRouter(extractor)
// Content-only with no reasoning_content: the autoparser is in its
// pure-content mode, so the router stays on the Go-side extractor,
// which sees the content via the raw token.
routing := router.route("hello world", usageWithChatDeltas("hello world", ""))
Expect(routing.ContentDelta).To(Equal("hello world"))
Expect(routing.OpenMessageItem).To(BeTrue())
Expect(routing.OpenReasoningItem).To(BeFalse(),
"a content-only stream must never open a reasoning item")
Expect(router.ReasoningStreamed()).To(BeFalse())
})
})
Context("content-only autoparser with embedded <think> (issue #9985 fallback)", func() {
It("falls back to Go-side extraction instead of leaking <think> into content", func() {
extractor := reason.NewReasoningExtractor("", reason.Config{})
router := newStreamReasoningRouter(extractor)
// The autoparser is in its non-jinja pure-content fallback: it
// surfaces the whole string as Content with zero reasoning_content,
// tags and all. The router must NOT trust it (preferAutoparser must
// stay false) and instead use the Go-side split.
routing := router.route("<think>reasoning here</think>answer",
usageWithChatDeltas("<think>reasoning here</think>answer", ""))
Expect(routing.ContentDelta).To(Equal("answer"),
"content must be the cleaned answer, not the raw <think>...</think> string")
Expect(routing.ReasoningDelta).To(Equal("reasoning here"))
Expect(routing.OpenReasoningItem).To(BeTrue())
})
})
})