From d05d83ff367bcd83ea11243145ea250f5273596a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 5 Jun 2026 07:48:00 +0000 Subject: [PATCH] feat(realtime): stream tool-call turns via tokenizer-template autoparser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review (richiejp): tool-call deltas exist, so streaming should work with tools too. It does — for models that use their tokenizer template. The C++ autoparser then clears reply.Message and delivers content + tool calls via ChatDeltas, so the streamed transcript carries only spoken content (no tool-call JSON leak) and the tool calls are parsed from the final response. - Drop the len(tools)==0 gate; stream when no tools OR use_tokenizer_template (grammar-based function calling still buffers, since its call is emitted as JSON in the token stream and would leak into the transcript). - streamLLMResponse takes tools/toolChoice/toolTurn, reads ChatDelta content in the token callback, parses tool calls from the final ChatDeltas, and creates the assistant content item lazily so a content-less tool turn emits only the tool calls. - Extract emitToolCallItems from the buffered path so both paths finalize tool calls, response.done, and server-side assistant-tool follow-ups identically. Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 51 ++-- core/http/endpoints/openai/realtime_stream.go | 219 +++++++++++------- .../endpoints/openai/realtime_stream_test.go | 70 +++++- docs/content/features/openai-realtime.md | 4 +- 4 files changed, 234 insertions(+), 110 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index bc2a80785..7d76444aa 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1496,18 +1496,21 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }, }) - // Streamed LLM path: when the pipeline opts into LLM streaming and the turn - // cannot produce a tool call (no tools), stream tokens straight to the client - // as transcript deltas and sentence-pipe them into TTS. Tool turns fall - // through to the buffered path below, since partial tool-call output can't be - // safely spoken mid-stream. - if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() && len(tools) == 0 { + // Streamed LLM path: when the pipeline opts into LLM streaming, stream the + // transcript to the client as it is generated and synthesize the buffered + // message once. Tool turns are supported only when the model uses its + // tokenizer template: the C++ autoparser then delivers content and tool + // calls via ChatDeltas (clearing the text stream), so the spoken transcript + // never leaks tool-call tokens. Grammar-based function calling emits the + // call as JSON in the token stream, so those turns keep the buffered path. + if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() { + canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate var respMods []types.Modality if overrides != nil { respMods = overrides.OutputModalities } - if modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { - if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config) { + if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { + if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) { return } } @@ -1814,17 +1817,27 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }) } - // Handle Tool Calls. Two paths: - // - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run - // server-side; we append both the call and its output to conv.Items - // and re-trigger a follow-up response so the model can speak the - // result. The client only sees observability events. - // - All other tools follow the standard OpenAI flow: emit - // function_call_arguments.done and wait for the client to send - // conversation.item.create back. - xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls)) + // Emit the parsed tool calls, the terminal response.done, and (for + // server-side assistant tools) the follow-up response. Shared with the + // streamed path so both finalize tool calls identically. + emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn) +} + +// emitToolCallItems emits the realtime function_call items for the parsed tool +// calls, the terminal response.done, and — for server-side LocalAI Assistant +// tools — re-triggers a follow-up response so the model can speak the result. +// hasContent shifts the tool-call output index past the assistant content item +// when the same turn also produced spoken/text content. Two tool paths: +// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side; +// we append both the call and its output to conv.Items and re-trigger. The +// client only sees observability events. +// - All other tools follow the standard OpenAI flow: emit +// function_call_arguments.done and wait for the client to send +// conversation.item.create back. +func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) { + xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls)) executedAssistantTool := false - for i, tc := range finalToolCalls { + for i, tc := range toolCalls { toolCallID := generateItemID() callID := "call_" + generateUniqueID() // OpenAI uses call_xyz @@ -1844,7 +1857,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa conv.Lock.Unlock() outputIndex := i - if finalSpeech != "" { + if hasContent { outputIndex++ } diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index 9eca643d3..f6c70e82d 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/functions" "github.com/mudler/LocalAI/pkg/reasoning" ) @@ -25,6 +26,12 @@ type transcriptStreamer struct { responseID string itemID string extractor *reasoning.ReasoningExtractor + + // announce, if set, is invoked once just before the first transcript delta. + // It lets the caller create the assistant item lazily, so a content-less + // tool-call turn never emits a spurious empty assistant item. + announce func() + announced bool } func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer { @@ -46,6 +53,12 @@ func (s *transcriptStreamer) onToken(token string) { if content == "" { return } + if !s.announced { + s.announced = true + if s.announce != nil { + s.announce() + } + } _ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: s.responseID, @@ -61,50 +74,61 @@ func (s *transcriptStreamer) content() string { return s.extractor.CleanedContent() } -// streamLLMResponse drives a streamed, plain-content (no tools) realtime reply. -// It announces the assistant item before tokens arrive, streams transcript -// deltas as the LLM generates, then synthesizes the whole buffered message once -// (streaming the audio chunks when the TTS backend supports it, otherwise a -// single unary delta) and emits the terminal events. It returns true when it has -// fully handled the response so the caller can return; callers must only invoke -// it for turns with no tools and an audio modality (see triggerResponseAtTurn). -func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool { - // Announce the assistant item up front so streamed deltas target a known item. +// streamLLMResponse drives a streamed realtime reply. It streams the assistant +// transcript as the LLM generates, then synthesizes the whole buffered message +// once (streaming the audio chunks when the TTS backend supports it, otherwise a +// single unary delta). Tool calls parsed from the autoparser ChatDeltas are +// emitted after the spoken content. The assistant content item is created lazily +// on the first content delta, so a content-less tool-call turn emits only the +// tool calls. It returns true when it has fully handled the response so the +// caller can return; callers must only invoke it for an audio modality, and with +// tools only when the model uses its tokenizer template (see triggerResponseAtTurn). +func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, toolTurn int) bool { + itemID := generateItemID() item := types.MessageItemUnion{ Assistant: &types.MessageItemAssistant{ - ID: generateItemID(), + ID: itemID, Status: types.ItemStatusInProgress, Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}}, }, } - conv.Lock.Lock() - conv.Items = append(conv.Items, &item) - conv.Lock.Unlock() - sendEvent(t, types.ResponseOutputItemAddedEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - OutputIndex: 0, - Item: item, - }) - sendEvent(t, types.ResponseContentPartAddedEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Part: item.Assistant.Content[0], - }) + // announce creates the assistant content item lazily, just before the first + // transcript delta — a tool-only turn never produces content, so it stays out + // of the conversation and the client sees only the tool calls. + announced := false + announce := func() { + announced = true + conv.Lock.Lock() + conv.Items = append(conv.Items, &item) + conv.Lock.Unlock() + sendEvent(t, types.ResponseOutputItemAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) + sendEvent(t, types.ResponseContentPartAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + } cancel := func() { - conv.Lock.Lock() - for i := len(conv.Items) - 1; i >= 0; i-- { - if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == item.Assistant.ID { - conv.Items = append(conv.Items[:i], conv.Items[i+1:]...) - break + if announced { + conv.Lock.Lock() + for i := len(conv.Items) - 1; i >= 0; i-- { + if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == itemID { + conv.Items = append(conv.Items[:i], conv.Items[i+1:]...) + break + } } + conv.Lock.Unlock() } - conv.Lock.Unlock() sendEvent(t, types.ResponseDoneEvent{ ServerEventBase: types.ServerEventBase{}, Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled}, @@ -119,26 +143,36 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation } thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) - streamer := newTranscriptStreamer(ctx, t, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) - cb := func(token string, _ backend.TokenUsage) bool { + streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, llmCfg.ReasoningConfig) + streamer.announce = announce + cb := func(token string, usage backend.TokenUsage) bool { if ctx.Err() != nil { return false } - streamer.onToken(token) + // Plain-content models stream text via the token; autoparser tool turns + // clear the text and deliver content via ChatDeltas, so prefer the latter + // when present. Either way only content reaches the transcript — tool-call + // deltas are parsed from the final response below. + text := token + if len(usage.ChatDeltas) > 0 { + text = functions.ContentFromChatDeltas(usage.ChatDeltas) + } + streamer.onToken(text) return true } - predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, nil, nil, nil, nil, nil) + predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, tools, toolChoice, nil, nil, nil) if err != nil { - sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", itemID) return true } - if _, err := predFunc(); err != nil { + pred, err := predFunc() + if err != nil { if ctx.Err() != nil { cancel() return true } - sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", itemID) return true } if ctx.Err() != nil { @@ -146,65 +180,74 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation return true } - // Buffer the whole message, then synthesize it once. emitSpeech streams the - // audio chunks when the TTS backend supports TTSStream, otherwise it sends a - // single unary delta — no per-sentence segmentation either way. content := streamer.content() - audio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, content) - if err != nil { - if ctx.Err() != nil { - cancel() + toolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas) + + // Finalize the spoken content item only when the turn produced content. A + // tool-only turn skips this entirely (no empty assistant item). + if content != "" { + if !announced { + announce() + } + // Buffer the whole message, then synthesize it once. emitSpeech streams + // the audio chunks when the TTS backend supports TTSStream, otherwise it + // sends a single unary delta — no per-sentence segmentation either way. + audio, err := emitSpeech(ctx, t, session, responseID, itemID, content) + if err != nil { + if ctx.Err() != nil { + cancel() + return true + } + sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", itemID) return true } - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) - return true - } - _, isWebRTC := t.(*WebRTCTransport) + _, isWebRTC := t.(*WebRTCTransport) - sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Transcript: content, - }) - if !isWebRTC { - sendEvent(t, types.ResponseOutputAudioDoneEvent{ + sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, - ItemID: item.Assistant.ID, + ItemID: itemID, OutputIndex: 0, ContentIndex: 0, + Transcript: content, + }) + if !isWebRTC { + sendEvent(t, types.ResponseOutputAudioDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + }) + } + + conv.Lock.Lock() + item.Assistant.Status = types.ItemStatusCompleted + item.Assistant.Content[0].Transcript = content + if !isWebRTC { + item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio) + } + conv.Lock.Unlock() + + sendEvent(t, types.ResponseContentPartDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + sendEvent(t, types.ResponseOutputItemDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, }) } - conv.Lock.Lock() - item.Assistant.Status = types.ItemStatusCompleted - item.Assistant.Content[0].Transcript = content - if !isWebRTC { - item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio) - } - conv.Lock.Unlock() - - sendEvent(t, types.ResponseContentPartDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Part: item.Assistant.Content[0], - }) - sendEvent(t, types.ResponseOutputItemDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - OutputIndex: 0, - Item: item, - }) - sendEvent(t, types.ResponseDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCompleted}, - }) + // Emit any tool calls, the terminal response.done, and (for server-side + // assistant tools) the follow-up turn — shared with the buffered path. + emitToolCallItems(ctx, session, conv, t, responseID, toolCalls, content != "", toolTurn) return true } diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go index ccdd31cca..f7042f772 100644 --- a/core/http/endpoints/openai/realtime_stream_test.go +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/reasoning" ) @@ -69,7 +70,7 @@ var _ = Describe("streamLLMResponse", func() { t := &fakeTransport{} llmCfg := &config.ModelConfig{} - handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg) + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) Expect(handled).To(BeTrue()) // One live transcript delta per streamed token. @@ -79,4 +80,71 @@ var _ = Describe("streamLLMResponse", func() { Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) }) + + It("streams content deltas and emits tool-call items (autoparser tool turn)", func() { + on := true + // Autoparser path: reply.Message is empty; content + tool calls arrive via + // ChatDeltas. Chunk 1 carries content, chunk 2 carries the tool call. + contentDelta := []*proto.ChatDelta{{Content: "Let me check."}} + toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Paris"}`}}}} + m := &fakeModel{ + predictTokens: []string{"", ""}, + predictChunkDeltas: [][]*proto.ChatDelta{contentDelta, toolDelta}, + predictResp: backend.LLMResponse{ChatDeltas: append(append([]*proto.ChatDelta{}, contentDelta...), toolDelta...)}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + llmCfg.TemplateConfig.UseTokenizerTemplate = true + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // The spoken content was streamed live. + Expect(t.transcriptDeltaText()).To(Equal("Let me check.")) + // The tool call is emitted as a function_call item. + Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1)) + // Exactly one terminal response.done. + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) + + It("emits only tool-call items for a content-less tool turn (no empty assistant item)", func() { + on := true + toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Rome"}`}}}} + m := &fakeModel{ + predictTokens: []string{""}, + predictChunkDeltas: [][]*proto.ChatDelta{toolDelta}, + predictResp: backend.LLMResponse{ChatDeltas: toolDelta}, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + llmCfg.TemplateConfig.UseTokenizerTemplate = true + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // No content → no transcript deltas and no spurious assistant content item. + Expect(t.transcriptDeltaText()).To(Equal("")) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(0)) + // The tool call is still emitted. + Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1)) + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) }) diff --git a/docs/content/features/openai-realtime.md b/docs/content/features/openai-realtime.md index b9af56ca2..dd152f19c 100644 --- a/docs/content/features/openai-realtime.md +++ b/docs/content/features/openai-realtime.md @@ -48,9 +48,9 @@ pipeline: transcription: true # stream transcript text deltas of the user's speech ``` -- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces, instead of one delta for the whole utterance. +- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces (requires a backend that supports streaming synthesis), instead of one delta for the whole utterance. Falls back to a single unary delta otherwise. - **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming). -- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events and, when `streaming.tts` is also enabled, synthesize each completed sentence as soon as it is ready — overlapping generation, synthesis and playback. Streaming is used only for turns that cannot produce a tool call; turns with tools fall back to the buffered path so partial tool-call output is never spoken. +- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events. The full reply is buffered and synthesized once it is complete — streamed as audio chunks when `streaming.tts` is enabled (and the TTS backend supports it), otherwise as a single unary delta. Reasoning/thinking is always stripped from the spoken transcript. Tool calls are supported while streaming when the LLM uses its tokenizer template (`use_tokenizer_template: true`): the backend's autoparser then delivers content and tool calls separately, so the spoken transcript never leaks tool-call tokens. Grammar-based function calling keeps the buffered path. All streaming flags are off by default, so existing pipelines are unaffected.