diff --git a/core/http/endpoints/openresponses/responses.go b/core/http/endpoints/openresponses/responses.go index 916380d01..d185ac9e1 100644 --- a/core/http/endpoints/openresponses/responses.go +++ b/core/http/endpoints/openresponses/responses.go @@ -1648,6 +1648,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 var currentReasoningContentIndex int var reasoningTokens int extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig) + // router classifies each streamed token into reasoning vs message deltas + // and decides which output item they target. It encapsulates the + // sticky-preferAutoparser fallback and the reasoningDelta-based gate that + // fix issue #9658 (live reasoning was mis-routed onto the msg_ item and + // only re-classified as a reasoning item after the stream completed). + router := newStreamReasoningRouter(extractor) // Collect all output items for storage var collectedOutputItems []schema.ORItemField @@ -1671,7 +1677,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 // Reset reasoning and tool-call state for re-inference so reasoning // extraction runs again on subsequent iterations inToolCallMode = false - extractor.Reset() + router.resetForIteration() currentMessageID = "" lastEmittedToolCallCount = 0 currentReasoningID = "" @@ -1832,110 +1838,101 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 // If no tool calls detected yet, handle reasoning and text if !inToolCallMode { - var reasoningDelta, contentDelta string - goReasoning, goContent := extractor.ProcessToken(token) + routing := router.route(token, tokenUsage) - if tokenUsage.HasChatDeltaContent() { - rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent() - contentDelta = cd - reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning) - } else { - reasoningDelta = goReasoning - contentDelta = goContent + // Handle reasoning item. The reasoning item is opened lazily + // on the first reasoning delta - gating on routing, not + // extractor.Reasoning() (issue #9658): when the C++ + // autoparser drives reasoning via reasoning_content, + // extractor.Reasoning() stays empty and the old gate dropped + // the live reasoning item. + if routing.OpenReasoningItem { + outputIndex++ + currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String()) + reasoningItem := &schema.ORItemField{ + Type: "reasoning", + ID: currentReasoningID, + Status: "in_progress", + } + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_item.added", + SequenceNumber: sequenceNumber, + OutputIndex: &outputIndex, + Item: reasoningItem, + }) + sequenceNumber++ + + // Emit content_part.added for reasoning + currentReasoningContentIndex = 0 + emptyPart := makeOutputTextPart("") + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.content_part.added", + SequenceNumber: sequenceNumber, + ItemID: currentReasoningID, + OutputIndex: &outputIndex, + ContentIndex: ¤tReasoningContentIndex, + Part: &emptyPart, + }) + sequenceNumber++ } - // Handle reasoning item - if extractor.Reasoning() != "" { - // Check if we need to create reasoning item - if currentReasoningID == "" { - outputIndex++ - currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String()) - reasoningItem := &schema.ORItemField{ - Type: "reasoning", - ID: currentReasoningID, - Status: "in_progress", - } - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_item.added", - SequenceNumber: sequenceNumber, - OutputIndex: &outputIndex, - Item: reasoningItem, - }) - sequenceNumber++ - - // Emit content_part.added for reasoning - currentReasoningContentIndex = 0 - emptyPart := makeOutputTextPart("") - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.content_part.added", - SequenceNumber: sequenceNumber, - ItemID: currentReasoningID, - OutputIndex: &outputIndex, - ContentIndex: ¤tReasoningContentIndex, - Part: &emptyPart, - }) - sequenceNumber++ - } - - // Emit reasoning delta if there's new content - if reasoningDelta != "" { - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_text.delta", - SequenceNumber: sequenceNumber, - ItemID: currentReasoningID, - OutputIndex: &outputIndex, - ContentIndex: ¤tReasoningContentIndex, - Delta: strPtr(reasoningDelta), - Logprobs: emptyLogprobs(), - }) - sequenceNumber++ - c.Response().Flush() - } + // Emit reasoning delta against the reasoning_ item id. + if routing.ReasoningDelta != "" { + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_text.delta", + SequenceNumber: sequenceNumber, + ItemID: currentReasoningID, + OutputIndex: &outputIndex, + ContentIndex: ¤tReasoningContentIndex, + Delta: strPtr(routing.ReasoningDelta), + Logprobs: emptyLogprobs(), + }) + sequenceNumber++ + c.Response().Flush() } - // Only emit message content if there's actual content (not just reasoning) - if contentDelta != "" { - if currentMessageID == "" { - // Emit output_item.added for message - outputIndex++ - currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String()) - messageItem := &schema.ORItemField{ - Type: "message", - ID: currentMessageID, - Status: "in_progress", - Role: "assistant", - Content: []schema.ORContentPart{}, - } - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_item.added", - SequenceNumber: sequenceNumber, - OutputIndex: &outputIndex, - Item: messageItem, - }) - sequenceNumber++ - - // Emit content_part.added - currentContentIndex = 0 - emptyPart := makeOutputTextPart("") - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.content_part.added", - SequenceNumber: sequenceNumber, - ItemID: currentMessageID, - OutputIndex: &outputIndex, - ContentIndex: ¤tContentIndex, - Part: &emptyPart, - }) - sequenceNumber++ + // Open the message item lazily on the first content delta. + if routing.OpenMessageItem { + outputIndex++ + currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String()) + messageItem := &schema.ORItemField{ + Type: "message", + ID: currentMessageID, + Status: "in_progress", + Role: "assistant", + Content: []schema.ORContentPart{}, } + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_item.added", + SequenceNumber: sequenceNumber, + OutputIndex: &outputIndex, + Item: messageItem, + }) + sequenceNumber++ - // Emit text delta + // Emit content_part.added + currentContentIndex = 0 + emptyPart := makeOutputTextPart("") + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.content_part.added", + SequenceNumber: sequenceNumber, + ItemID: currentMessageID, + OutputIndex: &outputIndex, + ContentIndex: ¤tContentIndex, + Part: &emptyPart, + }) + sequenceNumber++ + } + + // Emit text delta against the msg_ item id. + if routing.ContentDelta != "" { sendSSEEvent(c, &schema.ORStreamEvent{ Type: "response.output_text.delta", SequenceNumber: sequenceNumber, ItemID: currentMessageID, OutputIndex: &outputIndex, ContentIndex: ¤tContentIndex, - Delta: strPtr(contentDelta), + Delta: strPtr(routing.ContentDelta), Logprobs: emptyLogprobs(), }) sequenceNumber++ @@ -2331,112 +2328,109 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 return nil } - // Non-tool-call streaming path - // Emit output_item.added for message - currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String()) - messageItem := &schema.ORItemField{ - Type: "message", - ID: currentMessageID, - Status: "in_progress", - Role: "assistant", - Content: []schema.ORContentPart{}, - } - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_item.added", - SequenceNumber: sequenceNumber, - OutputIndex: &outputIndex, - Item: messageItem, - }) - sequenceNumber++ - - // Emit content_part.added - currentContentIndex = 0 - emptyTextPart := makeOutputTextPart("") - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.content_part.added", - SequenceNumber: sequenceNumber, - ItemID: currentMessageID, - OutputIndex: &outputIndex, - ContentIndex: ¤tContentIndex, - Part: &emptyTextPart, - }) - sequenceNumber++ + // Non-tool-call streaming path. + // + // The message output item is created LAZILY on the first content delta + // (mirroring the tool-call path), not eagerly before the first token. + // Issue #9658: an eager msg_ item forced reasoning to a higher output + // index and made mis-split text land on the pre-existing message, + // so the thinking monologue streamed as message text instead of reasoning. + var messageItem *schema.ORItemField // Stream text deltas with reasoning extraction tokenCallback := func(token string, tokenUsage backend.TokenUsage) bool { accumulatedText += token - var reasoningDelta, contentDelta string - goReasoning, goContent := extractor.ProcessToken(token) + routing := router.route(token, tokenUsage) - if tokenUsage.HasChatDeltaContent() { - rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent() - contentDelta = cd - reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning) - } else { - reasoningDelta = goReasoning - contentDelta = goContent + // Open the reasoning item lazily on the first reasoning delta. + if routing.OpenReasoningItem { + outputIndex++ + currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String()) + reasoningItem := &schema.ORItemField{ + Type: "reasoning", + ID: currentReasoningID, + Status: "in_progress", + } + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_item.added", + SequenceNumber: sequenceNumber, + OutputIndex: &outputIndex, + Item: reasoningItem, + }) + sequenceNumber++ + + // Emit content_part.added for reasoning + currentReasoningContentIndex = 0 + emptyPart := makeOutputTextPart("") + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.content_part.added", + SequenceNumber: sequenceNumber, + ItemID: currentReasoningID, + OutputIndex: &outputIndex, + ContentIndex: ¤tReasoningContentIndex, + Part: &emptyPart, + }) + sequenceNumber++ } - // Handle reasoning item - if extractor.Reasoning() != "" { - // Check if we need to create reasoning item - if currentReasoningID == "" { - outputIndex++ - currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String()) - reasoningItem := &schema.ORItemField{ - Type: "reasoning", - ID: currentReasoningID, - Status: "in_progress", - } - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_item.added", - SequenceNumber: sequenceNumber, - OutputIndex: &outputIndex, - Item: reasoningItem, - }) - sequenceNumber++ - - // Emit content_part.added for reasoning - currentReasoningContentIndex = 0 - emptyPart := makeOutputTextPart("") - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.content_part.added", - SequenceNumber: sequenceNumber, - ItemID: currentReasoningID, - OutputIndex: &outputIndex, - ContentIndex: ¤tReasoningContentIndex, - Part: &emptyPart, - }) - sequenceNumber++ - } - - // Emit reasoning delta if there's new content - if reasoningDelta != "" { - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_text.delta", - SequenceNumber: sequenceNumber, - ItemID: currentReasoningID, - OutputIndex: &outputIndex, - ContentIndex: ¤tReasoningContentIndex, - Delta: strPtr(reasoningDelta), - Logprobs: emptyLogprobs(), - }) - sequenceNumber++ - c.Response().Flush() - } + // Emit reasoning delta against the reasoning_ item id. + if routing.ReasoningDelta != "" { + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_text.delta", + SequenceNumber: sequenceNumber, + ItemID: currentReasoningID, + OutputIndex: &outputIndex, + ContentIndex: ¤tReasoningContentIndex, + Delta: strPtr(routing.ReasoningDelta), + Logprobs: emptyLogprobs(), + }) + sequenceNumber++ + c.Response().Flush() } - // Only emit message content if there's actual content (not just reasoning) - if contentDelta != "" { - // Emit text delta + // Open the message item lazily on the first content delta. + if routing.OpenMessageItem { + outputIndex++ + currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String()) + messageItem = &schema.ORItemField{ + Type: "message", + ID: currentMessageID, + Status: "in_progress", + Role: "assistant", + Content: []schema.ORContentPart{}, + } + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_item.added", + SequenceNumber: sequenceNumber, + OutputIndex: &outputIndex, + Item: messageItem, + }) + sequenceNumber++ + + // Emit content_part.added + currentContentIndex = 0 + emptyTextPart := makeOutputTextPart("") + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.content_part.added", + SequenceNumber: sequenceNumber, + ItemID: currentMessageID, + OutputIndex: &outputIndex, + ContentIndex: ¤tContentIndex, + Part: &emptyTextPart, + }) + sequenceNumber++ + } + + // Emit text delta against the msg_ item id. + if routing.ContentDelta != "" { sendSSEEvent(c, &schema.ORStreamEvent{ Type: "response.output_text.delta", SequenceNumber: sequenceNumber, ItemID: currentMessageID, OutputIndex: &outputIndex, ContentIndex: ¤tContentIndex, - Delta: strPtr(contentDelta), + Delta: strPtr(routing.ContentDelta), Logprobs: emptyLogprobs(), }) sequenceNumber++ @@ -2561,40 +2555,78 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 // Convert logprobs for streaming events mcpStreamLogprobs := convertLogprobsForStreaming(noToolLogprobs) - // Emit output_text.done - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_text.done", - SequenceNumber: sequenceNumber, - ItemID: currentMessageID, - OutputIndex: &outputIndex, - ContentIndex: ¤tContentIndex, - Text: strPtr(result), - Logprobs: logprobsPtr(mcpStreamLogprobs), - }) - sequenceNumber++ + // The message item is created lazily on the first content delta (issue + // #9658). If no content streamed but final extraction produced text (e.g. + // the autoparser delivered everything at once), open the message item now + // so the closing events below are valid. A pure-reasoning turn (no content + // at all) leaves messageItem nil and emits no message item. + if messageItem == nil && result != "" { + outputIndex++ + currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String()) + messageItem = &schema.ORItemField{ + Type: "message", + ID: currentMessageID, + Status: "in_progress", + Role: "assistant", + Content: []schema.ORContentPart{}, + } + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_item.added", + SequenceNumber: sequenceNumber, + OutputIndex: &outputIndex, + Item: messageItem, + }) + sequenceNumber++ - // Emit content_part.done (with actual logprobs) - resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs) - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.content_part.done", - SequenceNumber: sequenceNumber, - ItemID: currentMessageID, - OutputIndex: &outputIndex, - ContentIndex: ¤tContentIndex, - Part: &resultPart, - }) - sequenceNumber++ + currentContentIndex = 0 + emptyTextPart := makeOutputTextPart("") + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.content_part.added", + SequenceNumber: sequenceNumber, + ItemID: currentMessageID, + OutputIndex: &outputIndex, + ContentIndex: ¤tContentIndex, + Part: &emptyTextPart, + }) + sequenceNumber++ + } - // Emit output_item.done (with actual logprobs) - messageItem.Status = "completed" - messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)} - sendSSEEvent(c, &schema.ORStreamEvent{ - Type: "response.output_item.done", - SequenceNumber: sequenceNumber, - OutputIndex: &outputIndex, - Item: messageItem, - }) - sequenceNumber++ + if messageItem != nil { + // Emit output_text.done + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_text.done", + SequenceNumber: sequenceNumber, + ItemID: currentMessageID, + OutputIndex: &outputIndex, + ContentIndex: ¤tContentIndex, + Text: strPtr(result), + Logprobs: logprobsPtr(mcpStreamLogprobs), + }) + sequenceNumber++ + + // Emit content_part.done (with actual logprobs) + resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs) + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.content_part.done", + SequenceNumber: sequenceNumber, + ItemID: currentMessageID, + OutputIndex: &outputIndex, + ContentIndex: ¤tContentIndex, + Part: &resultPart, + }) + sequenceNumber++ + + // Emit output_item.done (with actual logprobs) + messageItem.Status = "completed" + messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)} + sendSSEEvent(c, &schema.ORStreamEvent{ + Type: "response.output_item.done", + SequenceNumber: sequenceNumber, + OutputIndex: &outputIndex, + Item: messageItem, + }) + sequenceNumber++ + } // Emit function_call items from automatic tool parsing fallback for _, fc := range streamFallbackToolCalls { @@ -2631,10 +2663,13 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 // Emit response.completed now := time.Now().Unix() - // Collect final output items (reasoning first, then messages, then tool calls) + // Collect final output items, ordered reasoning -> message -> tool calls. + // Issue #9658: reasoning is emitted as its own item ahead of the message, + // matching the streamed order (reasoning item is opened before the message + // item when the model thinks first). var finalOutputItems []schema.ORItemField - // Add reasoning item if it exists - if currentReasoningID != "" && finalReasoning != "" { + // Add reasoning item if one was streamed. + if router.ReasoningStreamed() && finalReasoning != "" { finalOutputItems = append(finalOutputItems, schema.ORItemField{ Type: "reasoning", ID: currentReasoningID, @@ -2642,18 +2677,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 Content: []schema.ORContentPart{makeOutputTextPart(finalReasoning)}, }) } - // Add message item - if len(collectedOutputItems) > 0 { - // Use collected items (may include reasoning already) - for _, item := range collectedOutputItems { - if item.Type == "message" { - finalOutputItems = append(finalOutputItems, item) - } - } - } else { + // Add the message item if one was produced (created lazily, so it may be + // nil for a pure-reasoning turn). + if messageItem != nil { finalOutputItems = append(finalOutputItems, *messageItem) } - // Add function_call items from fallback + // Add function_call items from fallback parsing. for _, item := range collectedOutputItems { if item.Type == "function_call" { finalOutputItems = append(finalOutputItems, item) diff --git a/core/http/endpoints/openresponses/responses_stream_reasoning.go b/core/http/endpoints/openresponses/responses_stream_reasoning.go new file mode 100644 index 000000000..107689a1f --- /dev/null +++ b/core/http/endpoints/openresponses/responses_stream_reasoning.go @@ -0,0 +1,114 @@ +package openresponses + +import ( + "github.com/mudler/LocalAI/core/backend" + reason "github.com/mudler/LocalAI/pkg/reasoning" +) + +// streamTokenRouting describes how a single streamed token's deltas should be +// routed to Open Responses output items: the reasoning/content split and +// whether a new reasoning or message output item must be opened before the +// corresponding delta can be emitted. +type streamTokenRouting struct { + ReasoningDelta string + ContentDelta string + // OpenReasoningItem is true when a reasoning output item must be created + // before emitting ReasoningDelta (the first reasoning delta of the stream). + OpenReasoningItem bool + // OpenMessageItem is true when a message output item must be created before + // emitting ContentDelta (the first content delta of the stream). + OpenMessageItem bool +} + +// streamReasoningRouter classifies streamed tokens into reasoning vs message +// deltas and tracks which output items have been opened, so the SSE-emitting +// code in handleOpenResponsesStream becomes a thin shell over a unit-testable +// decision. +// +// It mirrors the sticky-preferAutoparser logic in the OpenAI chat streaming +// worker (core/http/endpoints/openai/chat_stream_workers.go, processStream): +// once the C++ autoparser has surfaced reasoning_content, we trust its +// classification for the rest of the stream; until then we fall back to the +// Go-side reasoning extractor so a pure-content autoparser (the non-jinja PEG +// fallback, issue #9985) does not leak ... tokens into content. +// +// Crucially, the decision to open and target a reasoning item keys off the +// per-token reasoningDelta, NOT extractor.Reasoning(): the autoparser path +// computes reasoning through ProcessChatDeltaReasoning, which updates a +// separate accumulator that extractor.Reasoning() never exposes. Gating on +// extractor.Reasoning() (issue #9658) dropped live reasoning whenever the +// autoparser drove it via reasoning_content, surfacing it only after the +// stream completed and mis-routing earlier deltas onto the msg_ item. +type streamReasoningRouter struct { + extractor *reason.ReasoningExtractor + preferAutoparser bool + reasoningOpened bool + messageOpened bool +} + +func newStreamReasoningRouter(extractor *reason.ReasoningExtractor) *streamReasoningRouter { + return &streamReasoningRouter{extractor: extractor} +} + +// classify splits a token into reasoning/content deltas using the sticky +// preferAutoparser preference. Once the C++ autoparser has surfaced +// reasoning_content we trust it for the rest of the stream; until then we fall +// back to the Go-side extractor so a pure-content autoparser (zero +// reasoning_content, issue #9985) does not leak ... tokens into +// content. +func (r *streamReasoningRouter) classify(token string, usage backend.TokenUsage) (reasoningDelta, contentDelta string) { + goReasoning, goContent := r.extractor.ProcessToken(token) + if usage.HasChatDeltaContent() { + rawReasoning, cd := usage.ChatDeltaReasoningAndContent() + if rawReasoning != "" { + r.preferAutoparser = true + } + if r.preferAutoparser { + contentDelta = cd + reasoningDelta = r.extractor.ProcessChatDeltaReasoning(rawReasoning) + } else { + reasoningDelta = goReasoning + contentDelta = goContent + } + } else { + reasoningDelta = goReasoning + contentDelta = goContent + } + return reasoningDelta, contentDelta +} + +// route classifies a token and decides which output items its deltas target, +// flipping the opened-flags as items are created. +// +// The reasoning gate keys off reasoningDelta, NOT extractor.Reasoning(): the +// autoparser path computes reasoning via ProcessChatDeltaReasoning into a +// separate accumulator that extractor.Reasoning() never reflects (issue #9658). +func (r *streamReasoningRouter) route(token string, usage backend.TokenUsage) streamTokenRouting { + reasoningDelta, contentDelta := r.classify(token, usage) + out := streamTokenRouting{ReasoningDelta: reasoningDelta, ContentDelta: contentDelta} + if reasoningDelta != "" && !r.reasoningOpened { + out.OpenReasoningItem = true + r.reasoningOpened = true + } + if contentDelta != "" && !r.messageOpened { + out.OpenMessageItem = true + r.messageOpened = true + } + return out +} + +// resetForIteration clears the per-stream routing state for an MCP re-inference +// iteration, mirroring extractor.Reset() on the underlying extractor. +func (r *streamReasoningRouter) resetForIteration() { + r.preferAutoparser = false + r.reasoningOpened = false + r.messageOpened = false + r.extractor.Reset() +} + +// ReasoningStreamed reports whether a reasoning output item was opened during +// the stream. The end-of-stream closing blocks key off this rather than a +// reasoning-id string so the ordering (reasoning before message) is explicit. +func (r *streamReasoningRouter) ReasoningStreamed() bool { + return r.reasoningOpened +} diff --git a/core/http/endpoints/openresponses/responses_stream_reasoning_test.go b/core/http/endpoints/openresponses/responses_stream_reasoning_test.go new file mode 100644 index 000000000..e5c2f5297 --- /dev/null +++ b/core/http/endpoints/openresponses/responses_stream_reasoning_test.go @@ -0,0 +1,101 @@ +package openresponses + +import ( + "github.com/mudler/LocalAI/core/backend" + pb "github.com/mudler/LocalAI/pkg/grpc/proto" + reason "github.com/mudler/LocalAI/pkg/reasoning" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// usageWithChatDeltas builds a TokenUsage carrying a single C++ autoparser +// ChatDelta with the given content / reasoning_content split. +func usageWithChatDeltas(content, reasoningContent string) backend.TokenUsage { + return backend.TokenUsage{ + ChatDeltas: []*pb.ChatDelta{ + {Content: content, ReasoningContent: reasoningContent}, + }, + } +} + +// Regression tests for issue #9658: in the /v1/responses streaming handler the +// thinking monologue from a reasoning model was streamed to the client as a +// normal message (msg_ item, output_text.delta) instead of as a reasoning +// item, and was only re-classified into a reasoning item AFTER the stream +// completed. +// +// Root cause: the live reasoning item was gated on extractor.Reasoning(), +// which is only updated by the Go-side raw-tag parser (ProcessToken). When the +// C++ autoparser drives reasoning through reasoning_content ChatDeltas, the +// reasoning is computed via ProcessChatDeltaReasoning into a SEPARATE +// accumulator, so extractor.Reasoning() stays empty and the gate never fires. +var _ = Describe("streamReasoningRouter", func() { + Context("autoparser drives reasoning via reasoning_content (issue #9658)", func() { + It("opens a reasoning item during streaming and targets it (not the message)", func() { + extractor := reason.NewReasoningExtractor("", reason.Config{}) + router := newStreamReasoningRouter(extractor) + + // The raw token is empty: the autoparser carries the reasoning in + // ChatDelta.ReasoningContent, so the Go-side extractor's + // Reasoning() stays "" — exactly the state in which the buggy + // extractor.Reasoning() gate failed to open a reasoning item. + routing := router.route("", usageWithChatDeltas("", "Let me think about this")) + + Expect(routing.ReasoningDelta).To(Equal("Let me think about this"), + "the autoparser's reasoning_content must surface as a reasoning delta during streaming") + Expect(routing.OpenReasoningItem).To(BeTrue(), + "a reasoning output item must be opened live, not deferred to end-of-stream (#9658)") + Expect(routing.ContentDelta).To(BeEmpty()) + Expect(routing.OpenMessageItem).To(BeFalse(), + "reasoning deltas must target the reasoning_ item, never open/route to a msg_ item") + }) + + It("does not re-open the reasoning item on subsequent reasoning deltas", func() { + extractor := reason.NewReasoningExtractor("", reason.Config{}) + router := newStreamReasoningRouter(extractor) + + _ = router.route("", usageWithChatDeltas("", "first ")) + routing := router.route("", usageWithChatDeltas("", "second")) + + Expect(routing.ReasoningDelta).To(Equal("second")) + Expect(routing.OpenReasoningItem).To(BeFalse()) + }) + }) + + Context("pure content stream", func() { + It("never opens a reasoning item", func() { + extractor := reason.NewReasoningExtractor("", reason.Config{}) + router := newStreamReasoningRouter(extractor) + + // Content-only with no reasoning_content: the autoparser is in its + // pure-content mode, so the router stays on the Go-side extractor, + // which sees the content via the raw token. + routing := router.route("hello world", usageWithChatDeltas("hello world", "")) + + Expect(routing.ContentDelta).To(Equal("hello world")) + Expect(routing.OpenMessageItem).To(BeTrue()) + Expect(routing.OpenReasoningItem).To(BeFalse(), + "a content-only stream must never open a reasoning item") + Expect(router.ReasoningStreamed()).To(BeFalse()) + }) + }) + + Context("content-only autoparser with embedded (issue #9985 fallback)", func() { + It("falls back to Go-side extraction instead of leaking into content", func() { + extractor := reason.NewReasoningExtractor("", reason.Config{}) + router := newStreamReasoningRouter(extractor) + + // The autoparser is in its non-jinja pure-content fallback: it + // surfaces the whole string as Content with zero reasoning_content, + // tags and all. The router must NOT trust it (preferAutoparser must + // stay false) and instead use the Go-side split. + routing := router.route("reasoning hereanswer", + usageWithChatDeltas("reasoning hereanswer", "")) + + Expect(routing.ContentDelta).To(Equal("answer"), + "content must be the cleaned answer, not the raw ... string") + Expect(routing.ReasoningDelta).To(Equal("reasoning here")) + Expect(routing.OpenReasoningItem).To(BeTrue()) + }) + }) +})