mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-30 11:36:31 -04:00
test(streaming/tools): cover the autoparser-correctly-working path
Extract the JSON tool-call streaming emit loop into emitJSONToolCallDeltas and unit-test it against every shape that can hit the streaming worker: * the bug case — a healing-marker stub at index 0 must NOT bump lastEmittedCount, so subsequent content chunks keep flowing; * the autoparser-correctly-working case — empty jsonResults (because the C++ autoparser cleared the raw text and delivers tool calls via TokenUsage.ChatDeltas) is a no-op, leaving the deferred end-of-stream emitter to ship the autoparser's tool calls; * a single complete tool call — emit one chunk, advance to 1; * arguments arriving as a JSON-string vs as a nested object — both serialize to the wire as JSON-string arguments; * multiple parallel tool calls — one chunk each; * a real tool call followed by a partial stub — emit the real one, stop at the stub, resume on a later chunk once the stub completes. Locks down the no-regression guarantee the user asked for: this PR's fix is scoped to the pure-content fallback path; when the autoparser actually classifies tool calls (jinja-recognized chat format with tool support), the helper is a no-op and nothing changes. Assisted-by: Claude:opus-4-7 [Read] [Edit] [Bash] [Write] Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -12,6 +12,77 @@ import (
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
// emitJSONToolCallDeltas iterates the JSON tool-call objects produced by the
|
||||
// streaming tool-call detector and emits SSE chunks for the ones the caller
|
||||
// hasn't already emitted. It returns the new lastEmittedCount.
|
||||
//
|
||||
// Semantics:
|
||||
// - Skips entries before lastEmittedCount (already emitted).
|
||||
// - Emits one tool_call chunk per consecutive entry that has a usable
|
||||
// `name` string.
|
||||
// - Stops at the first entry without a name (typically the partial-JSON
|
||||
// tail or a healing-marker stub — see issue #9988) so the caller doesn't
|
||||
// advance past it. Bumping lastEmittedCount past an unparsed stub
|
||||
// permanently gates off content emission for the rest of the stream.
|
||||
// - When jsonResults is empty (the autoparser-working case, where the raw
|
||||
// text result is cleared and only ChatDeltas carry tool calls), this is
|
||||
// a no-op and lastEmittedCount is returned unchanged.
|
||||
//
|
||||
// The autoparser-correctly-classifying-tool-calls path is unaffected: it
|
||||
// delivers tool calls via TokenUsage.ChatDeltas, and the deferred
|
||||
// end-of-stream block (ToolCallsFromChatDeltas → buildDeferredToolCallChunks)
|
||||
// emits them; this helper sees an empty jsonResults and emits nothing.
|
||||
func emitJSONToolCallDeltas(
|
||||
jsonResults []map[string]any,
|
||||
lastEmittedCount int,
|
||||
id, model string,
|
||||
created int,
|
||||
responses chan<- schema.OpenAIResponse,
|
||||
) int {
|
||||
for i := lastEmittedCount; i < len(jsonResults); i++ {
|
||||
jsonObj := jsonResults[i]
|
||||
name, ok := jsonObj["name"].(string)
|
||||
if !ok || name == "" {
|
||||
break
|
||||
}
|
||||
args := "{}"
|
||||
if argsVal, ok := jsonObj["arguments"]; ok {
|
||||
if argsStr, ok := argsVal.(string); ok {
|
||||
args = argsStr
|
||||
} else {
|
||||
argsBytes, _ := json.Marshal(argsVal)
|
||||
args = string(argsBytes)
|
||||
}
|
||||
}
|
||||
responses <- schema.OpenAIResponse{
|
||||
ID: id,
|
||||
Created: created,
|
||||
Model: model,
|
||||
Choices: []schema.Choice{{
|
||||
Delta: &schema.Message{
|
||||
Role: "assistant",
|
||||
ToolCalls: []schema.ToolCall{
|
||||
{
|
||||
Index: i,
|
||||
ID: id,
|
||||
Type: "function",
|
||||
FunctionCall: schema.FunctionCall{
|
||||
Name: name,
|
||||
Arguments: args,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Index: 0,
|
||||
FinishReason: nil,
|
||||
}},
|
||||
Object: "chat.completion.chunk",
|
||||
}
|
||||
lastEmittedCount = i + 1
|
||||
}
|
||||
return lastEmittedCount
|
||||
}
|
||||
|
||||
// processStream is the streaming worker for chat completions with no
|
||||
// tool/function calling involved. It pushes SSE-shaped chunks onto
|
||||
// `responses` and returns the authoritative cumulative TokenUsage from
|
||||
@@ -263,60 +334,11 @@ func processStreamWithTools(
|
||||
} else {
|
||||
// Try JSON tool call parsing for streaming.
|
||||
// Only emit NEW tool calls (same guard as XML parser above).
|
||||
//
|
||||
// Issue #9988 defense: ParseJSONIterative may return stub objects
|
||||
// for partial input that has not yet committed a tool name (e.g.
|
||||
// `{"n` healed to `{"n":1}`). Treat any entry without a usable
|
||||
// `name` as "not yet a tool call" — break instead of continue, and
|
||||
// advance lastEmittedCount only past actually-emitted entries. The
|
||||
// previous version of this block set
|
||||
// `lastEmittedCount = len(jsonResults)` unconditionally, which
|
||||
// gated off ALL subsequent content emission as soon as one stub
|
||||
// landed in results (the qwen3 + streaming + tools "{\"" leak).
|
||||
jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true)
|
||||
if jsonErr == nil && len(jsonResults) > lastEmittedCount {
|
||||
for i := lastEmittedCount; i < len(jsonResults); i++ {
|
||||
jsonObj := jsonResults[i]
|
||||
name, ok := jsonObj["name"].(string)
|
||||
if !ok || name == "" {
|
||||
break
|
||||
}
|
||||
args := "{}"
|
||||
if argsVal, ok := jsonObj["arguments"]; ok {
|
||||
if argsStr, ok := argsVal.(string); ok {
|
||||
args = argsStr
|
||||
} else {
|
||||
argsBytes, _ := json.Marshal(argsVal)
|
||||
args = string(argsBytes)
|
||||
}
|
||||
}
|
||||
initialMessage := schema.OpenAIResponse{
|
||||
ID: id,
|
||||
Created: created,
|
||||
Model: req.Model,
|
||||
Choices: []schema.Choice{{
|
||||
Delta: &schema.Message{
|
||||
Role: "assistant",
|
||||
ToolCalls: []schema.ToolCall{
|
||||
{
|
||||
Index: i,
|
||||
ID: id,
|
||||
Type: "function",
|
||||
FunctionCall: schema.FunctionCall{
|
||||
Name: name,
|
||||
Arguments: args,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Index: 0,
|
||||
FinishReason: nil,
|
||||
}},
|
||||
Object: "chat.completion.chunk",
|
||||
}
|
||||
responses <- initialMessage
|
||||
lastEmittedCount = i + 1
|
||||
}
|
||||
if jsonErr == nil {
|
||||
lastEmittedCount = emitJSONToolCallDeltas(
|
||||
jsonResults, lastEmittedCount, id, req.Model, created, responses,
|
||||
)
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
197
core/http/endpoints/openai/chat_stream_workers_test.go
Normal file
197
core/http/endpoints/openai/chat_stream_workers_test.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// drainChannel reads everything currently buffered on a channel without
|
||||
// blocking on close. The helper test channels are sized for the assertions.
|
||||
func drainChannel(ch <-chan schema.OpenAIResponse) []schema.OpenAIResponse {
|
||||
var out []schema.OpenAIResponse
|
||||
for {
|
||||
select {
|
||||
case r, ok := <-ch:
|
||||
if !ok {
|
||||
return out
|
||||
}
|
||||
out = append(out, r)
|
||||
default:
|
||||
return out
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nameOf returns the name of the first tool call carried on the choice's
|
||||
// delta, or "" if none.
|
||||
func nameOf(r schema.OpenAIResponse) string {
|
||||
if len(r.Choices) == 0 || r.Choices[0].Delta == nil {
|
||||
return ""
|
||||
}
|
||||
if len(r.Choices[0].Delta.ToolCalls) == 0 {
|
||||
return ""
|
||||
}
|
||||
return r.Choices[0].Delta.ToolCalls[0].FunctionCall.Name
|
||||
}
|
||||
|
||||
var _ = Describe("emitJSONToolCallDeltas", func() {
|
||||
const (
|
||||
id = "test-stream"
|
||||
model = "test-model"
|
||||
created = 1700000000
|
||||
)
|
||||
|
||||
// The case that motivated this helper. With the previous version of
|
||||
// the streaming worker, ParseJSONIterative would hand back a stub
|
||||
// object like `{"4310046988783340008":1}` after the model had only
|
||||
// emitted `{`. The worker bumped lastEmittedCount unconditionally,
|
||||
// which permanently gated off content emission for the rest of the
|
||||
// stream (qwen3-4b with stream:true + tools dribbled only `{"` to
|
||||
// the client and then nothing). See issue #9988.
|
||||
Context("partial stub without a usable name", func() {
|
||||
It("does NOT bump lastEmittedCount and emits nothing", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 4)
|
||||
// What ParseJSONIterative used to return for `{`:
|
||||
stubResults := []map[string]any{
|
||||
{"4310046988783340008": float64(1)},
|
||||
}
|
||||
|
||||
next := emitJSONToolCallDeltas(stubResults, 0, id, model, created, responses)
|
||||
|
||||
Expect(next).To(Equal(0),
|
||||
"lastEmittedCount must NOT advance past a stub without a name "+
|
||||
"— otherwise content emission gets permanently gated off")
|
||||
Expect(drainChannel(responses)).To(BeEmpty(),
|
||||
"no tool_call chunk should be emitted for a stub without a name")
|
||||
})
|
||||
})
|
||||
|
||||
// No-regression #1: the autoparser-correctly-working path. When the
|
||||
// C++ autoparser classifies tool calls itself, the raw text result is
|
||||
// cleared and ParseJSONIterative on it returns no results — this
|
||||
// helper must be a no-op so the deferred end-of-stream code can emit
|
||||
// the tool calls from TokenUsage.ChatDeltas.
|
||||
Context("empty jsonResults (autoparser-correctly-working path)", func() {
|
||||
It("is a no-op and leaves lastEmittedCount unchanged", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 4)
|
||||
next := emitJSONToolCallDeltas(nil, 0, id, model, created, responses)
|
||||
Expect(next).To(Equal(0))
|
||||
Expect(drainChannel(responses)).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("leaves a non-zero lastEmittedCount unchanged when later called with the same length", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 4)
|
||||
results := []map[string]any{
|
||||
{"name": "search", "arguments": map[string]any{"q": "hi"}},
|
||||
}
|
||||
// First call emits the one available tool call.
|
||||
next := emitJSONToolCallDeltas(results, 0, id, model, created, responses)
|
||||
Expect(next).To(Equal(1))
|
||||
Expect(drainChannel(responses)).To(HaveLen(1))
|
||||
|
||||
// Subsequent chunks haven't grown the slice — must be a no-op.
|
||||
next = emitJSONToolCallDeltas(results, next, id, model, created, responses)
|
||||
Expect(next).To(Equal(1))
|
||||
Expect(drainChannel(responses)).To(BeEmpty())
|
||||
})
|
||||
})
|
||||
|
||||
// No-regression #2: the normal completed-JSON path. When the model
|
||||
// emits a real, complete tool call as JSON in raw content (e.g. qwen3
|
||||
// without jinja but with tools), we should emit exactly one tool_call
|
||||
// SSE chunk on the first call and become a no-op on later calls.
|
||||
Context("single complete tool call", func() {
|
||||
It("emits one tool_call chunk and bumps lastEmittedCount to 1", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 4)
|
||||
results := []map[string]any{
|
||||
{
|
||||
"name": "search",
|
||||
"arguments": map[string]any{
|
||||
"q": "hello",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
next := emitJSONToolCallDeltas(results, 0, id, model, created, responses)
|
||||
|
||||
Expect(next).To(Equal(1))
|
||||
out := drainChannel(responses)
|
||||
Expect(out).To(HaveLen(1))
|
||||
Expect(nameOf(out[0])).To(Equal("search"))
|
||||
Expect(out[0].Choices[0].Delta.ToolCalls[0].FunctionCall.Arguments).
|
||||
To(ContainSubstring(`"q":"hello"`))
|
||||
})
|
||||
|
||||
It("accepts arguments already serialized as a string", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 4)
|
||||
results := []map[string]any{
|
||||
{
|
||||
"name": "search",
|
||||
"arguments": `{"q":"hello"}`,
|
||||
},
|
||||
}
|
||||
|
||||
emitJSONToolCallDeltas(results, 0, id, model, created, responses)
|
||||
|
||||
out := drainChannel(responses)
|
||||
Expect(out).To(HaveLen(1))
|
||||
Expect(out[0].Choices[0].Delta.ToolCalls[0].FunctionCall.Arguments).
|
||||
To(Equal(`{"q":"hello"}`))
|
||||
})
|
||||
})
|
||||
|
||||
// No-regression #3: multiple tool calls (parallel tool calling).
|
||||
// Both must be emitted, lastEmittedCount must end at 2.
|
||||
Context("multiple complete tool calls", func() {
|
||||
It("emits one chunk per tool call and bumps lastEmittedCount to len(results)", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 8)
|
||||
results := []map[string]any{
|
||||
{"name": "search", "arguments": map[string]any{"q": "a"}},
|
||||
{"name": "browse", "arguments": map[string]any{"url": "b"}},
|
||||
}
|
||||
|
||||
next := emitJSONToolCallDeltas(results, 0, id, model, created, responses)
|
||||
|
||||
Expect(next).To(Equal(2))
|
||||
out := drainChannel(responses)
|
||||
Expect(out).To(HaveLen(2))
|
||||
Expect(nameOf(out[0])).To(Equal("search"))
|
||||
Expect(nameOf(out[1])).To(Equal("browse"))
|
||||
})
|
||||
})
|
||||
|
||||
// The streaming-tail case: incremental chunks. First parse returns
|
||||
// one complete tool call followed by a partial stub; later chunks
|
||||
// complete the second tool call. We must emit the first immediately
|
||||
// and the second on the later call — without ever bumping past the
|
||||
// stub mid-stream.
|
||||
Context("partial tail behind a real tool call", func() {
|
||||
It("emits the complete entry, stops at the stub, and resumes once the tail completes", func() {
|
||||
responses := make(chan schema.OpenAIResponse, 8)
|
||||
|
||||
// Chunk 1: one real call + a partial stub for the next.
|
||||
chunk1 := []map[string]any{
|
||||
{"name": "search", "arguments": map[string]any{"q": "a"}},
|
||||
{"4310046988783340008": float64(1)},
|
||||
}
|
||||
next := emitJSONToolCallDeltas(chunk1, 0, id, model, created, responses)
|
||||
Expect(next).To(Equal(1),
|
||||
"must NOT advance to 2 — the stub at index 1 has no usable name")
|
||||
out := drainChannel(responses)
|
||||
Expect(out).To(HaveLen(1))
|
||||
Expect(nameOf(out[0])).To(Equal("search"))
|
||||
|
||||
// Chunk 2: the stub completes into a real call.
|
||||
chunk2 := []map[string]any{
|
||||
{"name": "search", "arguments": map[string]any{"q": "a"}},
|
||||
{"name": "browse", "arguments": map[string]any{"url": "b"}},
|
||||
}
|
||||
next = emitJSONToolCallDeltas(chunk2, next, id, model, created, responses)
|
||||
Expect(next).To(Equal(2))
|
||||
out = drainChannel(responses)
|
||||
Expect(out).To(HaveLen(1))
|
||||
Expect(nameOf(out[0])).To(Equal("browse"))
|
||||
})
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user