diff --git a/.gitignore b/.gitignore index 25252eada..08873e8b2 100644 --- a/.gitignore +++ b/.gitignore @@ -77,3 +77,6 @@ local-backends/ tests/e2e-ui/ui-test-server core/http/react-ui/playwright-report/ core/http/react-ui/test-results/ + +# Local worktrees +.worktrees/ diff --git a/core/http/endpoints/openai/chat.go b/core/http/endpoints/openai/chat.go index 0951a88cc..db716e4b7 100644 --- a/core/http/endpoints/openai/chat.go +++ b/core/http/endpoints/openai/chat.go @@ -73,363 +73,6 @@ func mergeToolCallDeltas(existing []schema.ToolCall, deltas []schema.ToolCall) [ // @Success 200 {object} schema.OpenAIResponse "Response" // @Router /v1/chat/completions [post] func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator *templates.Evaluator, startupOptions *config.ApplicationConfig, natsClient mcpTools.MCPNATSClient, assistantHolder *mcpTools.LocalAIAssistantHolder) echo.HandlerFunc { - process := func(s string, req *schema.OpenAIRequest, config *config.ModelConfig, loader *model.ModelLoader, responses chan schema.OpenAIResponse, extraUsage bool, id string, created int) error { - initialMessage := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, // we have to return what the user sent here, due to OpenAI spec. - Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}}, - Object: "chat.completion.chunk", - } - responses <- initialMessage - - // Detect if thinking token is already in prompt or template - // When UseTokenizerTemplate is enabled, predInput is empty, so we check the template - var template string - if config.TemplateConfig.UseTokenizerTemplate { - template = config.GetModelTemplate() - } else { - template = s - } - thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig) - extractor := reason.NewReasoningExtractor(thinkingStartToken, config.ReasoningConfig) - - _, _, _, err := ComputeChoices(req, s, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool { - var reasoningDelta, contentDelta string - - // Always keep the Go-side extractor in sync with raw tokens so it - // can serve as fallback for backends without an autoparser (e.g. vLLM). - goReasoning, goContent := extractor.ProcessToken(s) - - // When C++ autoparser chat deltas are available, prefer them — they - // handle model-specific formats (Gemma 4, etc.) without Go-side tags. - // Otherwise fall back to Go-side extraction. - if tokenUsage.HasChatDeltaContent() { - rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent() - contentDelta = cd - reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning) - } else { - reasoningDelta = goReasoning - contentDelta = goContent - } - - usage := schema.OpenAIUsage{ - PromptTokens: tokenUsage.Prompt, - CompletionTokens: tokenUsage.Completion, - TotalTokens: tokenUsage.Prompt + tokenUsage.Completion, - } - if extraUsage { - usage.TimingTokenGeneration = tokenUsage.TimingTokenGeneration - usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing - } - - delta := &schema.Message{} - if contentDelta != "" { - delta.Content = &contentDelta - } - if reasoningDelta != "" { - delta.Reasoning = &reasoningDelta - } - - // Usage rides as a struct field for the consumer to track the - // running cumulative — it is stripped before JSON marshal so the - // wire chunk stays spec-compliant (no `usage` on intermediate - // chunks). The dedicated trailer chunk (when include_usage=true) - // carries the final totals. - usageForChunk := usage - resp := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, // we have to return what the user sent here, due to OpenAI spec. - Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}}, - Object: "chat.completion.chunk", - Usage: &usageForChunk, - } - - responses <- resp - return true - }) - close(responses) - return err - } - processTools := func(noAction string, prompt string, req *schema.OpenAIRequest, config *config.ModelConfig, loader *model.ModelLoader, responses chan schema.OpenAIResponse, extraUsage bool, id string, created int, textContentToReturn *string) error { - // Detect if thinking token is already in prompt or template - var template string - if config.TemplateConfig.UseTokenizerTemplate { - template = config.GetModelTemplate() - } else { - template = prompt - } - thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig) - extractor := reason.NewReasoningExtractor(thinkingStartToken, config.ReasoningConfig) - - result := "" - lastEmittedCount := 0 - sentInitialRole := false - sentReasoning := false - hasChatDeltaToolCalls := false - hasChatDeltaContent := false - - _, _, chatDeltas, err := ComputeChoices(req, prompt, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool { - result += s - - // Track whether ChatDeltas from the C++ autoparser contain - // tool calls or content, so the retry decision can account for them. - for _, d := range usage.ChatDeltas { - if len(d.ToolCalls) > 0 { - hasChatDeltaToolCalls = true - } - if d.Content != "" { - hasChatDeltaContent = true - } - } - - var reasoningDelta, contentDelta string - - goReasoning, goContent := extractor.ProcessToken(s) - - if usage.HasChatDeltaContent() { - rawReasoning, cd := usage.ChatDeltaReasoningAndContent() - contentDelta = cd - reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning) - } else { - reasoningDelta = goReasoning - contentDelta = goContent - } - - // Emit reasoning deltas in their own SSE chunks before any tool-call chunks - // (OpenAI spec: reasoning and tool_calls never share a delta) - if reasoningDelta != "" { - responses <- schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, - Choices: []schema.Choice{{ - Delta: &schema.Message{Reasoning: &reasoningDelta}, - Index: 0, - }}, - Object: "chat.completion.chunk", - } - sentReasoning = true - } - - // Stream content deltas (cleaned of reasoning tags) while no tool calls - // have been detected. Once the incremental parser finds tool calls, - // content stops — per OpenAI spec, content and tool_calls don't mix. - if lastEmittedCount == 0 && contentDelta != "" { - if !sentInitialRole { - responses <- schema.OpenAIResponse{ - ID: id, Created: created, Model: req.Model, - Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}}, - Object: "chat.completion.chunk", - } - sentInitialRole = true - } - responses <- schema.OpenAIResponse{ - ID: id, Created: created, Model: req.Model, - Choices: []schema.Choice{{ - Delta: &schema.Message{Content: &contentDelta}, - Index: 0, - }}, - Object: "chat.completion.chunk", - } - } - - // Try incremental XML parsing for streaming support using iterative parser - // This allows emitting partial tool calls as they're being generated - cleanedResult := functions.CleanupLLMResult(result, config.FunctionsConfig) - - // Determine XML format from config - var xmlFormat *functions.XMLToolCallFormat - if config.FunctionsConfig.XMLFormat != nil { - xmlFormat = config.FunctionsConfig.XMLFormat - } else if config.FunctionsConfig.XMLFormatPreset != "" { - xmlFormat = functions.GetXMLFormatPreset(config.FunctionsConfig.XMLFormatPreset) - } - - // Use iterative parser for streaming (partial parsing enabled) - // Try XML parsing first - partialResults, parseErr := functions.ParseXMLIterative(cleanedResult, xmlFormat, true) - if parseErr == nil && len(partialResults) > 0 { - // Emit new XML tool calls that weren't emitted before - if len(partialResults) > lastEmittedCount { - for i := lastEmittedCount; i < len(partialResults); i++ { - toolCall := partialResults[i] - initialMessage := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, - Choices: []schema.Choice{{ - Delta: &schema.Message{ - Role: "assistant", - ToolCalls: []schema.ToolCall{ - { - Index: i, - ID: id, - Type: "function", - FunctionCall: schema.FunctionCall{ - Name: toolCall.Name, - }, - }, - }, - }, - Index: 0, - FinishReason: nil, - }}, - Object: "chat.completion.chunk", - } - select { - case responses <- initialMessage: - default: - } - } - lastEmittedCount = len(partialResults) - } - } else { - // Try JSON tool call parsing for streaming. - // Only emit NEW tool calls (same guard as XML parser above). - jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true) - if jsonErr == nil && len(jsonResults) > lastEmittedCount { - for i := lastEmittedCount; i < len(jsonResults); i++ { - jsonObj := jsonResults[i] - name, ok := jsonObj["name"].(string) - if !ok || name == "" { - continue - } - args := "{}" - if argsVal, ok := jsonObj["arguments"]; ok { - if argsStr, ok := argsVal.(string); ok { - args = argsStr - } else { - argsBytes, _ := json.Marshal(argsVal) - args = string(argsBytes) - } - } - initialMessage := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, - Choices: []schema.Choice{{ - Delta: &schema.Message{ - Role: "assistant", - ToolCalls: []schema.ToolCall{ - { - Index: i, - ID: id, - Type: "function", - FunctionCall: schema.FunctionCall{ - Name: name, - Arguments: args, - }, - }, - }, - }, - Index: 0, - FinishReason: nil, - }}, - Object: "chat.completion.chunk", - } - responses <- initialMessage - } - lastEmittedCount = len(jsonResults) - } - } - return true - }, - func(attempt int) bool { - // After streaming completes: check if we got actionable content - cleaned := extractor.CleanedContent() - // Check for tool calls from chat deltas (will be re-checked after ComputeChoices, - // but we need to know here whether to retry). - // Also check ChatDelta flags — when the C++ autoparser is active, - // tool calls and content are delivered via ChatDeltas while the - // raw message is cleared. Without this check, we'd retry - // unnecessarily, losing valid results and concatenating output. - hasToolCalls := lastEmittedCount > 0 || hasChatDeltaToolCalls - hasContent := cleaned != "" || hasChatDeltaContent - if !hasContent && !hasToolCalls { - xlog.Warn("Streaming: backend produced only reasoning, retrying", - "reasoning_len", len(extractor.Reasoning()), "attempt", attempt+1) - extractor.ResetAndSuppressReasoning() - result = "" - lastEmittedCount = 0 - sentInitialRole = false - hasChatDeltaToolCalls = false - hasChatDeltaContent = false - return true - } - return false - }, - ) - if err != nil { - return err - } - // Try using pre-parsed tool calls from C++ autoparser (chat deltas) - var functionResults []functions.FuncCallResults - var reasoning string - - if deltaToolCalls := functions.ToolCallsFromChatDeltas(chatDeltas); len(deltaToolCalls) > 0 { - xlog.Debug("[ChatDeltas] Using pre-parsed tool calls from C++ autoparser", "count", len(deltaToolCalls)) - functionResults = deltaToolCalls - // Use content/reasoning from deltas too - *textContentToReturn = functions.ContentFromChatDeltas(chatDeltas) - reasoning = functions.ReasoningFromChatDeltas(chatDeltas) - } else { - // Fallback: parse tool calls from raw text (no chat deltas from backend) - xlog.Debug("[ChatDeltas] no pre-parsed tool calls, falling back to Go-side text parsing") - reasoning = extractor.Reasoning() - cleanedResult := extractor.CleanedContent() - *textContentToReturn = functions.ParseTextContent(cleanedResult, config.FunctionsConfig) - cleanedResult = functions.CleanupLLMResult(cleanedResult, config.FunctionsConfig) - functionResults = functions.ParseFunctionCall(cleanedResult, config.FunctionsConfig) - } - xlog.Debug("[ChatDeltas] final tool call decision", "tool_calls", len(functionResults), "text_content", *textContentToReturn) - // noAction is a sentinel "just answer" pseudo-function — not a real - // tool call. Scan the whole slice rather than only index 0 so we - // don't drop a real tool call that happens to follow a noAction - // entry, and so the default branch isn't entered with only noAction - // entries to emit as tool_calls. - noActionToRun := !hasRealCall(functionResults, noAction) - - switch { - case noActionToRun: - // Token-cumulative usage is communicated to the streaming - // consumer via the per-token callback's chunk struct (stripped - // before wire marshal). The final usage trailer — when the - // caller opted in with stream_options.include_usage — is built - // by the outer streaming loop, not here. - var result string - if !sentInitialRole { - var hqErr error - result, hqErr = handleQuestion(config, functionResults, extractor.CleanedContent(), prompt) - if hqErr != nil { - xlog.Error("error handling question", "error", hqErr) - return hqErr - } - } - for _, chunk := range buildNoActionFinalChunks( - id, req.Model, created, - sentInitialRole, sentReasoning, - result, reasoning, - ) { - responses <- chunk - } - - default: - for _, chunk := range buildDeferredToolCallChunks( - id, req.Model, created, - functionResults, lastEmittedCount, - sentInitialRole, *textContentToReturn, - sentReasoning, reasoning, - ) { - responses <- chunk - } - } - - close(responses) - return err - } - return func(c echo.Context) error { var textContentToReturn string id := uuid.New().String() @@ -697,17 +340,19 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator } responses := make(chan schema.OpenAIResponse) - ended := make(chan error, 1) + ended := make(chan streamWorkerResult, 1) go func() { if !shouldUseFn { - ended <- process(predInput, input, config, ml, responses, extraUsage, id, created) + u, err := processStream(predInput, input, config, cl, startupOptions, ml, responses, id, created) + ended <- streamWorkerResult{usage: u, err: err} } else { - ended <- processTools(noActionName, predInput, input, config, ml, responses, extraUsage, id, created, &textContentToReturn) + u, err := processStreamWithTools(noActionName, predInput, input, config, cl, startupOptions, ml, responses, id, created, &textContentToReturn) + ended <- streamWorkerResult{usage: u, err: err} } }() - usage := &schema.OpenAIUsage{} + var finalUsage backend.TokenUsage toolsCalled := false var collectedToolCalls []schema.ToolCall var collectedContent string @@ -725,13 +370,6 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator xlog.Debug("No choices in the response, skipping") continue } - // Capture the running cumulative usage from this chunk - // (when present) so the include_usage trailer can carry - // the final totals. Usage is stripped before marshal - // below so the wire chunk stays spec-compliant. - if ev.Usage != nil { - usage = ev.Usage - } if len(ev.Choices[0].Delta.ToolCalls) > 0 { toolsCalled = true // Collect and merge tool call deltas for MCP execution @@ -747,11 +385,6 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator collectedContent += *sp } } - // OpenAI streaming spec: intermediate chunks must NOT - // carry a `usage` field. Strip the tracking copy - // before marshalling — usage is delivered via the - // dedicated trailer chunk when include_usage=true. - ev.Usage = nil respData, err := json.Marshal(ev) if err != nil { xlog.Debug("Failed to marshal response", "error", err) @@ -766,15 +399,16 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator return err } c.Response().Flush() - case err := <-ended: - if err == nil { + case res := <-ended: + if res.err == nil { + finalUsage = res.usage break LOOP } - xlog.Error("Stream ended with error", "error", err) + xlog.Error("Stream ended with error", "error", res.err) errorResp := schema.ErrorResponse{ Error: &schema.APIError{ - Message: err.Error(), + Message: res.err.Error(), Type: "server_error", Code: "server_error", }, @@ -797,7 +431,10 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator // still trying to send (e.g., after client disconnect). The goroutine // calls close(responses) when done, which terminates the drain. if input.Context.Err() != nil { - go func() { for range responses {} }() + go func() { + for range responses { + } + }() <-ended } @@ -921,8 +558,16 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator // Trailing usage chunk per OpenAI spec: emit only when the // caller opted in via stream_options.include_usage. Shape: // {"choices":[],"usage":{...},"object":"chat.completion.chunk",...} - if input.StreamOptions != nil && input.StreamOptions.IncludeUsage && usage != nil { - trailer := streamUsageTrailerJSON(id, input.Model, created, *usage) + // + // finalUsage is the authoritative TokenUsage returned by the + // worker function (process / processTools) via the `ended` + // channel. The worker reads it from ComputeChoices' return + // value, which is the cumulative count produced by the backend + // over the whole prediction. Issue #9927 was caused by the + // tools-path worker not surfacing this value at all. + if input.StreamOptions != nil && input.StreamOptions.IncludeUsage { + trailerUsage := streamUsageFromTokenUsage(finalUsage, extraUsage) + trailer := streamUsageTrailerJSON(id, input.Model, created, trailerUsage) _, _ = fmt.Fprintf(c.Response().Writer, "data: %s\n\n", trailer) } diff --git a/core/http/endpoints/openai/chat_emit.go b/core/http/endpoints/openai/chat_emit.go index ba182e77d..43d4ab2af 100644 --- a/core/http/endpoints/openai/chat_emit.go +++ b/core/http/endpoints/openai/chat_emit.go @@ -4,10 +4,39 @@ import ( "encoding/json" "fmt" + "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/functions" ) +// streamWorkerResult is what the streaming workers (process / processTools) +// hand back to the outer ChatEndpoint loop through the `ended` channel. +// Threading the final TokenUsage here, instead of piggy-backing it on the +// `responses` SSE channel, keeps the SSE channel single-purpose (wire chunks) +// and gives the trailer emitter a plain Go value to read after LOOP exits. +// Fix for issue #9927: the previous tools-path worker never surfaced the +// cumulative token counts at all, so the include_usage trailer reported zeros. +type streamWorkerResult struct { + usage backend.TokenUsage + err error +} + +// streamUsageFromTokenUsage converts the backend's cumulative TokenUsage into +// the OpenAI-spec OpenAIUsage shape used on the wire. `extraUsage` controls +// whether the non-standard timing fields are forwarded. +func streamUsageFromTokenUsage(usage backend.TokenUsage, extraUsage bool) schema.OpenAIUsage { + out := schema.OpenAIUsage{ + PromptTokens: usage.Prompt, + CompletionTokens: usage.Completion, + TotalTokens: usage.Prompt + usage.Completion, + } + if extraUsage { + out.TimingTokenGeneration = usage.TimingTokenGeneration + out.TimingPromptProcessing = usage.TimingPromptProcessing + } + return out +} + // streamUsageTrailerJSON returns the bytes of the OpenAI-spec trailing usage // chunk emitted in streaming completions when the request opts in via // `stream_options.include_usage: true`. The shape is: diff --git a/core/http/endpoints/openai/chat_stream_usage_test.go b/core/http/endpoints/openai/chat_stream_usage_test.go index 2bba7b91c..7f7338e55 100644 --- a/core/http/endpoints/openai/chat_stream_usage_test.go +++ b/core/http/endpoints/openai/chat_stream_usage_test.go @@ -1,10 +1,14 @@ package openai import ( + "context" "encoding/json" + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/functions" + "github.com/mudler/LocalAI/pkg/model" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -152,6 +156,28 @@ var _ = Describe("streaming usage spec compliance", func() { }) }) + Describe("streamUsageFromTokenUsage", func() { + It("converts backend TokenUsage to schema OpenAIUsage", func() { + tu := backend.TokenUsage{Prompt: 18, Completion: 213} + u := streamUsageFromTokenUsage(tu, false) + Expect(u.PromptTokens).To(Equal(18)) + Expect(u.CompletionTokens).To(Equal(213)) + Expect(u.TotalTokens).To(Equal(231)) + Expect(u.TimingTokenGeneration).To(BeZero()) + Expect(u.TimingPromptProcessing).To(BeZero()) + }) + It("includes timings when extraUsage is true", func() { + tu := backend.TokenUsage{ + Prompt: 10, Completion: 20, + TimingPromptProcessing: 0.5, + TimingTokenGeneration: 1.5, + } + u := streamUsageFromTokenUsage(tu, true) + Expect(u.TimingPromptProcessing).To(Equal(0.5)) + Expect(u.TimingTokenGeneration).To(Equal(1.5)) + }) + }) + Describe("OpenAIRequest.StreamOptions", func() { It("parses stream_options.include_usage=true", func() { body := []byte(`{ @@ -177,3 +203,160 @@ var _ = Describe("streaming usage spec compliance", func() { }) }) }) + +// Functional regression coverage for issue #9927: the streaming workers +// must surface the cumulative TokenUsage returned by ComputeChoices to +// their caller. The earlier broken implementations discarded that value +// (`_, _, chatDeltas, err := ComputeChoices(...)`) and threw away the +// counts on the floor, so the include_usage trailer always reported +// zeros when tools were enabled. +// +// These tests stub backend.ModelInferenceFunc so the worker exercises the +// real ComputeChoices → predFunc → LLMResponse pipeline. If a future change +// drops the TokenUsage somewhere along that path, the assertions on the +// returned value fail with a concrete count mismatch (e.g. 0 vs 213), +// not with a "function undefined" compile error. +var _ = Describe("streaming workers surface final TokenUsage (issue #9927)", func() { + var ( + origInference modelInferenceFunc + appCfg *config.ApplicationConfig + ) + + BeforeEach(func() { + origInference = backend.ModelInferenceFunc + appCfg = config.NewApplicationConfig() + }) + + AfterEach(func() { + backend.ModelInferenceFunc = origInference + }) + + // mockBackendUsage installs a stub backend that yields one LLMResponse + // carrying the supplied TokenUsage. ComputeChoices' single-attempt path + // copies these counts into the value it returns to the worker. + mockBackendUsage := func(usage backend.TokenUsage, response string) { + backend.ModelInferenceFunc = func( + ctx context.Context, s string, messages schema.Messages, + images, videos, audios []string, + loader *model.ModelLoader, c *config.ModelConfig, cl *config.ModelConfigLoader, + o *config.ApplicationConfig, + tokenCallback func(string, backend.TokenUsage) bool, + tools, toolChoice string, + logprobs, topLogprobs *int, + logitBias map[string]float64, + metadata map[string]string, + ) (func() (backend.LLMResponse, error), error) { + return func() (backend.LLMResponse, error) { + return backend.LLMResponse{ + Response: response, + Usage: usage, + }, nil + }, nil + } + } + + makeReq := func() *schema.OpenAIRequest { + ctx, cancel := context.WithCancel(context.Background()) + req := &schema.OpenAIRequest{ + Context: ctx, + Cancel: cancel, + } + req.Model = "test-model" // promoted from BasicModelRequest + return req + } + + // drainResponses consumes everything the worker pushes onto the channel + // so the worker is never blocked on its send. The channel is unbuffered + // (matching production), so the drain goroutine must be running before + // the worker is called. + drainResponses := func(ch <-chan schema.OpenAIResponse) <-chan struct{} { + done := make(chan struct{}) + go func() { + for range ch { + } + close(done) + }() + return done + } + + Describe("processStream (no-tools path)", func() { + It("returns the cumulative TokenUsage produced by the backend", func() { + mockBackendUsage(backend.TokenUsage{Prompt: 18, Completion: 213}, "Hello there") + + req := makeReq() + cfg := &config.ModelConfig{} + responses := make(chan schema.OpenAIResponse) + done := drainResponses(responses) + + actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0) + <-done + + Expect(err).ToNot(HaveOccurred()) + Expect(actual.Prompt).To(Equal(18), + "prompt tokens must round-trip from backend through processStream") + Expect(actual.Completion).To(Equal(213), + "completion tokens must round-trip from backend through processStream") + }) + + It("returns zero TokenUsage when the backend reports zero (negative control)", func() { + mockBackendUsage(backend.TokenUsage{}, "x") + + req := makeReq() + cfg := &config.ModelConfig{} + responses := make(chan schema.OpenAIResponse) + done := drainResponses(responses) + + actual, err := processStream("prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0) + <-done + + Expect(err).ToNot(HaveOccurred()) + Expect(actual.Prompt).To(BeZero()) + Expect(actual.Completion).To(BeZero()) + }) + }) + + Describe("processStreamWithTools (tools path)", func() { + It("returns the cumulative TokenUsage produced by the backend", func() { + // This is the direct regression check for issue #9927: with tools + // enabled, the trailer was reporting {0,0,0} because the worker + // discarded ComputeChoices' second return value. + mockBackendUsage(backend.TokenUsage{Prompt: 18, Completion: 213}, "answer") + + req := makeReq() + cfg := &config.ModelConfig{} + responses := make(chan schema.OpenAIResponse) + done := drainResponses(responses) + var textContent string + + actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent) + <-done + + Expect(err).ToNot(HaveOccurred()) + Expect(actual.Prompt).To(Equal(18), + "prompt tokens must round-trip from backend through processStreamWithTools (issue #9927)") + Expect(actual.Completion).To(Equal(213), + "completion tokens must round-trip from backend through processStreamWithTools (issue #9927)") + }) + + It("forwards timing fields when the backend supplies them", func() { + mockBackendUsage(backend.TokenUsage{ + Prompt: 10, Completion: 20, + TimingPromptProcessing: 0.5, + TimingTokenGeneration: 1.5, + }, "answer") + + req := makeReq() + cfg := &config.ModelConfig{} + responses := make(chan schema.OpenAIResponse) + done := drainResponses(responses) + var textContent string + + actual, err := processStreamWithTools("none", "prompt", req, cfg, nil, appCfg, nil, responses, "req-1", 0, &textContent) + <-done + + Expect(err).ToNot(HaveOccurred()) + Expect(actual.TimingPromptProcessing).To(Equal(0.5)) + Expect(actual.TimingTokenGeneration).To(Equal(1.5)) + }) + }) +}) diff --git a/core/http/endpoints/openai/chat_stream_workers.go b/core/http/endpoints/openai/chat_stream_workers.go new file mode 100644 index 000000000..f06da9111 --- /dev/null +++ b/core/http/endpoints/openai/chat_stream_workers.go @@ -0,0 +1,390 @@ +package openai + +import ( + "encoding/json" + + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/functions" + "github.com/mudler/LocalAI/pkg/model" + reason "github.com/mudler/LocalAI/pkg/reasoning" + "github.com/mudler/xlog" +) + +// processStream is the streaming worker for chat completions with no +// tool/function calling involved. It pushes SSE-shaped chunks onto +// `responses` and returns the authoritative cumulative TokenUsage from +// the prediction so the caller can populate the include_usage trailer +// without having to peek inside the chunks. +// +// The caller owns the `responses` channel and is expected to read from +// it while this function runs; processStream closes the channel before +// returning. +func processStream( + s string, + req *schema.OpenAIRequest, + cfg *config.ModelConfig, + cl *config.ModelConfigLoader, + startupOptions *config.ApplicationConfig, + loader *model.ModelLoader, + responses chan schema.OpenAIResponse, + id string, + created int, +) (backend.TokenUsage, error) { + responses <- schema.OpenAIResponse{ + ID: id, + Created: created, + Model: req.Model, // we have to return what the user sent here, due to OpenAI spec. + Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}}, + Object: "chat.completion.chunk", + } + + // Detect if thinking token is already in prompt or template + // When UseTokenizerTemplate is enabled, predInput is empty, so we check the template + var template string + if cfg.TemplateConfig.UseTokenizerTemplate { + template = cfg.GetModelTemplate() + } else { + template = s + } + thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig) + extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig) + + _, finalUsage, _, err := ComputeChoices(req, s, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool { + var reasoningDelta, contentDelta string + + // Always keep the Go-side extractor in sync with raw tokens so it + // can serve as fallback for backends without an autoparser (e.g. vLLM). + goReasoning, goContent := extractor.ProcessToken(s) + + // When C++ autoparser chat deltas are available, prefer them: they + // handle model-specific formats (Gemma 4, etc.) without Go-side tags. + // Otherwise fall back to Go-side extraction. + if tokenUsage.HasChatDeltaContent() { + rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent() + contentDelta = cd + reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning) + } else { + reasoningDelta = goReasoning + contentDelta = goContent + } + + delta := &schema.Message{} + if contentDelta != "" { + delta.Content = &contentDelta + } + if reasoningDelta != "" { + delta.Reasoning = &reasoningDelta + } + + responses <- schema.OpenAIResponse{ + ID: id, + Created: created, + Model: req.Model, // we have to return what the user sent here, due to OpenAI spec. + Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}}, + Object: "chat.completion.chunk", + } + return true + }) + close(responses) + return finalUsage, err +} + +// processStreamWithTools is the streaming worker for chat completions +// with tools / function calling. Same contract as processStream: pushes +// chunks onto `responses`, closes the channel, returns the cumulative +// TokenUsage. +// +// Returning the TokenUsage as a normal Go value (rather than smuggling +// it on a sentinel chunk) is the fix for issue #9927 — the previous +// implementation discarded the value from ComputeChoices, so the +// include_usage trailer reported zeros whenever `tools` was in play. +func processStreamWithTools( + noAction string, + prompt string, + req *schema.OpenAIRequest, + cfg *config.ModelConfig, + cl *config.ModelConfigLoader, + startupOptions *config.ApplicationConfig, + loader *model.ModelLoader, + responses chan schema.OpenAIResponse, + id string, + created int, + textContentToReturn *string, +) (backend.TokenUsage, error) { + // Detect if thinking token is already in prompt or template + var template string + if cfg.TemplateConfig.UseTokenizerTemplate { + template = cfg.GetModelTemplate() + } else { + template = prompt + } + thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig) + extractor := reason.NewReasoningExtractor(thinkingStartToken, cfg.ReasoningConfig) + + result := "" + lastEmittedCount := 0 + sentInitialRole := false + sentReasoning := false + hasChatDeltaToolCalls := false + hasChatDeltaContent := false + + _, finalUsage, chatDeltas, err := ComputeChoices(req, prompt, cfg, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool { + result += s + + // Track whether ChatDeltas from the C++ autoparser contain + // tool calls or content, so the retry decision can account for them. + for _, d := range usage.ChatDeltas { + if len(d.ToolCalls) > 0 { + hasChatDeltaToolCalls = true + } + if d.Content != "" { + hasChatDeltaContent = true + } + } + + var reasoningDelta, contentDelta string + + goReasoning, goContent := extractor.ProcessToken(s) + + if usage.HasChatDeltaContent() { + rawReasoning, cd := usage.ChatDeltaReasoningAndContent() + contentDelta = cd + reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning) + } else { + reasoningDelta = goReasoning + contentDelta = goContent + } + + // Emit reasoning deltas in their own SSE chunks before any tool-call chunks + // (OpenAI spec: reasoning and tool_calls never share a delta) + if reasoningDelta != "" { + responses <- schema.OpenAIResponse{ + ID: id, + Created: created, + Model: req.Model, + Choices: []schema.Choice{{ + Delta: &schema.Message{Reasoning: &reasoningDelta}, + Index: 0, + }}, + Object: "chat.completion.chunk", + } + sentReasoning = true + } + + // Stream content deltas (cleaned of reasoning tags) while no tool calls + // have been detected. Once the incremental parser finds tool calls, + // content stops: per OpenAI spec, content and tool_calls don't mix. + if lastEmittedCount == 0 && contentDelta != "" { + if !sentInitialRole { + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}}, + Object: "chat.completion.chunk", + } + sentInitialRole = true + } + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{ + Delta: &schema.Message{Content: &contentDelta}, + Index: 0, + }}, + Object: "chat.completion.chunk", + } + } + + // Try incremental XML parsing for streaming support using iterative parser + // This allows emitting partial tool calls as they're being generated + cleanedResult := functions.CleanupLLMResult(result, cfg.FunctionsConfig) + + // Determine XML format from config + var xmlFormat *functions.XMLToolCallFormat + if cfg.FunctionsConfig.XMLFormat != nil { + xmlFormat = cfg.FunctionsConfig.XMLFormat + } else if cfg.FunctionsConfig.XMLFormatPreset != "" { + xmlFormat = functions.GetXMLFormatPreset(cfg.FunctionsConfig.XMLFormatPreset) + } + + // Use iterative parser for streaming (partial parsing enabled) + // Try XML parsing first + partialResults, parseErr := functions.ParseXMLIterative(cleanedResult, xmlFormat, true) + if parseErr == nil && len(partialResults) > 0 { + // Emit new XML tool calls that weren't emitted before + if len(partialResults) > lastEmittedCount { + for i := lastEmittedCount; i < len(partialResults); i++ { + toolCall := partialResults[i] + initialMessage := schema.OpenAIResponse{ + ID: id, + Created: created, + Model: req.Model, + Choices: []schema.Choice{{ + Delta: &schema.Message{ + Role: "assistant", + ToolCalls: []schema.ToolCall{ + { + Index: i, + ID: id, + Type: "function", + FunctionCall: schema.FunctionCall{ + Name: toolCall.Name, + }, + }, + }, + }, + Index: 0, + FinishReason: nil, + }}, + Object: "chat.completion.chunk", + } + select { + case responses <- initialMessage: + default: + } + } + lastEmittedCount = len(partialResults) + } + } else { + // Try JSON tool call parsing for streaming. + // Only emit NEW tool calls (same guard as XML parser above). + jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true) + if jsonErr == nil && len(jsonResults) > lastEmittedCount { + for i := lastEmittedCount; i < len(jsonResults); i++ { + jsonObj := jsonResults[i] + name, ok := jsonObj["name"].(string) + if !ok || name == "" { + continue + } + args := "{}" + if argsVal, ok := jsonObj["arguments"]; ok { + if argsStr, ok := argsVal.(string); ok { + args = argsStr + } else { + argsBytes, _ := json.Marshal(argsVal) + args = string(argsBytes) + } + } + initialMessage := schema.OpenAIResponse{ + ID: id, + Created: created, + Model: req.Model, + Choices: []schema.Choice{{ + Delta: &schema.Message{ + Role: "assistant", + ToolCalls: []schema.ToolCall{ + { + Index: i, + ID: id, + Type: "function", + FunctionCall: schema.FunctionCall{ + Name: name, + Arguments: args, + }, + }, + }, + }, + Index: 0, + FinishReason: nil, + }}, + Object: "chat.completion.chunk", + } + responses <- initialMessage + } + lastEmittedCount = len(jsonResults) + } + } + return true + }, + func(attempt int) bool { + // After streaming completes: check if we got actionable content + cleaned := extractor.CleanedContent() + // Check for tool calls from chat deltas (will be re-checked after ComputeChoices, + // but we need to know here whether to retry). + // Also check ChatDelta flags: when the C++ autoparser is active, + // tool calls and content are delivered via ChatDeltas while the + // raw message is cleared. Without this check, we'd retry + // unnecessarily, losing valid results and concatenating output. + hasToolCalls := lastEmittedCount > 0 || hasChatDeltaToolCalls + hasContent := cleaned != "" || hasChatDeltaContent + if !hasContent && !hasToolCalls { + xlog.Warn("Streaming: backend produced only reasoning, retrying", + "reasoning_len", len(extractor.Reasoning()), "attempt", attempt+1) + extractor.ResetAndSuppressReasoning() + result = "" + lastEmittedCount = 0 + sentInitialRole = false + hasChatDeltaToolCalls = false + hasChatDeltaContent = false + return true + } + return false + }, + ) + if err != nil { + return finalUsage, err + } + // Try using pre-parsed tool calls from C++ autoparser (chat deltas) + var functionResults []functions.FuncCallResults + var reasoning string + + if deltaToolCalls := functions.ToolCallsFromChatDeltas(chatDeltas); len(deltaToolCalls) > 0 { + xlog.Debug("[ChatDeltas] Using pre-parsed tool calls from C++ autoparser", "count", len(deltaToolCalls)) + functionResults = deltaToolCalls + // Use content/reasoning from deltas too + *textContentToReturn = functions.ContentFromChatDeltas(chatDeltas) + reasoning = functions.ReasoningFromChatDeltas(chatDeltas) + } else { + // Fallback: parse tool calls from raw text (no chat deltas from backend) + xlog.Debug("[ChatDeltas] no pre-parsed tool calls, falling back to Go-side text parsing") + reasoning = extractor.Reasoning() + cleanedResult := extractor.CleanedContent() + *textContentToReturn = functions.ParseTextContent(cleanedResult, cfg.FunctionsConfig) + cleanedResult = functions.CleanupLLMResult(cleanedResult, cfg.FunctionsConfig) + functionResults = functions.ParseFunctionCall(cleanedResult, cfg.FunctionsConfig) + } + xlog.Debug("[ChatDeltas] final tool call decision", "tool_calls", len(functionResults), "text_content", *textContentToReturn) + // noAction is a sentinel "just answer" pseudo-function: not a real + // tool call. Scan the whole slice rather than only index 0 so we + // don't drop a real tool call that happens to follow a noAction + // entry, and so the default branch isn't entered with only noAction + // entries to emit as tool_calls. + noActionToRun := !hasRealCall(functionResults, noAction) + + switch { + case noActionToRun: + // The final usage trailer (when the caller opted in with + // stream_options.include_usage) is built by the outer streaming + // loop from the TokenUsage this function returns, not from any + // chunk on the responses channel. + var result string + if !sentInitialRole { + var hqErr error + result, hqErr = handleQuestion(cfg, functionResults, extractor.CleanedContent(), prompt) + if hqErr != nil { + xlog.Error("error handling question", "error", hqErr) + return finalUsage, hqErr + } + } + for _, chunk := range buildNoActionFinalChunks( + id, req.Model, created, + sentInitialRole, sentReasoning, + result, reasoning, + ) { + responses <- chunk + } + + default: + for _, chunk := range buildDeferredToolCallChunks( + id, req.Model, created, + functionResults, lastEmittedCount, + sentInitialRole, *textContentToReturn, + sentReasoning, reasoning, + ) { + responses <- chunk + } + } + + close(responses) + return finalUsage, err +}