mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-13 03:09:03 -04:00
Compare commits
1 Commits
fix/7461-m
...
fix/9658-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
390664ff72 |
@@ -1648,6 +1648,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
var currentReasoningContentIndex int
|
||||
var reasoningTokens int
|
||||
extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig)
|
||||
// router classifies each streamed token into reasoning vs message deltas
|
||||
// and decides which output item they target. It encapsulates the
|
||||
// sticky-preferAutoparser fallback and the reasoningDelta-based gate that
|
||||
// fix issue #9658 (live reasoning was mis-routed onto the msg_ item and
|
||||
// only re-classified as a reasoning item after the stream completed).
|
||||
router := newStreamReasoningRouter(extractor)
|
||||
|
||||
// Collect all output items for storage
|
||||
var collectedOutputItems []schema.ORItemField
|
||||
@@ -1671,7 +1677,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
// Reset reasoning and tool-call state for re-inference so reasoning
|
||||
// extraction runs again on subsequent iterations
|
||||
inToolCallMode = false
|
||||
extractor.Reset()
|
||||
router.resetForIteration()
|
||||
currentMessageID = ""
|
||||
lastEmittedToolCallCount = 0
|
||||
currentReasoningID = ""
|
||||
@@ -1832,110 +1838,101 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
|
||||
// If no tool calls detected yet, handle reasoning and text
|
||||
if !inToolCallMode {
|
||||
var reasoningDelta, contentDelta string
|
||||
goReasoning, goContent := extractor.ProcessToken(token)
|
||||
routing := router.route(token, tokenUsage)
|
||||
|
||||
if tokenUsage.HasChatDeltaContent() {
|
||||
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
|
||||
contentDelta = cd
|
||||
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
|
||||
} else {
|
||||
reasoningDelta = goReasoning
|
||||
contentDelta = goContent
|
||||
// Handle reasoning item. The reasoning item is opened lazily
|
||||
// on the first reasoning delta - gating on routing, not
|
||||
// extractor.Reasoning() (issue #9658): when the C++
|
||||
// autoparser drives reasoning via reasoning_content,
|
||||
// extractor.Reasoning() stays empty and the old gate dropped
|
||||
// the live reasoning item.
|
||||
if routing.OpenReasoningItem {
|
||||
outputIndex++
|
||||
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
|
||||
reasoningItem := &schema.ORItemField{
|
||||
Type: "reasoning",
|
||||
ID: currentReasoningID,
|
||||
Status: "in_progress",
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: reasoningItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added for reasoning
|
||||
currentReasoningContentIndex = 0
|
||||
emptyPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Part: &emptyPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Handle reasoning item
|
||||
if extractor.Reasoning() != "" {
|
||||
// Check if we need to create reasoning item
|
||||
if currentReasoningID == "" {
|
||||
outputIndex++
|
||||
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
|
||||
reasoningItem := &schema.ORItemField{
|
||||
Type: "reasoning",
|
||||
ID: currentReasoningID,
|
||||
Status: "in_progress",
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: reasoningItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added for reasoning
|
||||
currentReasoningContentIndex = 0
|
||||
emptyPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Part: &emptyPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Emit reasoning delta if there's new content
|
||||
if reasoningDelta != "" {
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Delta: strPtr(reasoningDelta),
|
||||
Logprobs: emptyLogprobs(),
|
||||
})
|
||||
sequenceNumber++
|
||||
c.Response().Flush()
|
||||
}
|
||||
// Emit reasoning delta against the reasoning_ item id.
|
||||
if routing.ReasoningDelta != "" {
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Delta: strPtr(routing.ReasoningDelta),
|
||||
Logprobs: emptyLogprobs(),
|
||||
})
|
||||
sequenceNumber++
|
||||
c.Response().Flush()
|
||||
}
|
||||
|
||||
// Only emit message content if there's actual content (not just reasoning)
|
||||
if contentDelta != "" {
|
||||
if currentMessageID == "" {
|
||||
// Emit output_item.added for message
|
||||
outputIndex++
|
||||
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
|
||||
messageItem := &schema.ORItemField{
|
||||
Type: "message",
|
||||
ID: currentMessageID,
|
||||
Status: "in_progress",
|
||||
Role: "assistant",
|
||||
Content: []schema.ORContentPart{},
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added
|
||||
currentContentIndex = 0
|
||||
emptyPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &emptyPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
// Open the message item lazily on the first content delta.
|
||||
if routing.OpenMessageItem {
|
||||
outputIndex++
|
||||
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
|
||||
messageItem := &schema.ORItemField{
|
||||
Type: "message",
|
||||
ID: currentMessageID,
|
||||
Status: "in_progress",
|
||||
Role: "assistant",
|
||||
Content: []schema.ORContentPart{},
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit text delta
|
||||
// Emit content_part.added
|
||||
currentContentIndex = 0
|
||||
emptyPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &emptyPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Emit text delta against the msg_ item id.
|
||||
if routing.ContentDelta != "" {
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Delta: strPtr(contentDelta),
|
||||
Delta: strPtr(routing.ContentDelta),
|
||||
Logprobs: emptyLogprobs(),
|
||||
})
|
||||
sequenceNumber++
|
||||
@@ -2331,112 +2328,109 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
return nil
|
||||
}
|
||||
|
||||
// Non-tool-call streaming path
|
||||
// Emit output_item.added for message
|
||||
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
|
||||
messageItem := &schema.ORItemField{
|
||||
Type: "message",
|
||||
ID: currentMessageID,
|
||||
Status: "in_progress",
|
||||
Role: "assistant",
|
||||
Content: []schema.ORContentPart{},
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added
|
||||
currentContentIndex = 0
|
||||
emptyTextPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &emptyTextPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
// Non-tool-call streaming path.
|
||||
//
|
||||
// The message output item is created LAZILY on the first content delta
|
||||
// (mirroring the tool-call path), not eagerly before the first token.
|
||||
// Issue #9658: an eager msg_ item forced reasoning to a higher output
|
||||
// index and made mis-split <think> text land on the pre-existing message,
|
||||
// so the thinking monologue streamed as message text instead of reasoning.
|
||||
var messageItem *schema.ORItemField
|
||||
|
||||
// Stream text deltas with reasoning extraction
|
||||
tokenCallback := func(token string, tokenUsage backend.TokenUsage) bool {
|
||||
accumulatedText += token
|
||||
|
||||
var reasoningDelta, contentDelta string
|
||||
goReasoning, goContent := extractor.ProcessToken(token)
|
||||
routing := router.route(token, tokenUsage)
|
||||
|
||||
if tokenUsage.HasChatDeltaContent() {
|
||||
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
|
||||
contentDelta = cd
|
||||
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
|
||||
} else {
|
||||
reasoningDelta = goReasoning
|
||||
contentDelta = goContent
|
||||
// Open the reasoning item lazily on the first reasoning delta.
|
||||
if routing.OpenReasoningItem {
|
||||
outputIndex++
|
||||
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
|
||||
reasoningItem := &schema.ORItemField{
|
||||
Type: "reasoning",
|
||||
ID: currentReasoningID,
|
||||
Status: "in_progress",
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: reasoningItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added for reasoning
|
||||
currentReasoningContentIndex = 0
|
||||
emptyPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Part: &emptyPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Handle reasoning item
|
||||
if extractor.Reasoning() != "" {
|
||||
// Check if we need to create reasoning item
|
||||
if currentReasoningID == "" {
|
||||
outputIndex++
|
||||
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
|
||||
reasoningItem := &schema.ORItemField{
|
||||
Type: "reasoning",
|
||||
ID: currentReasoningID,
|
||||
Status: "in_progress",
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: reasoningItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added for reasoning
|
||||
currentReasoningContentIndex = 0
|
||||
emptyPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Part: &emptyPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Emit reasoning delta if there's new content
|
||||
if reasoningDelta != "" {
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Delta: strPtr(reasoningDelta),
|
||||
Logprobs: emptyLogprobs(),
|
||||
})
|
||||
sequenceNumber++
|
||||
c.Response().Flush()
|
||||
}
|
||||
// Emit reasoning delta against the reasoning_ item id.
|
||||
if routing.ReasoningDelta != "" {
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentReasoningID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tReasoningContentIndex,
|
||||
Delta: strPtr(routing.ReasoningDelta),
|
||||
Logprobs: emptyLogprobs(),
|
||||
})
|
||||
sequenceNumber++
|
||||
c.Response().Flush()
|
||||
}
|
||||
|
||||
// Only emit message content if there's actual content (not just reasoning)
|
||||
if contentDelta != "" {
|
||||
// Emit text delta
|
||||
// Open the message item lazily on the first content delta.
|
||||
if routing.OpenMessageItem {
|
||||
outputIndex++
|
||||
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
|
||||
messageItem = &schema.ORItemField{
|
||||
Type: "message",
|
||||
ID: currentMessageID,
|
||||
Status: "in_progress",
|
||||
Role: "assistant",
|
||||
Content: []schema.ORContentPart{},
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.added
|
||||
currentContentIndex = 0
|
||||
emptyTextPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &emptyTextPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Emit text delta against the msg_ item id.
|
||||
if routing.ContentDelta != "" {
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Delta: strPtr(contentDelta),
|
||||
Delta: strPtr(routing.ContentDelta),
|
||||
Logprobs: emptyLogprobs(),
|
||||
})
|
||||
sequenceNumber++
|
||||
@@ -2561,40 +2555,78 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
// Convert logprobs for streaming events
|
||||
mcpStreamLogprobs := convertLogprobsForStreaming(noToolLogprobs)
|
||||
|
||||
// Emit output_text.done
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.done",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Text: strPtr(result),
|
||||
Logprobs: logprobsPtr(mcpStreamLogprobs),
|
||||
})
|
||||
sequenceNumber++
|
||||
// The message item is created lazily on the first content delta (issue
|
||||
// #9658). If no content streamed but final extraction produced text (e.g.
|
||||
// the autoparser delivered everything at once), open the message item now
|
||||
// so the closing events below are valid. A pure-reasoning turn (no content
|
||||
// at all) leaves messageItem nil and emits no message item.
|
||||
if messageItem == nil && result != "" {
|
||||
outputIndex++
|
||||
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
|
||||
messageItem = &schema.ORItemField{
|
||||
Type: "message",
|
||||
ID: currentMessageID,
|
||||
Status: "in_progress",
|
||||
Role: "assistant",
|
||||
Content: []schema.ORContentPart{},
|
||||
}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.done (with actual logprobs)
|
||||
resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs)
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.done",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &resultPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
currentContentIndex = 0
|
||||
emptyTextPart := makeOutputTextPart("")
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.added",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &emptyTextPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Emit output_item.done (with actual logprobs)
|
||||
messageItem.Status = "completed"
|
||||
messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.done",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
if messageItem != nil {
|
||||
// Emit output_text.done
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.done",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Text: strPtr(result),
|
||||
Logprobs: logprobsPtr(mcpStreamLogprobs),
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit content_part.done (with actual logprobs)
|
||||
resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs)
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.content_part.done",
|
||||
SequenceNumber: sequenceNumber,
|
||||
ItemID: currentMessageID,
|
||||
OutputIndex: &outputIndex,
|
||||
ContentIndex: ¤tContentIndex,
|
||||
Part: &resultPart,
|
||||
})
|
||||
sequenceNumber++
|
||||
|
||||
// Emit output_item.done (with actual logprobs)
|
||||
messageItem.Status = "completed"
|
||||
messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)}
|
||||
sendSSEEvent(c, &schema.ORStreamEvent{
|
||||
Type: "response.output_item.done",
|
||||
SequenceNumber: sequenceNumber,
|
||||
OutputIndex: &outputIndex,
|
||||
Item: messageItem,
|
||||
})
|
||||
sequenceNumber++
|
||||
}
|
||||
|
||||
// Emit function_call items from automatic tool parsing fallback
|
||||
for _, fc := range streamFallbackToolCalls {
|
||||
@@ -2631,10 +2663,13 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
// Emit response.completed
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Collect final output items (reasoning first, then messages, then tool calls)
|
||||
// Collect final output items, ordered reasoning -> message -> tool calls.
|
||||
// Issue #9658: reasoning is emitted as its own item ahead of the message,
|
||||
// matching the streamed order (reasoning item is opened before the message
|
||||
// item when the model thinks first).
|
||||
var finalOutputItems []schema.ORItemField
|
||||
// Add reasoning item if it exists
|
||||
if currentReasoningID != "" && finalReasoning != "" {
|
||||
// Add reasoning item if one was streamed.
|
||||
if router.ReasoningStreamed() && finalReasoning != "" {
|
||||
finalOutputItems = append(finalOutputItems, schema.ORItemField{
|
||||
Type: "reasoning",
|
||||
ID: currentReasoningID,
|
||||
@@ -2642,18 +2677,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
Content: []schema.ORContentPart{makeOutputTextPart(finalReasoning)},
|
||||
})
|
||||
}
|
||||
// Add message item
|
||||
if len(collectedOutputItems) > 0 {
|
||||
// Use collected items (may include reasoning already)
|
||||
for _, item := range collectedOutputItems {
|
||||
if item.Type == "message" {
|
||||
finalOutputItems = append(finalOutputItems, item)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Add the message item if one was produced (created lazily, so it may be
|
||||
// nil for a pure-reasoning turn).
|
||||
if messageItem != nil {
|
||||
finalOutputItems = append(finalOutputItems, *messageItem)
|
||||
}
|
||||
// Add function_call items from fallback
|
||||
// Add function_call items from fallback parsing.
|
||||
for _, item := range collectedOutputItems {
|
||||
if item.Type == "function_call" {
|
||||
finalOutputItems = append(finalOutputItems, item)
|
||||
|
||||
114
core/http/endpoints/openresponses/responses_stream_reasoning.go
Normal file
114
core/http/endpoints/openresponses/responses_stream_reasoning.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package openresponses
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
reason "github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// streamTokenRouting describes how a single streamed token's deltas should be
|
||||
// routed to Open Responses output items: the reasoning/content split and
|
||||
// whether a new reasoning or message output item must be opened before the
|
||||
// corresponding delta can be emitted.
|
||||
type streamTokenRouting struct {
|
||||
ReasoningDelta string
|
||||
ContentDelta string
|
||||
// OpenReasoningItem is true when a reasoning output item must be created
|
||||
// before emitting ReasoningDelta (the first reasoning delta of the stream).
|
||||
OpenReasoningItem bool
|
||||
// OpenMessageItem is true when a message output item must be created before
|
||||
// emitting ContentDelta (the first content delta of the stream).
|
||||
OpenMessageItem bool
|
||||
}
|
||||
|
||||
// streamReasoningRouter classifies streamed tokens into reasoning vs message
|
||||
// deltas and tracks which output items have been opened, so the SSE-emitting
|
||||
// code in handleOpenResponsesStream becomes a thin shell over a unit-testable
|
||||
// decision.
|
||||
//
|
||||
// It mirrors the sticky-preferAutoparser logic in the OpenAI chat streaming
|
||||
// worker (core/http/endpoints/openai/chat_stream_workers.go, processStream):
|
||||
// once the C++ autoparser has surfaced reasoning_content, we trust its
|
||||
// classification for the rest of the stream; until then we fall back to the
|
||||
// Go-side reasoning extractor so a pure-content autoparser (the non-jinja PEG
|
||||
// fallback, issue #9985) does not leak <think>...</think> tokens into content.
|
||||
//
|
||||
// Crucially, the decision to open and target a reasoning item keys off the
|
||||
// per-token reasoningDelta, NOT extractor.Reasoning(): the autoparser path
|
||||
// computes reasoning through ProcessChatDeltaReasoning, which updates a
|
||||
// separate accumulator that extractor.Reasoning() never exposes. Gating on
|
||||
// extractor.Reasoning() (issue #9658) dropped live reasoning whenever the
|
||||
// autoparser drove it via reasoning_content, surfacing it only after the
|
||||
// stream completed and mis-routing earlier deltas onto the msg_ item.
|
||||
type streamReasoningRouter struct {
|
||||
extractor *reason.ReasoningExtractor
|
||||
preferAutoparser bool
|
||||
reasoningOpened bool
|
||||
messageOpened bool
|
||||
}
|
||||
|
||||
func newStreamReasoningRouter(extractor *reason.ReasoningExtractor) *streamReasoningRouter {
|
||||
return &streamReasoningRouter{extractor: extractor}
|
||||
}
|
||||
|
||||
// classify splits a token into reasoning/content deltas using the sticky
|
||||
// preferAutoparser preference. Once the C++ autoparser has surfaced
|
||||
// reasoning_content we trust it for the rest of the stream; until then we fall
|
||||
// back to the Go-side extractor so a pure-content autoparser (zero
|
||||
// reasoning_content, issue #9985) does not leak <think>...</think> tokens into
|
||||
// content.
|
||||
func (r *streamReasoningRouter) classify(token string, usage backend.TokenUsage) (reasoningDelta, contentDelta string) {
|
||||
goReasoning, goContent := r.extractor.ProcessToken(token)
|
||||
if usage.HasChatDeltaContent() {
|
||||
rawReasoning, cd := usage.ChatDeltaReasoningAndContent()
|
||||
if rawReasoning != "" {
|
||||
r.preferAutoparser = true
|
||||
}
|
||||
if r.preferAutoparser {
|
||||
contentDelta = cd
|
||||
reasoningDelta = r.extractor.ProcessChatDeltaReasoning(rawReasoning)
|
||||
} else {
|
||||
reasoningDelta = goReasoning
|
||||
contentDelta = goContent
|
||||
}
|
||||
} else {
|
||||
reasoningDelta = goReasoning
|
||||
contentDelta = goContent
|
||||
}
|
||||
return reasoningDelta, contentDelta
|
||||
}
|
||||
|
||||
// route classifies a token and decides which output items its deltas target,
|
||||
// flipping the opened-flags as items are created.
|
||||
//
|
||||
// The reasoning gate keys off reasoningDelta, NOT extractor.Reasoning(): the
|
||||
// autoparser path computes reasoning via ProcessChatDeltaReasoning into a
|
||||
// separate accumulator that extractor.Reasoning() never reflects (issue #9658).
|
||||
func (r *streamReasoningRouter) route(token string, usage backend.TokenUsage) streamTokenRouting {
|
||||
reasoningDelta, contentDelta := r.classify(token, usage)
|
||||
out := streamTokenRouting{ReasoningDelta: reasoningDelta, ContentDelta: contentDelta}
|
||||
if reasoningDelta != "" && !r.reasoningOpened {
|
||||
out.OpenReasoningItem = true
|
||||
r.reasoningOpened = true
|
||||
}
|
||||
if contentDelta != "" && !r.messageOpened {
|
||||
out.OpenMessageItem = true
|
||||
r.messageOpened = true
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// resetForIteration clears the per-stream routing state for an MCP re-inference
|
||||
// iteration, mirroring extractor.Reset() on the underlying extractor.
|
||||
func (r *streamReasoningRouter) resetForIteration() {
|
||||
r.preferAutoparser = false
|
||||
r.reasoningOpened = false
|
||||
r.messageOpened = false
|
||||
r.extractor.Reset()
|
||||
}
|
||||
|
||||
// ReasoningStreamed reports whether a reasoning output item was opened during
|
||||
// the stream. The end-of-stream closing blocks key off this rather than a
|
||||
// reasoning-id string so the ordering (reasoning before message) is explicit.
|
||||
func (r *streamReasoningRouter) ReasoningStreamed() bool {
|
||||
return r.reasoningOpened
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
package openresponses
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
|
||||
reason "github.com/mudler/LocalAI/pkg/reasoning"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// usageWithChatDeltas builds a TokenUsage carrying a single C++ autoparser
|
||||
// ChatDelta with the given content / reasoning_content split.
|
||||
func usageWithChatDeltas(content, reasoningContent string) backend.TokenUsage {
|
||||
return backend.TokenUsage{
|
||||
ChatDeltas: []*pb.ChatDelta{
|
||||
{Content: content, ReasoningContent: reasoningContent},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Regression tests for issue #9658: in the /v1/responses streaming handler the
|
||||
// thinking monologue from a reasoning model was streamed to the client as a
|
||||
// normal message (msg_ item, output_text.delta) instead of as a reasoning
|
||||
// item, and was only re-classified into a reasoning item AFTER the stream
|
||||
// completed.
|
||||
//
|
||||
// Root cause: the live reasoning item was gated on extractor.Reasoning(),
|
||||
// which is only updated by the Go-side raw-tag parser (ProcessToken). When the
|
||||
// C++ autoparser drives reasoning through reasoning_content ChatDeltas, the
|
||||
// reasoning is computed via ProcessChatDeltaReasoning into a SEPARATE
|
||||
// accumulator, so extractor.Reasoning() stays empty and the gate never fires.
|
||||
var _ = Describe("streamReasoningRouter", func() {
|
||||
Context("autoparser drives reasoning via reasoning_content (issue #9658)", func() {
|
||||
It("opens a reasoning item during streaming and targets it (not the message)", func() {
|
||||
extractor := reason.NewReasoningExtractor("", reason.Config{})
|
||||
router := newStreamReasoningRouter(extractor)
|
||||
|
||||
// The raw token is empty: the autoparser carries the reasoning in
|
||||
// ChatDelta.ReasoningContent, so the Go-side extractor's
|
||||
// Reasoning() stays "" — exactly the state in which the buggy
|
||||
// extractor.Reasoning() gate failed to open a reasoning item.
|
||||
routing := router.route("", usageWithChatDeltas("", "Let me think about this"))
|
||||
|
||||
Expect(routing.ReasoningDelta).To(Equal("Let me think about this"),
|
||||
"the autoparser's reasoning_content must surface as a reasoning delta during streaming")
|
||||
Expect(routing.OpenReasoningItem).To(BeTrue(),
|
||||
"a reasoning output item must be opened live, not deferred to end-of-stream (#9658)")
|
||||
Expect(routing.ContentDelta).To(BeEmpty())
|
||||
Expect(routing.OpenMessageItem).To(BeFalse(),
|
||||
"reasoning deltas must target the reasoning_ item, never open/route to a msg_ item")
|
||||
})
|
||||
|
||||
It("does not re-open the reasoning item on subsequent reasoning deltas", func() {
|
||||
extractor := reason.NewReasoningExtractor("", reason.Config{})
|
||||
router := newStreamReasoningRouter(extractor)
|
||||
|
||||
_ = router.route("", usageWithChatDeltas("", "first "))
|
||||
routing := router.route("", usageWithChatDeltas("", "second"))
|
||||
|
||||
Expect(routing.ReasoningDelta).To(Equal("second"))
|
||||
Expect(routing.OpenReasoningItem).To(BeFalse())
|
||||
})
|
||||
})
|
||||
|
||||
Context("pure content stream", func() {
|
||||
It("never opens a reasoning item", func() {
|
||||
extractor := reason.NewReasoningExtractor("", reason.Config{})
|
||||
router := newStreamReasoningRouter(extractor)
|
||||
|
||||
// Content-only with no reasoning_content: the autoparser is in its
|
||||
// pure-content mode, so the router stays on the Go-side extractor,
|
||||
// which sees the content via the raw token.
|
||||
routing := router.route("hello world", usageWithChatDeltas("hello world", ""))
|
||||
|
||||
Expect(routing.ContentDelta).To(Equal("hello world"))
|
||||
Expect(routing.OpenMessageItem).To(BeTrue())
|
||||
Expect(routing.OpenReasoningItem).To(BeFalse(),
|
||||
"a content-only stream must never open a reasoning item")
|
||||
Expect(router.ReasoningStreamed()).To(BeFalse())
|
||||
})
|
||||
})
|
||||
|
||||
Context("content-only autoparser with embedded <think> (issue #9985 fallback)", func() {
|
||||
It("falls back to Go-side extraction instead of leaking <think> into content", func() {
|
||||
extractor := reason.NewReasoningExtractor("", reason.Config{})
|
||||
router := newStreamReasoningRouter(extractor)
|
||||
|
||||
// The autoparser is in its non-jinja pure-content fallback: it
|
||||
// surfaces the whole string as Content with zero reasoning_content,
|
||||
// tags and all. The router must NOT trust it (preferAutoparser must
|
||||
// stay false) and instead use the Go-side split.
|
||||
routing := router.route("<think>reasoning here</think>answer",
|
||||
usageWithChatDeltas("<think>reasoning here</think>answer", ""))
|
||||
|
||||
Expect(routing.ContentDelta).To(Equal("answer"),
|
||||
"content must be the cleaned answer, not the raw <think>...</think> string")
|
||||
Expect(routing.ReasoningDelta).To(Equal("reasoning here"))
|
||||
Expect(routing.OpenReasoningItem).To(BeTrue())
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user