Compare commits

..

1 Commits

Author SHA1 Message Date
Ettore Di Giacinto
390664ff72 fix(responses): classify streamed reasoning as a reasoning item live (#9658)
In the /v1/responses streaming handler a reasoning model's thinking
monologue was streamed to the client as normal message text (a msg_
output item with output_text.delta) and only reclassified into a
reasoning item after the stream completed. Subsequent output_text.delta
events also kept referencing the old msg_ item id instead of the
reasoning_ id.

Root causes:

1. The live reasoning item was gated on extractor.Reasoning(), which is
   only updated by the Go-side raw-tag parser (ProcessToken). When the
   C++ autoparser drives reasoning through reasoning_content ChatDeltas,
   the reasoning delta is computed via ProcessChatDeltaReasoning into a
   separate accumulator, so extractor.Reasoning() stays empty and the
   gate never fired. The reasoning item was thus only reconstructed at
   end-of-stream.

2. The non-tool-call path created the message/msg_ output item eagerly
   before any token, forcing reasoning to a higher output index and
   making mis-split <think> text land on the pre-existing message item.

3. Neither path carried the sticky preferAutoparser flag, so a
   content-only autoparser (the non-jinja pure-content fallback, #9985)
   could leak <think>...</think> tokens into content.

Extract the per-token reasoning-vs-message classification into a pure,
unit-tested streamReasoningRouter (mirroring chooseDeferredReasoning and
processStream in the chat streaming worker): it gates the reasoning item
on the reasoning delta, opens the message item lazily on the first
content delta, and keeps a sticky preferAutoparser fallback. Both
streaming paths now route reasoning deltas to the reasoning_ id and order
the reasoning item ahead of the message at completion.

Assisted-by: claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-12 21:39:42 +00:00
20 changed files with 485 additions and 1981 deletions

View File

@@ -10,7 +10,7 @@ JOBS?=$(shell nproc --ignore=1)
# this on `master` always picks up the latest C-API surface (incl. the
# per-detection accessor functions used by golocateanythingcpp.go).
LOCATEANYTHING_REPO?=https://github.com/mudler/locate-anything.cpp.git
LOCATEANYTHING_VERSION?=92c1682da792c1e8a5dec91acc2be4b02c742ded
LOCATEANYTHING_VERSION?=60e450945476d5e97e0754a8c0e71a9ea81690e0
ifeq ($(NATIVE),false)
CMAKE_ARGS+=-DGGML_NATIVE=OFF

View File

@@ -1,5 +1,5 @@
--extra-index-url https://download.pytorch.org/whl/rocm7.0
torch==2.12.0+cpu
torch==2.10.0+rocm7.0
torchaudio
torchvision

View File

@@ -1,5 +1,5 @@
--extra-index-url https://download.pytorch.org/whl/cpu
torch==2.12.0+cpu
torch==2.10.0
transformers>=4.56.2
huggingface-hub>=1.3.0
sentencepiece

View File

@@ -1,4 +1,4 @@
torch==2.12.0+cpu
torch==2.10.0
transformers>=4.56.2
huggingface-hub>=1.3.0
sentencepiece

View File

@@ -1,7 +1,6 @@
--extra-index-url https://download.pytorch.org/whl/cpu
accelerate
torch==2.8.0
torchaudio==2.8.0
transformers==4.56.1
librosa==0.11.0
neucodec>=0.0.4

View File

@@ -3,7 +3,6 @@ neucodec>=0.0.4
phonemizer==3.3.0
soundfile==0.13.1
torch==2.8.0
torchaudio==2.8.0
transformers==4.56.1
resemble-perth==1.0.1
accelerate

View File

@@ -1,6 +1,6 @@
--extra-index-url https://download.pytorch.org/whl/cpu
accelerate
torch==2.12.0+cpu
torch==2.9.0
torchvision
torchaudio
transformers

View File

@@ -6,7 +6,7 @@
# for cublas12 so uv consults this index alongside PyPI.
--extra-index-url https://download.pytorch.org/whl/cu128
accelerate
torch==2.12.0+cpu
torch==2.9.1
torchvision
torchaudio
transformers

View File

@@ -1,4 +1,4 @@
accelerate
torch==2.12.0+cu130
torch==2.7.0
transformers
bitsandbytes

View File

@@ -307,19 +307,11 @@ func gRPCPredictOpts(c config.ModelConfig, modelPath string) *pb.PredictOptions
}
}
// TopK may be nil after SetDefaults for backends that don't use llama.cpp's
// top_k=40 default (issue #6632, e.g. mlx). proto3 int32 can't be unset, so
// send 0 — the value mlx actually wants (top-k disabled).
var topK int32
if c.TopK != nil {
topK = int32(*c.TopK)
}
pbOpts := &pb.PredictOptions{
Temperature: float32(*c.Temperature),
TopP: float32(*c.TopP),
NDraft: c.NDraft,
TopK: topK,
TopK: int32(*c.TopK),
MinP: float32(*c.MinP),
Tokens: int32(*c.Maxtokens),
Threads: int32(*c.Threads),

View File

@@ -517,33 +517,6 @@ func NormalizeBackendName(backend string) string {
return strings.ReplaceAll(backend, ".", "-")
}
// nonLlamaSamplerBackends is the set of backends that must NOT receive
// llama.cpp's sampler defaults from LocalAI (issue #6632), because their own
// native defaults differ. mlx_lm, for example, intends top_k=0 (disabled) and
// never remaps 0 to 40, so injecting 40 would silently alter sampling for
// clients that leave top_k unset. Keeping TopK nil for these backends lets the
// wire value fall back to 0.
//
// The set is deliberately a short allow-list of KNOWN non-llama backends. An
// empty or unrecognised backend is not listed here and therefore keeps the
// llama.cpp default, which preserves the GGUF auto-detect path's behavior.
var nonLlamaSamplerBackends = map[string]struct{}{
	"mlx":             {},
	"mlx-vlm":         {},
	"mlx-distributed": {},
}
// UsesLlamaSamplerDefaults reports whether llama.cpp's sampler defaults (for
// example top_k=40) should be applied to the given backend.
//
// An empty backend name returns true without any lookup, so the GGUF
// auto-detect path (which resolves to llama.cpp) keeps its current behavior.
// Any other name is normalized with NormalizeBackendName and checked against
// nonLlamaSamplerBackends: listed backends return false, everything else
// (including unknown backends) returns true.
func UsesLlamaSamplerDefaults(backend string) bool {
	if backend == "" {
		return true
	}
	if _, listed := nonLlamaSamplerBackends[NormalizeBackendName(backend)]; listed {
		return false
	}
	return true
}
// GetBackendCapability returns the capability info for a backend, or nil if unknown.
// Handles backend name normalization.
func GetBackendCapability(backend string) *BackendCapability {

View File

@@ -867,12 +867,7 @@ func (cfg *ModelConfig) SetDefaults(opts ...ConfigLoaderOption) {
cfg.Seed = &defaultSeed
}
// top_k=40 is llama.cpp's sampling default and is wrong for backends whose
// native default differs (issue #6632). Only inject it for the llama.cpp
// family and the empty/auto backend; leave TopK nil for known non-llama
// backends (e.g. mlx, whose intended default is top_k=0) so the wire value
// is 0 rather than a silently-changed 40.
if cfg.TopK == nil && UsesLlamaSamplerDefaults(cfg.Backend) {
if cfg.TopK == nil {
cfg.TopK = &defaultTopK
}

View File

@@ -529,72 +529,4 @@ concurrency_groups:
"models that template in Go still rely on the Go-generated grammar")
})
})
// The default top_k=40 is llama.cpp's sampling default and is WRONG for
// backends whose native default differs. mlx_lm's intended default is
// top_k=0 (disabled) and mlx does not remap 0->40, so injecting 40 silently
// changes sampling for mlx clients that omit top_k (issue #6632). Gate the
// injection on backend family: keep 40 for the llama.cpp family and for the
// empty/auto backend (the GGUF auto-detect path resolves to llama.cpp), but
// leave TopK nil for the mlx family so the wire value is 0.
Context("TopK default is backend-gated (issue #6632)", func() {
It("injects top_k=40 for the llama.cpp backend", func() {
cfg := &ModelConfig{}
cfg.Backend = "llama-cpp"
cfg.SetDefaults()
Expect(cfg.TopK).NotTo(BeNil(), "llama.cpp must keep its top_k=40 default")
Expect(*cfg.TopK).To(Equal(40))
})
It("injects top_k=40 for the empty/auto backend (GGUF auto-detect)", func() {
cfg := &ModelConfig{}
cfg.SetDefaults()
Expect(cfg.TopK).NotTo(BeNil(), "empty backend resolves to llama.cpp; default unchanged")
Expect(*cfg.TopK).To(Equal(40))
})
It("leaves TopK nil for the mlx backend", func() {
cfg := &ModelConfig{}
cfg.Backend = "mlx"
cfg.SetDefaults()
Expect(cfg.TopK).To(BeNil(),
"mlx_lm's intended default is top_k=0 (disabled); LocalAI must not inject 40")
})
It("leaves TopK nil for the mlx-vlm backend", func() {
cfg := &ModelConfig{}
cfg.Backend = "mlx-vlm"
cfg.SetDefaults()
Expect(cfg.TopK).To(BeNil())
})
It("leaves TopK nil for the mlx-distributed backend", func() {
cfg := &ModelConfig{}
cfg.Backend = "mlx-distributed"
cfg.SetDefaults()
Expect(cfg.TopK).To(BeNil())
})
It("respects an explicit top_k even for the mlx backend", func() {
explicit := 7
cfg := &ModelConfig{}
cfg.Backend = "mlx"
cfg.TopK = &explicit
cfg.SetDefaults()
Expect(cfg.TopK).NotTo(BeNil())
Expect(*cfg.TopK).To(Equal(7))
})
})
})

View File

@@ -990,18 +990,8 @@ func updateSession(session *Session, update *types.SessionUnion, cl *config.Mode
}
if rt.Audio != nil && rt.Audio.Input != nil && rt.Audio.Input.Transcription != nil {
trUpd := rt.Audio.Input.Transcription
// A language-only update (e.g. a client forcing the STT language) carries
// an empty Model. Preserve the pipeline's configured transcription backend
// instead of blanking it — otherwise the next utterance transcribes against
// an empty model and the backend RPC fails with "unimplemented".
if trUpd.Model == "" && session.InputAudioTranscription != nil {
trUpd.Model = session.InputAudioTranscription.Model
}
session.InputAudioTranscription = trUpd
if trUpd.Model != "" {
session.ModelConfig.Pipeline.Transcription = trUpd.Model
}
session.InputAudioTranscription = rt.Audio.Input.Transcription
session.ModelConfig.Pipeline.Transcription = rt.Audio.Input.Transcription.Model
}
if rt.Model != "" || (rt.Audio != nil && rt.Audio.Output != nil && rt.Audio.Output.Voice != "") || (rt.Audio != nil && rt.Audio.Input != nil && rt.Audio.Input.Transcription != nil) {

View File

@@ -1648,6 +1648,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
var currentReasoningContentIndex int
var reasoningTokens int
extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig)
// router classifies each streamed token into reasoning vs message deltas
// and decides which output item they target. It encapsulates the
// sticky-preferAutoparser fallback and the reasoningDelta-based gate that
// fix issue #9658 (live reasoning was mis-routed onto the msg_ item and
// only re-classified as a reasoning item after the stream completed).
router := newStreamReasoningRouter(extractor)
// Collect all output items for storage
var collectedOutputItems []schema.ORItemField
@@ -1671,7 +1677,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// Reset reasoning and tool-call state for re-inference so reasoning
// extraction runs again on subsequent iterations
inToolCallMode = false
extractor.Reset()
router.resetForIteration()
currentMessageID = ""
lastEmittedToolCallCount = 0
currentReasoningID = ""
@@ -1832,110 +1838,101 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// If no tool calls detected yet, handle reasoning and text
if !inToolCallMode {
var reasoningDelta, contentDelta string
goReasoning, goContent := extractor.ProcessToken(token)
routing := router.route(token, tokenUsage)
if tokenUsage.HasChatDeltaContent() {
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
// Handle reasoning item. The reasoning item is opened lazily
// on the first reasoning delta - gating on routing, not
// extractor.Reasoning() (issue #9658): when the C++
// autoparser drives reasoning via reasoning_content,
// extractor.Reasoning() stays empty and the old gate dropped
// the live reasoning item.
if routing.OpenReasoningItem {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Handle reasoning item
if extractor.Reasoning() != "" {
// Check if we need to create reasoning item
if currentReasoningID == "" {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Emit reasoning delta if there's new content
if reasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(reasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Emit reasoning delta against the reasoning_ item id.
if routing.ReasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(routing.ReasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Only emit message content if there's actual content (not just reasoning)
if contentDelta != "" {
if currentMessageID == "" {
// Emit output_item.added for message
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem := &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.added
currentContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyPart,
})
sequenceNumber++
// Open the message item lazily on the first content delta.
if routing.OpenMessageItem {
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem := &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit text delta
// Emit content_part.added
currentContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Emit text delta against the msg_ item id.
if routing.ContentDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Delta: strPtr(contentDelta),
Delta: strPtr(routing.ContentDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
@@ -2331,112 +2328,109 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
return nil
}
// Non-tool-call streaming path
// Emit output_item.added for message
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem := &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.added
currentContentIndex = 0
emptyTextPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyTextPart,
})
sequenceNumber++
// Non-tool-call streaming path.
//
// The message output item is created LAZILY on the first content delta
// (mirroring the tool-call path), not eagerly before the first token.
// Issue #9658: an eager msg_ item forced reasoning to a higher output
// index and made mis-split <think> text land on the pre-existing message,
// so the thinking monologue streamed as message text instead of reasoning.
var messageItem *schema.ORItemField
// Stream text deltas with reasoning extraction
tokenCallback := func(token string, tokenUsage backend.TokenUsage) bool {
accumulatedText += token
var reasoningDelta, contentDelta string
goReasoning, goContent := extractor.ProcessToken(token)
routing := router.route(token, tokenUsage)
if tokenUsage.HasChatDeltaContent() {
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
// Open the reasoning item lazily on the first reasoning delta.
if routing.OpenReasoningItem {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Handle reasoning item
if extractor.Reasoning() != "" {
// Check if we need to create reasoning item
if currentReasoningID == "" {
outputIndex++
currentReasoningID = fmt.Sprintf("reasoning_%s", uuid.New().String())
reasoningItem := &schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
Status: "in_progress",
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: reasoningItem,
})
sequenceNumber++
// Emit content_part.added for reasoning
currentReasoningContentIndex = 0
emptyPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Part: &emptyPart,
})
sequenceNumber++
}
// Emit reasoning delta if there's new content
if reasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(reasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Emit reasoning delta against the reasoning_ item id.
if routing.ReasoningDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentReasoningID,
OutputIndex: &outputIndex,
ContentIndex: &currentReasoningContentIndex,
Delta: strPtr(routing.ReasoningDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
c.Response().Flush()
}
// Only emit message content if there's actual content (not just reasoning)
if contentDelta != "" {
// Emit text delta
// Open the message item lazily on the first content delta.
if routing.OpenMessageItem {
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem = &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.added
currentContentIndex = 0
emptyTextPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyTextPart,
})
sequenceNumber++
}
// Emit text delta against the msg_ item id.
if routing.ContentDelta != "" {
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.delta",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Delta: strPtr(contentDelta),
Delta: strPtr(routing.ContentDelta),
Logprobs: emptyLogprobs(),
})
sequenceNumber++
@@ -2561,40 +2555,78 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// Convert logprobs for streaming events
mcpStreamLogprobs := convertLogprobsForStreaming(noToolLogprobs)
// Emit output_text.done
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Text: strPtr(result),
Logprobs: logprobsPtr(mcpStreamLogprobs),
})
sequenceNumber++
// The message item is created lazily on the first content delta (issue
// #9658). If no content streamed but final extraction produced text (e.g.
// the autoparser delivered everything at once), open the message item now
// so the closing events below are valid. A pure-reasoning turn (no content
// at all) leaves messageItem nil and emits no message item.
if messageItem == nil && result != "" {
outputIndex++
currentMessageID = fmt.Sprintf("msg_%s", uuid.New().String())
messageItem = &schema.ORItemField{
Type: "message",
ID: currentMessageID,
Status: "in_progress",
Role: "assistant",
Content: []schema.ORContentPart{},
}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.added",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
// Emit content_part.done (with actual logprobs)
resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs)
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &resultPart,
})
sequenceNumber++
currentContentIndex = 0
emptyTextPart := makeOutputTextPart("")
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.added",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &emptyTextPart,
})
sequenceNumber++
}
// Emit output_item.done (with actual logprobs)
messageItem.Status = "completed"
messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.done",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
if messageItem != nil {
// Emit output_text.done
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_text.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Text: strPtr(result),
Logprobs: logprobsPtr(mcpStreamLogprobs),
})
sequenceNumber++
// Emit content_part.done (with actual logprobs)
resultPart := makeOutputTextPartWithLogprobs(result, noToolLogprobs)
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.content_part.done",
SequenceNumber: sequenceNumber,
ItemID: currentMessageID,
OutputIndex: &outputIndex,
ContentIndex: &currentContentIndex,
Part: &resultPart,
})
sequenceNumber++
// Emit output_item.done (with actual logprobs)
messageItem.Status = "completed"
messageItem.Content = []schema.ORContentPart{makeOutputTextPartWithLogprobs(result, noToolLogprobs)}
sendSSEEvent(c, &schema.ORStreamEvent{
Type: "response.output_item.done",
SequenceNumber: sequenceNumber,
OutputIndex: &outputIndex,
Item: messageItem,
})
sequenceNumber++
}
// Emit function_call items from automatic tool parsing fallback
for _, fc := range streamFallbackToolCalls {
@@ -2631,10 +2663,13 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
// Emit response.completed
now := time.Now().Unix()
// Collect final output items (reasoning first, then messages, then tool calls)
// Collect final output items, ordered reasoning -> message -> tool calls.
// Issue #9658: reasoning is emitted as its own item ahead of the message,
// matching the streamed order (reasoning item is opened before the message
// item when the model thinks first).
var finalOutputItems []schema.ORItemField
// Add reasoning item if it exists
if currentReasoningID != "" && finalReasoning != "" {
// Add reasoning item if one was streamed.
if router.ReasoningStreamed() && finalReasoning != "" {
finalOutputItems = append(finalOutputItems, schema.ORItemField{
Type: "reasoning",
ID: currentReasoningID,
@@ -2642,18 +2677,12 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
Content: []schema.ORContentPart{makeOutputTextPart(finalReasoning)},
})
}
// Add message item
if len(collectedOutputItems) > 0 {
// Use collected items (may include reasoning already)
for _, item := range collectedOutputItems {
if item.Type == "message" {
finalOutputItems = append(finalOutputItems, item)
}
}
} else {
// Add the message item if one was produced (created lazily, so it may be
// nil for a pure-reasoning turn).
if messageItem != nil {
finalOutputItems = append(finalOutputItems, *messageItem)
}
// Add function_call items from fallback
// Add function_call items from fallback parsing.
for _, item := range collectedOutputItems {
if item.Type == "function_call" {
finalOutputItems = append(finalOutputItems, item)

View File

@@ -0,0 +1,114 @@
package openresponses
import (
"github.com/mudler/LocalAI/core/backend"
reason "github.com/mudler/LocalAI/pkg/reasoning"
)
// streamTokenRouting is the routing decision for one streamed token: how its
// text splits between reasoning and message content, and whether a fresh Open
// Responses output item has to be opened before either delta can be emitted.
type streamTokenRouting struct {
	// ReasoningDelta and ContentDelta carry the reasoning and message text
	// produced by this token; either may be empty.
	ReasoningDelta, ContentDelta string
	// OpenReasoningItem signals that a reasoning output item must be created
	// first, i.e. ReasoningDelta is the first reasoning delta of the stream.
	OpenReasoningItem bool
	// OpenMessageItem signals that a message output item must be created
	// first, i.e. ContentDelta is the first content delta of the stream.
	OpenMessageItem bool
}
// streamReasoningRouter turns streamed tokens into reasoning and message
// deltas and remembers which output items are already open. Keeping that
// decision here leaves the SSE-emitting code in handleOpenResponsesStream as a
// thin shell around logic that can be unit-tested.
//
// The sticky preferAutoparser behavior mirrors processStream in the OpenAI chat
// streaming worker (core/http/endpoints/openai/chat_stream_workers.go). As soon
// as the C++ autoparser surfaces reasoning_content, its classification is
// trusted for the remainder of the stream. Before that point the Go-side
// reasoning extractor is used, so that a pure-content autoparser (the non-jinja
// PEG fallback, issue #9985) cannot leak <think>...</think> tokens into
// content.
//
// Opening and targeting a reasoning item is driven by the per-token reasoning
// delta and NOT by extractor.Reasoning(). On the autoparser path reasoning goes
// through ProcessChatDeltaReasoning, which feeds a separate accumulator that
// extractor.Reasoning() never exposes. The earlier gate on
// extractor.Reasoning() (issue #9658) therefore dropped live reasoning whenever
// the autoparser supplied it via reasoning_content: the reasoning item only
// appeared once the stream had completed, and earlier deltas were mis-routed
// onto the msg_ item.
type streamReasoningRouter struct {
	// extractor is the Go-side reasoning extractor the router delegates to.
	extractor *reason.ReasoningExtractor
	// preferAutoparser becomes true once reasoning_content has been seen.
	preferAutoparser bool
	// reasoningOpened and messageOpened record which items are already open.
	reasoningOpened bool
	messageOpened   bool
}
// newStreamReasoningRouter returns a router bound to the given extractor, with
// no output item opened yet and the autoparser preference unset.
func newStreamReasoningRouter(extractor *reason.ReasoningExtractor) *streamReasoningRouter {
	router := new(streamReasoningRouter)
	router.extractor = extractor
	return router
}
// classify returns the reasoning and content deltas for a single token,
// honoring the sticky preferAutoparser preference.
//
// When the usage carries ChatDelta content and the C++ autoparser has surfaced
// reasoning_content (on this token or an earlier one), the autoparser's split
// is used for this and all later tokens. In every other case the Go-side
// extractor's split is returned, so that a pure-content autoparser (zero
// reasoning_content, issue #9985) does not leak <think>...</think> tokens into
// content.
func (r *streamReasoningRouter) classify(token string, usage backend.TokenUsage) (reasoningDelta, contentDelta string) {
	// The Go-side extractor is fed every raw token, whichever split is used.
	tagReasoning, tagContent := r.extractor.ProcessToken(token)
	if !usage.HasChatDeltaContent() {
		return tagReasoning, tagContent
	}

	autoReasoning, autoContent := usage.ChatDeltaReasoningAndContent()
	// Sticky: once set, the preference is never cleared here.
	r.preferAutoparser = r.preferAutoparser || autoReasoning != ""
	if !r.preferAutoparser {
		return tagReasoning, tagContent
	}
	return r.extractor.ProcessChatDeltaReasoning(autoReasoning), autoContent
}
// route classifies a token and reports which output items its deltas need,
// marking an item as opened the first time a non-empty delta targets it.
//
// The reasoning gate looks at the reasoning delta itself and NOT at
// extractor.Reasoning(). The autoparser path derives reasoning through
// ProcessChatDeltaReasoning, whose separate accumulator is never reflected by
// extractor.Reasoning() (issue #9658).
func (r *streamReasoningRouter) route(token string, usage backend.TokenUsage) streamTokenRouting {
	reasoning, content := r.classify(token, usage)

	// An item is opened only by the first non-empty delta that targets it.
	firstReasoning := reasoning != "" && !r.reasoningOpened
	firstContent := content != "" && !r.messageOpened
	r.reasoningOpened = r.reasoningOpened || firstReasoning
	r.messageOpened = r.messageOpened || firstContent

	return streamTokenRouting{
		ReasoningDelta:    reasoning,
		ContentDelta:      content,
		OpenReasoningItem: firstReasoning,
		OpenMessageItem:   firstContent,
	}
}
// resetForIteration returns the router to its initial per-stream state ahead of
// an MCP re-inference iteration: the autoparser preference and both opened
// flags are cleared, and the underlying extractor is reset as well.
func (r *streamReasoningRouter) resetForIteration() {
	r.preferAutoparser, r.reasoningOpened, r.messageOpened = false, false, false
	r.extractor.Reset()
}
// ReasoningStreamed reports whether route has opened a reasoning output item
// since the router was created or last reset. End-of-stream closing logic is
// meant to consult this flag instead of checking a reasoning-id string, which
// keeps the reasoning-before-message ordering explicit.
func (r *streamReasoningRouter) ReasoningStreamed() bool {
	return r.reasoningOpened
}

View File

@@ -0,0 +1,101 @@
package openresponses
import (
"github.com/mudler/LocalAI/core/backend"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
reason "github.com/mudler/LocalAI/pkg/reasoning"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// usageWithChatDeltas returns a TokenUsage holding exactly one ChatDelta, as
// the C++ autoparser would emit it, populated with the supplied content and
// reasoning_content values.
func usageWithChatDeltas(content, reasoningContent string) backend.TokenUsage {
	delta := &pb.ChatDelta{
		Content:          content,
		ReasoningContent: reasoningContent,
	}
	return backend.TokenUsage{ChatDeltas: []*pb.ChatDelta{delta}}
}
// Regression suite for issue #9658.
//
// Symptom: in the /v1/responses streaming handler a reasoning model's thinking
// monologue reached the client as ordinary message text (a msg_ item with
// output_text.delta) and was only re-classified as a reasoning item once the
// stream had finished.
//
// Cause: the live reasoning item was gated on extractor.Reasoning(), which is
// fed only by the Go-side raw-tag parser (ProcessToken). When the C++
// autoparser delivers reasoning through reasoning_content ChatDeltas, it is
// computed via ProcessChatDeltaReasoning into a SEPARATE accumulator, so
// extractor.Reasoning() stays empty and the gate never fires.
var _ = Describe("streamReasoningRouter", func() {
	Context("autoparser drives reasoning via reasoning_content (issue #9658)", func() {
		It("opens a reasoning item during streaming and targets it (not the message)", func() {
			router := newStreamReasoningRouter(reason.NewReasoningExtractor("", reason.Config{}))

			// Empty raw token: the reasoning travels only in
			// ChatDelta.ReasoningContent, so the Go-side extractor's
			// Reasoning() remains "". This is the exact state in which the
			// old extractor.Reasoning() gate failed to open a reasoning item.
			got := router.route("", usageWithChatDeltas("", "Let me think about this"))

			Expect(got.ReasoningDelta).To(Equal("Let me think about this"),
				"the autoparser's reasoning_content must surface as a reasoning delta during streaming")
			Expect(got.OpenReasoningItem).To(BeTrue(),
				"a reasoning output item must be opened live, not deferred to end-of-stream (#9658)")
			Expect(got.ContentDelta).To(BeEmpty())
			Expect(got.OpenMessageItem).To(BeFalse(),
				"reasoning deltas must target the reasoning_ item, never open/route to a msg_ item")
		})

		It("does not re-open the reasoning item on subsequent reasoning deltas", func() {
			router := newStreamReasoningRouter(reason.NewReasoningExtractor("", reason.Config{}))

			// The first delta opens the item; only the second result matters here.
			router.route("", usageWithChatDeltas("", "first "))
			got := router.route("", usageWithChatDeltas("", "second"))

			Expect(got.ReasoningDelta).To(Equal("second"))
			Expect(got.OpenReasoningItem).To(BeFalse())
		})
	})

	Context("pure content stream", func() {
		It("never opens a reasoning item", func() {
			router := newStreamReasoningRouter(reason.NewReasoningExtractor("", reason.Config{}))

			// No reasoning_content at all: the autoparser is in pure-content
			// mode, so the router stays on the Go-side extractor, which
			// receives the same content through the raw token.
			got := router.route("hello world", usageWithChatDeltas("hello world", ""))

			Expect(got.ContentDelta).To(Equal("hello world"))
			Expect(got.OpenMessageItem).To(BeTrue())
			Expect(got.OpenReasoningItem).To(BeFalse(),
				"a content-only stream must never open a reasoning item")
			Expect(router.ReasoningStreamed()).To(BeFalse())
		})
	})

	Context("content-only autoparser with embedded <think> (issue #9985 fallback)", func() {
		It("falls back to Go-side extraction instead of leaking <think> into content", func() {
			router := newStreamReasoningRouter(reason.NewReasoningExtractor("", reason.Config{}))

			// Non-jinja pure-content fallback: the autoparser hands back the
			// whole string, tags included, as Content with no
			// reasoning_content. The router must NOT trust that split
			// (preferAutoparser has to stay false) and must use the Go-side
			// split instead.
			const raw = "<think>reasoning here</think>answer"
			got := router.route(raw, usageWithChatDeltas(raw, ""))

			Expect(got.ContentDelta).To(Equal("answer"),
				"content must be the cleaned answer, not the raw <think>...</think> string")
			Expect(got.ReasoningDelta).To(Equal("reasoning here"))
			Expect(got.OpenReasoningItem).To(BeTrue())
		})
	})
})

View File

File diff suppressed because it is too large Load Diff

2
go.mod
View File

@@ -36,7 +36,7 @@ require (
github.com/mholt/archiver/v3 v3.5.1
github.com/microcosm-cc/bluemonday v1.0.27
github.com/modelcontextprotocol/go-sdk v1.5.0
github.com/mudler/cogito v0.10.1-0.20260609212329-bf4010d31047
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b
github.com/mudler/edgevpn v0.34.0
github.com/mudler/go-processmanager v0.1.1
github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8

4
go.sum
View File

@@ -968,8 +968,8 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mudler/LocalAGI v0.0.0-20260606071251-14aed1ae4336 h1:iKBkSnpisOvMVxFoYsAObvAuOqXBakRPMD0PWxWG5EE=
github.com/mudler/LocalAGI v0.0.0-20260606071251-14aed1ae4336/go.mod h1:U+g6u8mF2wQxhkdBl3dr8G4db1cv3n7KTKmraoJ7D0c=
github.com/mudler/cogito v0.10.1-0.20260609212329-bf4010d31047 h1:wJ8WbDah1YcpBNRDmovQro8JiR228YFk7TUqPCS4m04=
github.com/mudler/cogito v0.10.1-0.20260609212329-bf4010d31047/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU=
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
github.com/mudler/edgevpn v0.34.0 h1:qDrD/rCPFY/FdURbXudIZWihVKY4VOX3nMn3CcbeQEU=
github.com/mudler/edgevpn v0.34.0/go.mod h1:yki7uMi5LR9gSMrw8PdPieuxsrk8BLV2Ui7VBEmbbIA=
github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX28EkpdAyL4yQVnJMrbquKospjHyA=