Compare commits

...

4 Commits

Author SHA1 Message Date
LocalAI [bot]
0b2ae3c6ca fix(openai): stream usage non-zero when tools are enabled (#9941)
* chore: ignore local .worktrees directory

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(openai): stream usage non-zero when tools are enabled

The streaming chat-completions worker for tool-bearing requests
(processTools in core/http/endpoints/openai/chat.go) never forwarded the
cumulative TokenUsage from ComputeChoices to the chunks it placed on the
responses channel. The outer streaming loop's running usage tracker
therefore stayed at the zero value, and the include_usage trailer
reported {prompt_tokens:0, completion_tokens:0, total_tokens:0} whenever
the request carried a `tools` array. Without tools, the alternative
`process` path stamps Usage on every chunk, so that path was unaffected.

Forward the final TokenUsage via a usage-only sentinel chunk (empty
Choices, populated Usage) emitted right before close(responses). The
outer loop's per-chunk Usage capture moves above the empty-Choices skip
so the sentinel updates the tracker without ever reaching the wire,
keeping the existing OpenAI spec contract (intermediate chunks carry no
`usage` field, and the deferred-final-chunk helpers remain Usage-free
per the regression test for issue #8546).

Adds streamUsageFromTokenUsage, usageSentinelChunk, and
applyChunkToUsage helpers with focused Ginkgo coverage plus a flow-level
test that mirrors the outer-loop sequence.

Fixes #9927

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:opus-4-7 [Claude Code]

* refactor(openai): return final TokenUsage from stream workers

Replace the usage-only sentinel SSE chunk introduced in the previous
commit with a plain return value. The streaming workers process and
processTools (now extracted as package-level processStream and
processStreamWithTools) return (backend.TokenUsage, error); the outer
ChatEndpoint loop reads the cumulative counts off the existing `ended`
channel (now carrying streamWorkerResult{usage, err}) and builds the
include_usage trailer from a normal Go value after the LOOP exits.

This drops the empty-Choices "skip but capture Usage" rule from the
outer loop and removes the usageSentinelChunk / applyChunkToUsage
helpers entirely. The SSE responses channel is back to a single
purpose: wire chunks only.

processStream and processStreamWithTools move into chat_stream_workers.go
so they can be exercised directly from tests. The chat_stream_usage_test.go
suite now drives the workers with a mocked backend.ModelInferenceFunc
and asserts on the returned TokenUsage. The regression coverage for
issue #9927 is therefore behavioral: reverting the fix (discarding
ComputeChoices' usage return) makes the assertions fail with concrete
count mismatches.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:opus-4-7 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 10:13:41 +02:00
LocalAI [bot]
4735345105 chore: ⬆️ Update ggml-org/llama.cpp to bb28c1fe246b72276ee1d00ce89306be7b865766 (#9934)
⬆️ Update ggml-org/llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-22 09:49:33 +02:00
LocalAI [bot]
7384fd800b chore: ⬆️ Update antirez/ds4 to 8d576642c39b9a2d782a80159ba84ef5a81c0b81 (#9932)
⬆️ Update antirez/ds4

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-22 08:31:49 +02:00
LocalAI [bot]
6942713d85 chore: ⬆️ Update leejet/stable-diffusion.cpp to 3a8788cb7d74f185d6b18688e9563015524ecaf5 (#9933)
⬆️ Update leejet/stable-diffusion.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-22 00:31:19 +02:00
8 changed files with 634 additions and 384 deletions

3
.gitignore vendored
View File

@@ -77,3 +77,6 @@ local-backends/
tests/e2e-ui/ui-test-server
core/http/react-ui/playwright-report/
core/http/react-ui/test-results/
# Local worktrees
.worktrees/

View File

@@ -1,10 +1,10 @@
# ds4 backend Makefile.
#
# Upstream pin lives below as DS4_VERSION?=2606543be7a8c125a32cee37f5d1d85dc78f2fcf
# Upstream pin lives below as DS4_VERSION?=8d576642c39b9a2d782a80159ba84ef5a81c0b81
# (.github/bump_deps.sh) can find and update it - matches the
# llama-cpp / ik-llama-cpp / turboquant convention.
DS4_VERSION?=2606543be7a8c125a32cee37f5d1d85dc78f2fcf
DS4_VERSION?=8d576642c39b9a2d782a80159ba84ef5a81c0b81
DS4_REPO?=https://github.com/antirez/ds4
CURRENT_MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=ad277572619fcfb6ddd38f4c6437283a4b2b8636
LLAMA_VERSION?=bb28c1fe246b72276ee1d00ce89306be7b865766
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=5b0267e941cade15bd80089d89838795d9f4baa6
STABLEDIFFUSION_GGML_VERSION?=3a8788cb7d74f185d6b18688e9563015524ecaf5
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -73,363 +73,6 @@ func mergeToolCallDeltas(existing []schema.ToolCall, deltas []schema.ToolCall) [
// @Success 200 {object} schema.OpenAIResponse "Response"
// @Router /v1/chat/completions [post]
func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator *templates.Evaluator, startupOptions *config.ApplicationConfig, natsClient mcpTools.MCPNATSClient, assistantHolder *mcpTools.LocalAIAssistantHolder) echo.HandlerFunc {
process := func(s string, req *schema.OpenAIRequest, config *config.ModelConfig, loader *model.ModelLoader, responses chan schema.OpenAIResponse, extraUsage bool, id string, created int) error {
initialMessage := schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model, // we have to return what the user sent here, due to OpenAI spec.
Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}},
Object: "chat.completion.chunk",
}
responses <- initialMessage
// Detect if thinking token is already in prompt or template
// When UseTokenizerTemplate is enabled, predInput is empty, so we check the template
var template string
if config.TemplateConfig.UseTokenizerTemplate {
template = config.GetModelTemplate()
} else {
template = s
}
thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig)
extractor := reason.NewReasoningExtractor(thinkingStartToken, config.ReasoningConfig)
_, _, _, err := ComputeChoices(req, s, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool {
var reasoningDelta, contentDelta string
// Always keep the Go-side extractor in sync with raw tokens so it
// can serve as fallback for backends without an autoparser (e.g. vLLM).
goReasoning, goContent := extractor.ProcessToken(s)
// When C++ autoparser chat deltas are available, prefer them — they
// handle model-specific formats (Gemma 4, etc.) without Go-side tags.
// Otherwise fall back to Go-side extraction.
if tokenUsage.HasChatDeltaContent() {
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
}
usage := schema.OpenAIUsage{
PromptTokens: tokenUsage.Prompt,
CompletionTokens: tokenUsage.Completion,
TotalTokens: tokenUsage.Prompt + tokenUsage.Completion,
}
if extraUsage {
usage.TimingTokenGeneration = tokenUsage.TimingTokenGeneration
usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing
}
delta := &schema.Message{}
if contentDelta != "" {
delta.Content = &contentDelta
}
if reasoningDelta != "" {
delta.Reasoning = &reasoningDelta
}
// Usage rides as a struct field for the consumer to track the
// running cumulative — it is stripped before JSON marshal so the
// wire chunk stays spec-compliant (no `usage` on intermediate
// chunks). The dedicated trailer chunk (when include_usage=true)
// carries the final totals.
usageForChunk := usage
resp := schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model, // we have to return what the user sent here, due to OpenAI spec.
Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}},
Object: "chat.completion.chunk",
Usage: &usageForChunk,
}
responses <- resp
return true
})
close(responses)
return err
}
processTools := func(noAction string, prompt string, req *schema.OpenAIRequest, config *config.ModelConfig, loader *model.ModelLoader, responses chan schema.OpenAIResponse, extraUsage bool, id string, created int, textContentToReturn *string) error {
// Detect if thinking token is already in prompt or template
var template string
if config.TemplateConfig.UseTokenizerTemplate {
template = config.GetModelTemplate()
} else {
template = prompt
}
thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig)
extractor := reason.NewReasoningExtractor(thinkingStartToken, config.ReasoningConfig)
result := ""
lastEmittedCount := 0
sentInitialRole := false
sentReasoning := false
hasChatDeltaToolCalls := false
hasChatDeltaContent := false
_, _, chatDeltas, err := ComputeChoices(req, prompt, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool {
result += s
// Track whether ChatDeltas from the C++ autoparser contain
// tool calls or content, so the retry decision can account for them.
for _, d := range usage.ChatDeltas {
if len(d.ToolCalls) > 0 {
hasChatDeltaToolCalls = true
}
if d.Content != "" {
hasChatDeltaContent = true
}
}
var reasoningDelta, contentDelta string
goReasoning, goContent := extractor.ProcessToken(s)
if usage.HasChatDeltaContent() {
rawReasoning, cd := usage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
}
// Emit reasoning deltas in their own SSE chunks before any tool-call chunks
// (OpenAI spec: reasoning and tool_calls never share a delta)
if reasoningDelta != "" {
responses <- schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{Reasoning: &reasoningDelta},
Index: 0,
}},
Object: "chat.completion.chunk",
}
sentReasoning = true
}
// Stream content deltas (cleaned of reasoning tags) while no tool calls
// have been detected. Once the incremental parser finds tool calls,
// content stops — per OpenAI spec, content and tool_calls don't mix.
if lastEmittedCount == 0 && contentDelta != "" {
if !sentInitialRole {
responses <- schema.OpenAIResponse{
ID: id, Created: created, Model: req.Model,
Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}},
Object: "chat.completion.chunk",
}
sentInitialRole = true
}
responses <- schema.OpenAIResponse{
ID: id, Created: created, Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{Content: &contentDelta},
Index: 0,
}},
Object: "chat.completion.chunk",
}
}
// Try incremental XML parsing for streaming support using iterative parser
// This allows emitting partial tool calls as they're being generated
cleanedResult := functions.CleanupLLMResult(result, config.FunctionsConfig)
// Determine XML format from config
var xmlFormat *functions.XMLToolCallFormat
if config.FunctionsConfig.XMLFormat != nil {
xmlFormat = config.FunctionsConfig.XMLFormat
} else if config.FunctionsConfig.XMLFormatPreset != "" {
xmlFormat = functions.GetXMLFormatPreset(config.FunctionsConfig.XMLFormatPreset)
}
// Use iterative parser for streaming (partial parsing enabled)
// Try XML parsing first
partialResults, parseErr := functions.ParseXMLIterative(cleanedResult, xmlFormat, true)
if parseErr == nil && len(partialResults) > 0 {
// Emit new XML tool calls that weren't emitted before
if len(partialResults) > lastEmittedCount {
for i := lastEmittedCount; i < len(partialResults); i++ {
toolCall := partialResults[i]
initialMessage := schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{
Role: "assistant",
ToolCalls: []schema.ToolCall{
{
Index: i,
ID: id,
Type: "function",
FunctionCall: schema.FunctionCall{
Name: toolCall.Name,
},
},
},
},
Index: 0,
FinishReason: nil,
}},
Object: "chat.completion.chunk",
}
select {
case responses <- initialMessage:
default:
}
}
lastEmittedCount = len(partialResults)
}
} else {
// Try JSON tool call parsing for streaming.
// Only emit NEW tool calls (same guard as XML parser above).
jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true)
if jsonErr == nil && len(jsonResults) > lastEmittedCount {
for i := lastEmittedCount; i < len(jsonResults); i++ {
jsonObj := jsonResults[i]
name, ok := jsonObj["name"].(string)
if !ok || name == "" {
continue
}
args := "{}"
if argsVal, ok := jsonObj["arguments"]; ok {
if argsStr, ok := argsVal.(string); ok {
args = argsStr
} else {
argsBytes, _ := json.Marshal(argsVal)
args = string(argsBytes)
}
}
initialMessage := schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{
Role: "assistant",
ToolCalls: []schema.ToolCall{
{
Index: i,
ID: id,
Type: "function",
FunctionCall: schema.FunctionCall{
Name: name,
Arguments: args,
},
},
},
},
Index: 0,
FinishReason: nil,
}},
Object: "chat.completion.chunk",
}
responses <- initialMessage
}
lastEmittedCount = len(jsonResults)
}
}
return true
},
func(attempt int) bool {
// After streaming completes: check if we got actionable content
cleaned := extractor.CleanedContent()
// Check for tool calls from chat deltas (will be re-checked after ComputeChoices,
// but we need to know here whether to retry).
// Also check ChatDelta flags — when the C++ autoparser is active,
// tool calls and content are delivered via ChatDeltas while the
// raw message is cleared. Without this check, we'd retry
// unnecessarily, losing valid results and concatenating output.
hasToolCalls := lastEmittedCount > 0 || hasChatDeltaToolCalls
hasContent := cleaned != "" || hasChatDeltaContent
if !hasContent && !hasToolCalls {
xlog.Warn("Streaming: backend produced only reasoning, retrying",
"reasoning_len", len(extractor.Reasoning()), "attempt", attempt+1)
extractor.ResetAndSuppressReasoning()
result = ""
lastEmittedCount = 0
sentInitialRole = false
hasChatDeltaToolCalls = false
hasChatDeltaContent = false
return true
}
return false
},
)
if err != nil {
return err
}
// Try using pre-parsed tool calls from C++ autoparser (chat deltas)
var functionResults []functions.FuncCallResults
var reasoning string
if deltaToolCalls := functions.ToolCallsFromChatDeltas(chatDeltas); len(deltaToolCalls) > 0 {
xlog.Debug("[ChatDeltas] Using pre-parsed tool calls from C++ autoparser", "count", len(deltaToolCalls))
functionResults = deltaToolCalls
// Use content/reasoning from deltas too
*textContentToReturn = functions.ContentFromChatDeltas(chatDeltas)
reasoning = functions.ReasoningFromChatDeltas(chatDeltas)
} else {
// Fallback: parse tool calls from raw text (no chat deltas from backend)
xlog.Debug("[ChatDeltas] no pre-parsed tool calls, falling back to Go-side text parsing")
reasoning = extractor.Reasoning()
cleanedResult := extractor.CleanedContent()
*textContentToReturn = functions.ParseTextContent(cleanedResult, config.FunctionsConfig)
cleanedResult = functions.CleanupLLMResult(cleanedResult, config.FunctionsConfig)
functionResults = functions.ParseFunctionCall(cleanedResult, config.FunctionsConfig)
}
xlog.Debug("[ChatDeltas] final tool call decision", "tool_calls", len(functionResults), "text_content", *textContentToReturn)
// noAction is a sentinel "just answer" pseudo-function — not a real
// tool call. Scan the whole slice rather than only index 0 so we
// don't drop a real tool call that happens to follow a noAction
// entry, and so the default branch isn't entered with only noAction
// entries to emit as tool_calls.
noActionToRun := !hasRealCall(functionResults, noAction)
switch {
case noActionToRun:
// Token-cumulative usage is communicated to the streaming
// consumer via the per-token callback's chunk struct (stripped
// before wire marshal). The final usage trailer — when the
// caller opted in with stream_options.include_usage — is built
// by the outer streaming loop, not here.
var result string
if !sentInitialRole {
var hqErr error
result, hqErr = handleQuestion(config, functionResults, extractor.CleanedContent(), prompt)
if hqErr != nil {
xlog.Error("error handling question", "error", hqErr)
return hqErr
}
}
for _, chunk := range buildNoActionFinalChunks(
id, req.Model, created,
sentInitialRole, sentReasoning,
result, reasoning,
) {
responses <- chunk
}
default:
for _, chunk := range buildDeferredToolCallChunks(
id, req.Model, created,
functionResults, lastEmittedCount,
sentInitialRole, *textContentToReturn,
sentReasoning, reasoning,
) {
responses <- chunk
}
}
close(responses)
return err
}
return func(c echo.Context) error {
var textContentToReturn string
id := uuid.New().String()
@@ -697,17 +340,19 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
}
responses := make(chan schema.OpenAIResponse)
ended := make(chan error, 1)
ended := make(chan streamWorkerResult, 1)
go func() {
if !shouldUseFn {
ended <- process(predInput, input, config, ml, responses, extraUsage, id, created)
u, err := processStream(predInput, input, config, cl, startupOptions, ml, responses, id, created)
ended <- streamWorkerResult{usage: u, err: err}
} else {
ended <- processTools(noActionName, predInput, input, config, ml, responses, extraUsage, id, created, &textContentToReturn)
u, err := processStreamWithTools(noActionName, predInput, input, config, cl, startupOptions, ml, responses, id, created, &textContentToReturn)
ended <- streamWorkerResult{usage: u, err: err}
}
}()
usage := &schema.OpenAIUsage{}
var finalUsage backend.TokenUsage
toolsCalled := false
var collectedToolCalls []schema.ToolCall
var collectedContent string
@@ -725,13 +370,6 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
xlog.Debug("No choices in the response, skipping")
continue
}
// Capture the running cumulative usage from this chunk
// (when present) so the include_usage trailer can carry
// the final totals. Usage is stripped before marshal
// below so the wire chunk stays spec-compliant.
if ev.Usage != nil {
usage = ev.Usage
}
if len(ev.Choices[0].Delta.ToolCalls) > 0 {
toolsCalled = true
// Collect and merge tool call deltas for MCP execution
@@ -747,11 +385,6 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
collectedContent += *sp
}
}
// OpenAI streaming spec: intermediate chunks must NOT
// carry a `usage` field. Strip the tracking copy
// before marshalling — usage is delivered via the
// dedicated trailer chunk when include_usage=true.
ev.Usage = nil
respData, err := json.Marshal(ev)
if err != nil {
xlog.Debug("Failed to marshal response", "error", err)
@@ -766,15 +399,16 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
return err
}
c.Response().Flush()
case err := <-ended:
if err == nil {
case res := <-ended:
if res.err == nil {
finalUsage = res.usage
break LOOP
}
xlog.Error("Stream ended with error", "error", err)
xlog.Error("Stream ended with error", "error", res.err)
errorResp := schema.ErrorResponse{
Error: &schema.APIError{
Message: err.Error(),
Message: res.err.Error(),
Type: "server_error",
Code: "server_error",
},
@@ -797,7 +431,10 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
// still trying to send (e.g., after client disconnect). The goroutine
// calls close(responses) when done, which terminates the drain.
if input.Context.Err() != nil {
go func() { for range responses {} }()
go func() {
for range responses {
}
}()
<-ended
}
@@ -921,8 +558,16 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
// Trailing usage chunk per OpenAI spec: emit only when the
// caller opted in via stream_options.include_usage. Shape:
// {"choices":[],"usage":{...},"object":"chat.completion.chunk",...}
if input.StreamOptions != nil && input.StreamOptions.IncludeUsage && usage != nil {
trailer := streamUsageTrailerJSON(id, input.Model, created, *usage)
//
// finalUsage is the authoritative TokenUsage returned by the
// worker function (process / processTools) via the `ended`
// channel. The worker reads it from ComputeChoices' return
// value, which is the cumulative count produced by the backend
// over the whole prediction. Issue #9927 was caused by the
// tools-path worker not surfacing this value at all.
if input.StreamOptions != nil && input.StreamOptions.IncludeUsage {
trailerUsage := streamUsageFromTokenUsage(finalUsage, extraUsage)
trailer := streamUsageTrailerJSON(id, input.Model, created, trailerUsage)
_, _ = fmt.Fprintf(c.Response().Writer, "data: %s\n\n", trailer)
}

View File

@@ -4,10 +4,39 @@ import (
"encoding/json"
"fmt"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/functions"
)
// streamWorkerResult is what the streaming workers (process / processTools)
// hand back to the outer ChatEndpoint loop through the `ended` channel.
// Threading the final TokenUsage here, instead of piggy-backing it on the
// `responses` SSE channel, keeps the SSE channel single-purpose (wire chunks)
// and gives the trailer emitter a plain Go value to read after LOOP exits.
// Fix for issue #9927: the previous tools-path worker never surfaced the
// cumulative token counts at all, so the include_usage trailer reported zeros.
type streamWorkerResult struct {
usage backend.TokenUsage
err error
}
// streamUsageFromTokenUsage converts the backend's cumulative TokenUsage into
// the OpenAI-spec OpenAIUsage shape used on the wire. `extraUsage` controls
// whether the non-standard timing fields are forwarded.
func streamUsageFromTokenUsage(usage backend.TokenUsage, extraUsage bool) schema.OpenAIUsage {
out := schema.OpenAIUsage{
PromptTokens: usage.Prompt,
CompletionTokens: usage.Completion,
TotalTokens: usage.Prompt + usage.Completion,
}
if extraUsage {
out.TimingTokenGeneration = usage.TimingTokenGeneration
out.TimingPromptProcessing = usage.TimingPromptProcessing
}
return out
}
// streamUsageTrailerJSON returns the bytes of the OpenAI-spec trailing usage
// chunk emitted in streaming completions when the request opts in via
// `stream_options.include_usage: true`. The shape is:

View File

@@ -1,10 +1,14 @@
package openai
import (
"context"
"encoding/json"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/functions"
"github.com/mudler/LocalAI/pkg/model"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
@@ -152,6 +156,28 @@ var _ = Describe("streaming usage spec compliance", func() {
})
})
Describe("streamUsageFromTokenUsage", func() {
It("converts backend TokenUsage to schema OpenAIUsage", func() {
tu := backend.TokenUsage{Prompt: 18, Completion: 213}
u := streamUsageFromTokenUsage(tu, false)
Expect(u.PromptTokens).To(Equal(18))
Expect(u.CompletionTokens).To(Equal(213))
Expect(u.TotalTokens).To(Equal(231))
Expect(u.TimingTokenGeneration).To(BeZero())
Expect(u.TimingPromptProcessing).To(BeZero())
})
It("includes timings when extraUsage is true", func() {
tu := backend.TokenUsage{
Prompt: 10, Completion: 20,
TimingPromptProcessing: 0.5,
TimingTokenGeneration: 1.5,
}
u := streamUsageFromTokenUsage(tu, true)
Expect(u.TimingPromptProcessing).To(Equal(0.5))
Expect(u.TimingTokenGeneration).To(Equal(1.5))
})
})
Describe("OpenAIRequest.StreamOptions", func() {
It("parses stream_options.include_usage=true", func() {
body := []byte(`{
@@ -177,3 +203,160 @@ var _ = Describe("streaming usage spec compliance", func() {
})
})
})
// Functional regression coverage for issue #9927: the streaming workers
// must surface the cumulative TokenUsage returned by ComputeChoices to
// their caller. The earlier broken implementations discarded that value
// (`_, _, chatDeltas, err := ComputeChoices(...)`) and threw away the
// counts on the floor, so the include_usage trailer always reported
// zeros when tools were enabled.
//
// These tests stub backend.ModelInferenceFunc so the worker exercises the
// real ComputeChoices → predFunc → LLMResponse pipeline. If a future change
// drops the TokenUsage somewhere along that path, the assertions on the
// returned value fail with a concrete count mismatch (e.g. 0 vs 213),
// not with a "function undefined" compile error.
var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", func() {
var (
origInference modelInferenceFunc
appCfg *config.ApplicationConfig
)
BeforeEach(func() {
origInference = backend.ModelInferenceFunc
appCfg = config.NewApplicationConfig()
})
AfterEach(func() {
backend.ModelInferenceFunc = origInference
})
// mockBackendUsage installs a stub backend that yields one LLMResponse
// carrying the supplied TokenUsage. ComputeChoices' single-attempt path
// copies these counts into the value it returns to the worker.
mockBackendUsage := func(usage backend.TokenUsage, response string) {
backend.ModelInferenceFunc = func(
ctx context.Context, s string, messages schema.Messages,
images, videos, audios []string,
loader *model.ModelLoader, c *config.ModelConfig, cl *config.ModelConfigLoader,
o *config.ApplicationConfig,
tokenCallback func(string, backend.TokenUsage) bool,
tools, toolChoice string,
logprobs, topLogprobs *int,
logitBias map[string]float64,
metadata map[string]string,
) (func() (backend.LLMResponse, error), error) {
return func() (backend.LLMResponse, error) {
return backend.LLMResponse{
Response: response,
Usage: usage,
}, nil
}, nil
}
}
makeReq := func() *schema.OpenAIRequest {
ctx, cancel := context.WithCancel(context.Background())
req := &schema.OpenAIRequest{
Context: ctx,
Cancel: cancel,
}
req.Model = "test-model" // promoted from BasicModelRequest
return req
}
// drainResponses consumes everything the worker pushes onto the channel
// so the worker is never blocked on its send. The channel is unbuffered
// (matching production), so the drain goroutine must be running before
// the worker is called.
drainResponses := func(ch <-chan schema.OpenAIResponse) <-chan struct{} {
done := make(chan struct{})
go func() {
for range ch {
}
close(done)
}()
return done
}
Describe("processStream (no-tools path)", func() {
It("returns the cumulative TokenUsage produced by the backend", func() {
mockBackendUsage(backend.TokenUsage{Prompt: 18, Completion: 213}, "Hello there")
req := makeReq()
cfg := &config.ModelConfig{}
responses := make(chan schema.OpenAIResponse)
done := drainResponses(responses)
actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0)
<-done
Expect(err).ToNot(HaveOccurred())
Expect(actual.Prompt).To(Equal(18),
"prompt tokens must round-trip from backend through processStream")
Expect(actual.Completion).To(Equal(213),
"completion tokens must round-trip from backend through processStream")
})
It("returns zero TokenUsage when the backend reports zero (negative control)", func() {
mockBackendUsage(backend.TokenUsage{}, "x")
req := makeReq()
cfg := &config.ModelConfig{}
responses := make(chan schema.OpenAIResponse)
done := drainResponses(responses)
actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0)
<-done
Expect(err).ToNot(HaveOccurred())
Expect(actual.Prompt).To(BeZero())
Expect(actual.Completion).To(BeZero())
})
})
Describe("processStreamWithTools (tools path)", func() {
It("returns the cumulative TokenUsage produced by the backend", func() {
// This is the direct regression check for issue #9927: with tools
// enabled, the trailer was reporting {0,0,0} because the worker
// discarded ComputeChoices' second return value.
mockBackendUsage(backend.TokenUsage{Prompt: 18, Completion: 213}, "answer")
req := makeReq()
cfg := &config.ModelConfig{}
responses := make(chan schema.OpenAIResponse)
done := drainResponses(responses)
var textContent string
actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent)
<-done
Expect(err).ToNot(HaveOccurred())
Expect(actual.Prompt).To(Equal(18),
"prompt tokens must round-trip from backend through processStreamWithTools (issue #9927)")
Expect(actual.Completion).To(Equal(213),
"completion tokens must round-trip from backend through processStreamWithTools (issue #9927)")
})
It("forwards timing fields when the backend supplies them", func() {
mockBackendUsage(backend.TokenUsage{
Prompt: 10, Completion: 20,
TimingPromptProcessing: 0.5,
TimingTokenGeneration: 1.5,
}, "answer")
req := makeReq()
cfg := &config.ModelConfig{}
responses := make(chan schema.OpenAIResponse)
done := drainResponses(responses)
var textContent string
actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent)
<-done
Expect(err).ToNot(HaveOccurred())
Expect(actual.TimingPromptProcessing).To(Equal(0.5))
Expect(actual.TimingTokenGeneration).To(Equal(1.5))
})
})
})

View File

@@ -0,0 +1,390 @@
package openai
import (
"encoding/json"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/functions"
"github.com/mudler/LocalAI/pkg/model"
reason "github.com/mudler/LocalAI/pkg/reasoning"
"github.com/mudler/xlog"
)
// processStream is the streaming worker for chat completions with no
// tool/function calling involved. It pushes SSE-shaped chunks onto
// `responses` and returns the authoritative cumulative TokenUsage from
// the prediction so the caller can populate the include_usage trailer
// without having to peek inside the chunks.
//
// The caller owns the `responses` channel and is expected to read from
// it while this function runs; processStream closes the channel before
// returning.
func processStream(
s string,
req *schema.OpenAIRequest,
cfg *config.ModelConfig,
cl *config.ModelConfigLoader,
startupOptions *config.ApplicationConfig,
loader *model.ModelLoader,
responses chan schema.OpenAIResponse,
id string,
created int,
) (backend.TokenUsage, error) {
responses <- schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model, // we have to return what the user sent here, due to OpenAI spec.
Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}},
Object: "chat.completion.chunk",
}
// Detect if thinking token is already in prompt or template
// When UseTokenizerTemplate is enabled, predInput is empty, so we check the template
var template string
if cfg.TemplateConfig.UseTokenizerTemplate {
template = cfg.GetModelTemplate()
} else {
template = s
}
thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig)
extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig)
_, finalUsage, _, err := ComputeChoices(req, s, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool {
var reasoningDelta, contentDelta string
// Always keep the Go-side extractor in sync with raw tokens so it
// can serve as fallback for backends without an autoparser (e.g. vLLM).
goReasoning, goContent := extractor.ProcessToken(s)
// When C++ autoparser chat deltas are available, prefer them: they
// handle model-specific formats (Gemma 4, etc.) without Go-side tags.
// Otherwise fall back to Go-side extraction.
if tokenUsage.HasChatDeltaContent() {
rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
}
delta := &schema.Message{}
if contentDelta != "" {
delta.Content = &contentDelta
}
if reasoningDelta != "" {
delta.Reasoning = &reasoningDelta
}
responses <- schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model, // we have to return what the user sent here, due to OpenAI spec.
Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}},
Object: "chat.completion.chunk",
}
return true
})
close(responses)
return finalUsage, err
}
// processStreamWithTools is the streaming worker for chat completions
// with tools / function calling. Same contract as processStream: pushes
// chunks onto `responses`, closes the channel, returns the cumulative
// TokenUsage.
//
// Returning the TokenUsage as a normal Go value (rather than smuggling
// it on a sentinel chunk) is the fix for issue #9927 — the previous
// implementation discarded the value from ComputeChoices, so the
// include_usage trailer reported zeros whenever `tools` was in play.
func processStreamWithTools(
noAction string,
prompt string,
req *schema.OpenAIRequest,
cfg *config.ModelConfig,
cl *config.ModelConfigLoader,
startupOptions *config.ApplicationConfig,
loader *model.ModelLoader,
responses chan schema.OpenAIResponse,
id string,
created int,
textContentToReturn *string,
) (backend.TokenUsage, error) {
// Detect if thinking token is already in prompt or template
var template string
if cfg.TemplateConfig.UseTokenizerTemplate {
template = cfg.GetModelTemplate()
} else {
template = prompt
}
thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig)
extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig)
result := ""
lastEmittedCount := 0
sentInitialRole := false
sentReasoning := false
hasChatDeltaToolCalls := false
hasChatDeltaContent := false
_, finalUsage, chatDeltas, err := ComputeChoices(req, prompt, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool {
result += s
// Track whether ChatDeltas from the C++ autoparser contain
// tool calls or content, so the retry decision can account for them.
for _, d := range usage.ChatDeltas {
if len(d.ToolCalls) > 0 {
hasChatDeltaToolCalls = true
}
if d.Content != "" {
hasChatDeltaContent = true
}
}
var reasoningDelta, contentDelta string
goReasoning, goContent := extractor.ProcessToken(s)
if usage.HasChatDeltaContent() {
rawReasoning, cd := usage.ChatDeltaReasoningAndContent()
contentDelta = cd
reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
} else {
reasoningDelta = goReasoning
contentDelta = goContent
}
// Emit reasoning deltas in their own SSE chunks before any tool-call chunks
// (OpenAI spec: reasoning and tool_calls never share a delta)
if reasoningDelta != "" {
responses <- schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{Reasoning: &reasoningDelta},
Index: 0,
}},
Object: "chat.completion.chunk",
}
sentReasoning = true
}
// Stream content deltas (cleaned of reasoning tags) while no tool calls
// have been detected. Once the incremental parser finds tool calls,
// content stops: per OpenAI spec, content and tool_calls don't mix.
if lastEmittedCount == 0 && contentDelta != "" {
if !sentInitialRole {
responses <- schema.OpenAIResponse{
ID: id, Created: created, Model: req.Model,
Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}},
Object: "chat.completion.chunk",
}
sentInitialRole = true
}
responses <- schema.OpenAIResponse{
ID: id, Created: created, Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{Content: &contentDelta},
Index: 0,
}},
Object: "chat.completion.chunk",
}
}
// Try incremental XML parsing for streaming support using iterative parser
// This allows emitting partial tool calls as they're being generated
cleanedResult := functions.CleanupLLMResult(result, cfg.FunctionsConfig)
// Determine XML format from config
var xmlFormat *functions.XMLToolCallFormat
if cfg.FunctionsConfig.XMLFormat != nil {
xmlFormat = cfg.FunctionsConfig.XMLFormat
} else if cfg.FunctionsConfig.XMLFormatPreset != "" {
xmlFormat = functions.GetXMLFormatPreset(cfg.FunctionsConfig.XMLFormatPreset)
}
// Use iterative parser for streaming (partial parsing enabled)
// Try XML parsing first
partialResults, parseErr := functions.ParseXMLIterative(cleanedResult, xmlFormat, true)
if parseErr == nil && len(partialResults) > 0 {
// Emit new XML tool calls that weren't emitted before
if len(partialResults) > lastEmittedCount {
for i := lastEmittedCount; i < len(partialResults); i++ {
toolCall := partialResults[i]
initialMessage := schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{
Role: "assistant",
ToolCalls: []schema.ToolCall{
{
Index: i,
ID: id,
Type: "function",
FunctionCall: schema.FunctionCall{
Name: toolCall.Name,
},
},
},
},
Index: 0,
FinishReason: nil,
}},
Object: "chat.completion.chunk",
}
select {
case responses <- initialMessage:
default:
}
}
lastEmittedCount = len(partialResults)
}
} else {
// Try JSON tool call parsing for streaming.
// Only emit NEW tool calls (same guard as XML parser above).
jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true)
if jsonErr == nil && len(jsonResults) > lastEmittedCount {
for i := lastEmittedCount; i < len(jsonResults); i++ {
jsonObj := jsonResults[i]
name, ok := jsonObj["name"].(string)
if !ok || name == "" {
continue
}
args := "{}"
if argsVal, ok := jsonObj["arguments"]; ok {
if argsStr, ok := argsVal.(string); ok {
args = argsStr
} else {
argsBytes, _ := json.Marshal(argsVal)
args = string(argsBytes)
}
}
initialMessage := schema.OpenAIResponse{
ID: id,
Created: created,
Model: req.Model,
Choices: []schema.Choice{{
Delta: &schema.Message{
Role: "assistant",
ToolCalls: []schema.ToolCall{
{
Index: i,
ID: id,
Type: "function",
FunctionCall: schema.FunctionCall{
Name: name,
Arguments: args,
},
},
},
},
Index: 0,
FinishReason: nil,
}},
Object: "chat.completion.chunk",
}
responses <- initialMessage
}
lastEmittedCount = len(jsonResults)
}
}
return true
},
func(attempt int) bool {
// After streaming completes: check if we got actionable content
cleaned := extractor.CleanedContent()
// Check for tool calls from chat deltas (will be re-checked after ComputeChoices,
// but we need to know here whether to retry).
// Also check ChatDelta flags: when the C++ autoparser is active,
// tool calls and content are delivered via ChatDeltas while the
// raw message is cleared. Without this check, we'd retry
// unnecessarily, losing valid results and concatenating output.
hasToolCalls := lastEmittedCount > 0 || hasChatDeltaToolCalls
hasContent := cleaned != "" || hasChatDeltaContent
if !hasContent && !hasToolCalls {
xlog.Warn("Streaming: backend produced only reasoning, retrying",
"reasoning_len", len(extractor.Reasoning()), "attempt", attempt+1)
extractor.ResetAndSuppressReasoning()
result = ""
lastEmittedCount = 0
sentInitialRole = false
hasChatDeltaToolCalls = false
hasChatDeltaContent = false
return true
}
return false
},
)
if err != nil {
return finalUsage, err
}
// Try using pre-parsed tool calls from C++ autoparser (chat deltas)
var functionResults []functions.FuncCallResults
var reasoning string
if deltaToolCalls := functions.ToolCallsFromChatDeltas(chatDeltas); len(deltaToolCalls) > 0 {
xlog.Debug("[ChatDeltas] Using pre-parsed tool calls from C++ autoparser", "count", len(deltaToolCalls))
functionResults = deltaToolCalls
// Use content/reasoning from deltas too
*textContentToReturn = functions.ContentFromChatDeltas(chatDeltas)
reasoning = functions.ReasoningFromChatDeltas(chatDeltas)
} else {
// Fallback: parse tool calls from raw text (no chat deltas from backend)
xlog.Debug("[ChatDeltas] no pre-parsed tool calls, falling back to Go-side text parsing")
reasoning = extractor.Reasoning()
cleanedResult := extractor.CleanedContent()
*textContentToReturn = functions.ParseTextContent(cleanedResult, cfg.FunctionsConfig)
cleanedResult = functions.CleanupLLMResult(cleanedResult, cfg.FunctionsConfig)
functionResults = functions.ParseFunctionCall(cleanedResult, cfg.FunctionsConfig)
}
xlog.Debug("[ChatDeltas] final tool call decision", "tool_calls", len(functionResults), "text_content", *textContentToReturn)
// noAction is a sentinel "just answer" pseudo-function: not a real
// tool call. Scan the whole slice rather than only index 0 so we
// don't drop a real tool call that happens to follow a noAction
// entry, and so the default branch isn't entered with only noAction
// entries to emit as tool_calls.
noActionToRun := !hasRealCall(functionResults, noAction)
switch {
case noActionToRun:
// The final usage trailer (when the caller opted in with
// stream_options.include_usage) is built by the outer streaming
// loop from the TokenUsage this function returns, not from any
// chunk on the responses channel.
var result string
if !sentInitialRole {
var hqErr error
result, hqErr = handleQuestion(cfg, functionResults, extractor.CleanedContent(), prompt)
if hqErr != nil {
xlog.Error("error handling question", "error", hqErr)
return finalUsage, hqErr
}
}
for _, chunk := range buildNoActionFinalChunks(
id, req.Model, created,
sentInitialRole, sentReasoning,
result, reasoning,
) {
responses <- chunk
}
default:
for _, chunk := range buildDeferredToolCallChunks(
id, req.Model, created,
functionResults, lastEmittedCount,
sentInitialRole, *textContentToReturn,
sentReasoning, reasoning,
) {
responses <- chunk
}
}
close(responses)
return finalUsage, err
}