From 391c1a3dcc2c4fdc24313c046b2e94ac834fb0bc Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Mon, 25 May 2026 20:50:20 +0000
Subject: [PATCH] test(streaming/tools): cover the autoparser-correctly-working path
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Extract the JSON tool-call streaming emit loop into
emitJSONToolCallDeltas and unit-test it against every shape that can
hit the streaming worker:

  * the bug case — a healing-marker stub at index 0 must NOT bump
    lastEmittedCount, so subsequent content chunks keep flowing;
  * the autoparser-correctly-working case — empty jsonResults (because
    the C++ autoparser cleared the raw text and delivers tool calls via
    TokenUsage.ChatDeltas) is a no-op, leaving the deferred
    end-of-stream emitter to ship the autoparser's tool calls;
  * a single complete tool call — emit one chunk, advance to 1;
  * arguments arriving as a JSON-string vs as a nested object — both
    serialize to the wire as JSON-string arguments;
  * multiple parallel tool calls — one chunk each;
  * a real tool call followed by a partial stub — emit the real one,
    stop at the stub, resume on a later chunk once the stub completes.

Locks down the no-regression guarantee the user asked for: this PR's
fix is scoped to the pure-content fallback path; when the autoparser
actually classifies tool calls (jinja-recognized chat format with tool
support), the helper is a no-op and nothing changes.
Assisted-by: Claude:opus-4-7 [Read] [Edit] [Bash] [Write] Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/chat_stream_workers.go | 128 +++++++----- .../openai/chat_stream_workers_test.go | 197 ++++++++++++++++++ 2 files changed, 272 insertions(+), 53 deletions(-) create mode 100644 core/http/endpoints/openai/chat_stream_workers_test.go diff --git a/core/http/endpoints/openai/chat_stream_workers.go b/core/http/endpoints/openai/chat_stream_workers.go index b45733ea0..9d962e9b9 100644 --- a/core/http/endpoints/openai/chat_stream_workers.go +++ b/core/http/endpoints/openai/chat_stream_workers.go @@ -12,6 +12,77 @@ import ( "github.com/mudler/xlog" ) +// emitJSONToolCallDeltas iterates the JSON tool-call objects produced by the +// streaming tool-call detector and emits SSE chunks for the ones the caller +// hasn't already emitted. It returns the new lastEmittedCount. +// +// Semantics: +// - Skips entries before lastEmittedCount (already emitted). +// - Emits one tool_call chunk per consecutive entry that has a usable +// `name` string. +// - Stops at the first entry without a name (typically the partial-JSON +// tail or a healing-marker stub — see issue #9988) so the caller doesn't +// advance past it. Bumping lastEmittedCount past an unparsed stub +// permanently gates off content emission for the rest of the stream. +// - When jsonResults is empty (the autoparser-working case, where the raw +// text result is cleared and only ChatDeltas carry tool calls), this is +// a no-op and lastEmittedCount is returned unchanged. +// +// The autoparser-correctly-classifying-tool-calls path is unaffected: it +// delivers tool calls via TokenUsage.ChatDeltas, and the deferred +// end-of-stream block (ToolCallsFromChatDeltas → buildDeferredToolCallChunks) +// emits them; this helper sees an empty jsonResults and emits nothing. 
+func emitJSONToolCallDeltas( + jsonResults []map[string]any, + lastEmittedCount int, + id, model string, + created int, + responses chan<- schema.OpenAIResponse, +) int { + for i := lastEmittedCount; i < len(jsonResults); i++ { + jsonObj := jsonResults[i] + name, ok := jsonObj["name"].(string) + if !ok || name == "" { + break + } + args := "{}" + if argsVal, ok := jsonObj["arguments"]; ok { + if argsStr, ok := argsVal.(string); ok { + args = argsStr + } else { + argsBytes, _ := json.Marshal(argsVal) + args = string(argsBytes) + } + } + responses <- schema.OpenAIResponse{ + ID: id, + Created: created, + Model: model, + Choices: []schema.Choice{{ + Delta: &schema.Message{ + Role: "assistant", + ToolCalls: []schema.ToolCall{ + { + Index: i, + ID: id, + Type: "function", + FunctionCall: schema.FunctionCall{ + Name: name, + Arguments: args, + }, + }, + }, + }, + Index: 0, + FinishReason: nil, + }}, + Object: "chat.completion.chunk", + } + lastEmittedCount = i + 1 + } + return lastEmittedCount +} + // processStream is the streaming worker for chat completions with no // tool/function calling involved. It pushes SSE-shaped chunks onto // `responses` and returns the authoritative cumulative TokenUsage from @@ -263,60 +334,11 @@ func processStreamWithTools( } else { // Try JSON tool call parsing for streaming. // Only emit NEW tool calls (same guard as XML parser above). - // - // Issue #9988 defense: ParseJSONIterative may return stub objects - // for partial input that has not yet committed a tool name (e.g. - // `{"n` healed to `{"n":1}`). Treat any entry without a usable - // `name` as "not yet a tool call" — break instead of continue, and - // advance lastEmittedCount only past actually-emitted entries. The - // previous version of this block set - // `lastEmittedCount = len(jsonResults)` unconditionally, which - // gated off ALL subsequent content emission as soon as one stub - // landed in results (the qwen3 + streaming + tools "{\"" leak). 
jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true) - if jsonErr == nil && len(jsonResults) > lastEmittedCount { - for i := lastEmittedCount; i < len(jsonResults); i++ { - jsonObj := jsonResults[i] - name, ok := jsonObj["name"].(string) - if !ok || name == "" { - break - } - args := "{}" - if argsVal, ok := jsonObj["arguments"]; ok { - if argsStr, ok := argsVal.(string); ok { - args = argsStr - } else { - argsBytes, _ := json.Marshal(argsVal) - args = string(argsBytes) - } - } - initialMessage := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, - Choices: []schema.Choice{{ - Delta: &schema.Message{ - Role: "assistant", - ToolCalls: []schema.ToolCall{ - { - Index: i, - ID: id, - Type: "function", - FunctionCall: schema.FunctionCall{ - Name: name, - Arguments: args, - }, - }, - }, - }, - Index: 0, - FinishReason: nil, - }}, - Object: "chat.completion.chunk", - } - responses <- initialMessage - lastEmittedCount = i + 1 - } + if jsonErr == nil { + lastEmittedCount = emitJSONToolCallDeltas( + jsonResults, lastEmittedCount, id, req.Model, created, responses, + ) } } return true diff --git a/core/http/endpoints/openai/chat_stream_workers_test.go b/core/http/endpoints/openai/chat_stream_workers_test.go new file mode 100644 index 000000000..3f5340151 --- /dev/null +++ b/core/http/endpoints/openai/chat_stream_workers_test.go @@ -0,0 +1,197 @@ +package openai + +import ( + "github.com/mudler/LocalAI/core/schema" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// drainChannel reads everything currently buffered on a channel without +// blocking on close. The helper test channels are sized for the assertions. 
+func drainChannel(ch <-chan schema.OpenAIResponse) []schema.OpenAIResponse { + var out []schema.OpenAIResponse + for { + select { + case r, ok := <-ch: + if !ok { + return out + } + out = append(out, r) + default: + return out + } + } +} + +// nameOf returns the name of the first tool call carried on the choice's +// delta, or "" if none. +func nameOf(r schema.OpenAIResponse) string { + if len(r.Choices) == 0 || r.Choices[0].Delta == nil { + return "" + } + if len(r.Choices[0].Delta.ToolCalls) == 0 { + return "" + } + return r.Choices[0].Delta.ToolCalls[0].FunctionCall.Name +} + +var _ = Describe("emitJSONToolCallDeltas", func() { + const ( + id = "test-stream" + model = "test-model" + created = 1700000000 + ) + + // The case that motivated this helper. With the previous version of + // the streaming worker, ParseJSONIterative would hand back a stub + // object like `{"4310046988783340008":1}` after the model had only + // emitted `{`. The worker bumped lastEmittedCount unconditionally, + // which permanently gated off content emission for the rest of the + // stream (qwen3-4b with stream:true + tools dribbled only `{"` to + // the client and then nothing). See issue #9988. + Context("partial stub without a usable name", func() { + It("does NOT bump lastEmittedCount and emits nothing", func() { + responses := make(chan schema.OpenAIResponse, 4) + // What ParseJSONIterative used to return for `{`: + stubResults := []map[string]any{ + {"4310046988783340008": float64(1)}, + } + + next := emitJSONToolCallDeltas(stubResults, 0, id, model, created, responses) + + Expect(next).To(Equal(0), + "lastEmittedCount must NOT advance past a stub without a name "+ + "— otherwise content emission gets permanently gated off") + Expect(drainChannel(responses)).To(BeEmpty(), + "no tool_call chunk should be emitted for a stub without a name") + }) + }) + + // No-regression #1: the autoparser-correctly-working path. 
When the + // C++ autoparser classifies tool calls itself, the raw text result is + // cleared and ParseJSONIterative on it returns no results — this + // helper must be a no-op so the deferred end-of-stream code can emit + // the tool calls from TokenUsage.ChatDeltas. + Context("empty jsonResults (autoparser-correctly-working path)", func() { + It("is a no-op and leaves lastEmittedCount unchanged", func() { + responses := make(chan schema.OpenAIResponse, 4) + next := emitJSONToolCallDeltas(nil, 0, id, model, created, responses) + Expect(next).To(Equal(0)) + Expect(drainChannel(responses)).To(BeEmpty()) + }) + + It("leaves a non-zero lastEmittedCount unchanged when later called with the same length", func() { + responses := make(chan schema.OpenAIResponse, 4) + results := []map[string]any{ + {"name": "search", "arguments": map[string]any{"q": "hi"}}, + } + // First call emits the one available tool call. + next := emitJSONToolCallDeltas(results, 0, id, model, created, responses) + Expect(next).To(Equal(1)) + Expect(drainChannel(responses)).To(HaveLen(1)) + + // Subsequent chunks haven't grown the slice — must be a no-op. + next = emitJSONToolCallDeltas(results, next, id, model, created, responses) + Expect(next).To(Equal(1)) + Expect(drainChannel(responses)).To(BeEmpty()) + }) + }) + + // No-regression #2: the normal completed-JSON path. When the model + // emits a real, complete tool call as JSON in raw content (e.g. qwen3 + // without jinja but with tools), we should emit exactly one tool_call + // SSE chunk on the first call and become a no-op on later calls. 
+ Context("single complete tool call", func() { + It("emits one tool_call chunk and bumps lastEmittedCount to 1", func() { + responses := make(chan schema.OpenAIResponse, 4) + results := []map[string]any{ + { + "name": "search", + "arguments": map[string]any{ + "q": "hello", + }, + }, + } + + next := emitJSONToolCallDeltas(results, 0, id, model, created, responses) + + Expect(next).To(Equal(1)) + out := drainChannel(responses) + Expect(out).To(HaveLen(1)) + Expect(nameOf(out[0])).To(Equal("search")) + Expect(out[0].Choices[0].Delta.ToolCalls[0].FunctionCall.Arguments). + To(ContainSubstring(`"q":"hello"`)) + }) + + It("accepts arguments already serialized as a string", func() { + responses := make(chan schema.OpenAIResponse, 4) + results := []map[string]any{ + { + "name": "search", + "arguments": `{"q":"hello"}`, + }, + } + + emitJSONToolCallDeltas(results, 0, id, model, created, responses) + + out := drainChannel(responses) + Expect(out).To(HaveLen(1)) + Expect(out[0].Choices[0].Delta.ToolCalls[0].FunctionCall.Arguments). + To(Equal(`{"q":"hello"}`)) + }) + }) + + // No-regression #3: multiple tool calls (parallel tool calling). + // Both must be emitted, lastEmittedCount must end at 2. + Context("multiple complete tool calls", func() { + It("emits one chunk per tool call and bumps lastEmittedCount to len(results)", func() { + responses := make(chan schema.OpenAIResponse, 8) + results := []map[string]any{ + {"name": "search", "arguments": map[string]any{"q": "a"}}, + {"name": "browse", "arguments": map[string]any{"url": "b"}}, + } + + next := emitJSONToolCallDeltas(results, 0, id, model, created, responses) + + Expect(next).To(Equal(2)) + out := drainChannel(responses) + Expect(out).To(HaveLen(2)) + Expect(nameOf(out[0])).To(Equal("search")) + Expect(nameOf(out[1])).To(Equal("browse")) + }) + }) + + // The streaming-tail case: incremental chunks. 
First parse returns + // one complete tool call followed by a partial stub; later chunks + // complete the second tool call. We must emit the first immediately + // and the second on the later call — without ever bumping past the + // stub mid-stream. + Context("partial tail behind a real tool call", func() { + It("emits the complete entry, stops at the stub, and resumes once the tail completes", func() { + responses := make(chan schema.OpenAIResponse, 8) + + // Chunk 1: one real call + a partial stub for the next. + chunk1 := []map[string]any{ + {"name": "search", "arguments": map[string]any{"q": "a"}}, + {"4310046988783340008": float64(1)}, + } + next := emitJSONToolCallDeltas(chunk1, 0, id, model, created, responses) + Expect(next).To(Equal(1), + "must NOT advance to 2 — the stub at index 1 has no usable name") + out := drainChannel(responses) + Expect(out).To(HaveLen(1)) + Expect(nameOf(out[0])).To(Equal("search")) + + // Chunk 2: the stub completes into a real call. + chunk2 := []map[string]any{ + {"name": "search", "arguments": map[string]any{"q": "a"}}, + {"name": "browse", "arguments": map[string]any{"url": "b"}}, + } + next = emitJSONToolCallDeltas(chunk2, next, id, model, created, responses) + Expect(next).To(Equal(2)) + out = drainChannel(responses) + Expect(out).To(HaveLen(1)) + Expect(nameOf(out[0])).To(Equal("browse")) + }) + }) +})