From 076dcdbed876b68ec2e138434226e6dca2e6d543 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 5 Jun 2026 07:38:38 +0000 Subject: [PATCH] refactor(realtime): buffer whole message for TTS, drop sentence segmenter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review (richiejp): the sentence segmenter pipelined unary TTS by splitting on ASCII .!?/newline, which does nothing for languages without those boundaries (CJK/Thai) — there it already degraded to buffering the whole message anyway. Replace it with a uniform model: stream the LLM transcript live, buffer the full message, then synthesize it once. emitSpeech already streams the audio chunks when the backend implements TTSStream and falls back to a single unary delta otherwise, so this is real streaming TTS where supported and a clean whole-message synthesis elsewhere — no per-sentence emulation, no language assumptions. speechStreamer becomes transcriptStreamer (transcript deltas only); the whole-message synthesis moves into streamLLMResponse. Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_doubles_test.go | 28 +++++- .../endpoints/openai/realtime_segmenter.go | 61 ------------ .../openai/realtime_segmenter_test.go | 41 -------- core/http/endpoints/openai/realtime_stream.go | 95 +++++++------------ .../endpoints/openai/realtime_stream_test.go | 89 ++++++++--------- 5 files changed, 102 insertions(+), 212 deletions(-) delete mode 100644 core/http/endpoints/openai/realtime_segmenter.go delete mode 100644 core/http/endpoints/openai/realtime_segmenter_test.go diff --git a/core/http/endpoints/openai/realtime_doubles_test.go b/core/http/endpoints/openai/realtime_doubles_test.go index afb1f5e7a..accd6af51 100644 --- a/core/http/endpoints/openai/realtime_doubles_test.go +++ b/core/http/endpoints/openai/realtime_doubles_test.go @@ -74,6 +74,15 @@ type fakeModel struct { transcribeDeltas []string transcribeFinal *schema.TranscriptionResult + + // Predict streaming: predictTokens are replayed through the token callback + // (simulating streamed LLM output); predictResp/predictErr are returned by + // the deferred predict function. predictChunkDeltas, when set, are delivered + // per-token via TokenUsage.ChatDeltas to exercise the autoparser path. + predictTokens []string + predictChunkDeltas [][]*proto.ChatDelta + predictResp backend.LLMResponse + predictErr error } func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) { @@ -84,8 +93,23 @@ func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, stri return m.transcribeFinal, nil } -func (m *fakeModel) Predict(context.Context, schema.Messages, []string, []string, []string, func(string, backend.TokenUsage) bool, []types.ToolUnion, *types.ToolChoiceUnion, *int, *int, map[string]float64) (func() (backend.LLMResponse, error), error) { - return nil, nil +func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) { + if m.predictErr != nil { + return nil, m.predictErr + } + return func() (backend.LLMResponse, error) { + for i, tok := range m.predictTokens { + if cb == nil { + continue + } + usage := backend.TokenUsage{} + if i < len(m.predictChunkDeltas) { + usage.ChatDeltas = m.predictChunkDeltas[i] + } + cb(tok, usage) + } + return m.predictResp, nil + }, nil } func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) { diff --git a/core/http/endpoints/openai/realtime_segmenter.go b/core/http/endpoints/openai/realtime_segmenter.go deleted file mode 100644 index 77116229f..000000000 --- a/core/http/endpoints/openai/realtime_segmenter.go +++ /dev/null @@ -1,61 +0,0 @@ -package openai - -import "strings" - -// streamSegmenter accumulates streamed LLM text and emits complete utterance -// segments (sentence/clause boundaries) so the realtime pipeline can hand each -// segment to TTS as soon as it's complete, overlapping generation, synthesis -// and playback instead of waiting for the whole reply. -// -// A segment is committed when a sentence terminator (. ! ?) is followed by -// whitespace, or at a newline. Terminators not followed by whitespace (e.g. -// decimals like "3.14" mid-stream) stay buffered until more text arrives or the -// stream is flushed. -type streamSegmenter struct { - buf strings.Builder -} - -func isSentenceTerminator(b byte) bool { - return b == '.' || b == '!' || b == '?' -} - -func isSpace(b byte) bool { - return b == ' ' || b == '\t' || b == '\n' || b == '\r' -} - -// Push appends text to the buffer and returns any newly-completed segments, -// trimmed of surrounding whitespace. Incomplete trailing text stays buffered. -func (s *streamSegmenter) Push(text string) []string { - s.buf.WriteString(text) - cur := s.buf.String() - - var segments []string - start := 0 - for i := 0; i < len(cur); i++ { - cut := -1 - switch { - case cur[i] == '\n': - cut = i // segment excludes the newline - case isSentenceTerminator(cur[i]) && i+1 < len(cur) && isSpace(cur[i+1]): - cut = i + 1 // segment includes the terminator - } - if cut >= 0 { - if seg := strings.TrimSpace(cur[start:cut]); seg != "" { - segments = append(segments, seg) - } - start = cut - } - } - - rem := cur[start:] - s.buf.Reset() - s.buf.WriteString(rem) - return segments -} - -// Flush returns the remaining buffered text (trimmed) and clears the buffer. -func (s *streamSegmenter) Flush() string { - seg := strings.TrimSpace(s.buf.String()) - s.buf.Reset() - return seg -} diff --git a/core/http/endpoints/openai/realtime_segmenter_test.go b/core/http/endpoints/openai/realtime_segmenter_test.go deleted file mode 100644 index 13d9a8c78..000000000 --- a/core/http/endpoints/openai/realtime_segmenter_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package openai - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -// streamSegmenter turns a stream of LLM token text into complete sentence/clause -// segments so TTS can start synthesizing before the full reply is generated. -var _ = Describe("streamSegmenter", func() { - It("buffers partial text until a sentence terminator followed by space", func() { - var s streamSegmenter - Expect(s.Push("Hello")).To(BeEmpty()) - Expect(s.Push(" world")).To(BeEmpty()) - Expect(s.Push(". ")).To(Equal([]string{"Hello world."})) - }) - - It("emits each complete sentence and keeps the trailing partial buffered", func() { - var s streamSegmenter - Expect(s.Push("One. Two! Three")).To(Equal([]string{"One.", "Two!"})) - Expect(s.Flush()).To(Equal("Three")) - }) - - It("splits on newlines", func() { - var s streamSegmenter - Expect(s.Push("Line one\nLine two")).To(Equal([]string{"Line one"})) - Expect(s.Flush()).To(Equal("Line two")) - }) - - It("does not split decimals or mid-token punctuation", func() { - var s streamSegmenter - Expect(s.Push("Pi is 3.14 today")).To(BeEmpty()) - Expect(s.Flush()).To(Equal("Pi is 3.14 today")) - }) - - It("flushes to empty when the buffer holds only consumed text", func() { - var s streamSegmenter - s.Push("Done. ") - Expect(s.Flush()).To(Equal("")) - }) -}) diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index 09526c561..9eca643d3 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -12,46 +12,36 @@ import ( "github.com/mudler/LocalAI/pkg/reasoning" ) -// speechStreamer consumes streamed LLM tokens and drives the realtime output: -// it strips reasoning incrementally, emits a transcript text delta for each -// content fragment, and — when the pipeline streams TTS — sentence-pipes the -// content so each completed sentence is synthesized as soon as it's ready, -// overlapping generation, synthesis and playback. -// -// It is used only for plain-content turns (no tools): tool-call output can't be -// safely spoken mid-stream, so those turns keep the buffered path. -type speechStreamer struct { +// transcriptStreamer turns streamed LLM tokens into the assistant's spoken +// transcript: it strips reasoning incrementally and sends one +// response.output_audio_transcript.delta per content fragment. It does NOT +// synthesize audio — the caller buffers the full message and synthesizes it +// once (streaming the audio chunks when the TTS backend supports TTSStream), +// which works uniformly for streaming and non-streaming TTS and for languages +// without sentence or word boundaries. +type transcriptStreamer struct { ctx context.Context t Transport - session *Session responseID string itemID string - - extractor *reasoning.ReasoningExtractor - seg streamSegmenter - audio []byte - streamTTS bool - err error + extractor *reasoning.ReasoningExtractor } -func newSpeechStreamer(ctx context.Context, t Transport, session *Session, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *speechStreamer { - // Spoken output must never contain reasoning, even when disable_thinking set - // DisableReasoning (which would otherwise turn the extractor's stripping off). - reasoningCfg = spokenReasoningConfig(reasoningCfg) - return &speechStreamer{ +func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer { + return &transcriptStreamer{ ctx: ctx, t: t, - session: session, responseID: responseID, itemID: itemID, - extractor: reasoning.NewReasoningExtractor(thinkingStartToken, reasoningCfg), - streamTTS: session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS(), + extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)), } } -// onToken handles one streamed LLM token. It is shaped to be used directly as -// the backend token callback's text sink. -func (s *speechStreamer) onToken(token string) { +// onToken handles one streamed unit of model output, sending a transcript delta +// for the new content (reasoning stripped). For plain-content models the unit is +// the raw text token; for autoparser tool turns the backend clears the text and +// delivers content via ChatDeltas, so the caller passes that content here. +func (s *transcriptStreamer) onToken(token string) { _, content := s.extractor.ProcessToken(token) if content == "" { return @@ -64,41 +54,20 @@ func (s *speechStreamer) onToken(token string) { ContentIndex: 0, Delta: content, }) - if s.streamTTS { - for _, segment := range s.seg.Push(content) { - s.speak(segment) - } - } } -func (s *speechStreamer) speak(text string) { - pcm, err := emitSpeech(s.ctx, s.t, s.session, s.responseID, s.itemID, text) - if err != nil { - if s.err == nil { - s.err = err - } - return - } - s.audio = append(s.audio, pcm...) -} - -// finish flushes any buffered sentence to TTS and returns the full cleaned -// content, the accumulated PCM audio, and the first error encountered (if any). -func (s *speechStreamer) finish() (content string, audio []byte, err error) { - if s.streamTTS { - if rem := s.seg.Flush(); rem != "" { - s.speak(rem) - } - } - return s.extractor.CleanedContent(), s.audio, s.err +// content returns the full transcript so far with reasoning stripped. +func (s *transcriptStreamer) content() string { + return s.extractor.CleanedContent() } // streamLLMResponse drives a streamed, plain-content (no tools) realtime reply. -// It announces the assistant item before tokens arrive, feeds the LLM token -// callback through a speechStreamer (transcript deltas + sentence-piped TTS), -// then emits the terminal events. It returns true when it has fully handled the -// response so the caller can return; callers must only invoke it for turns with -// no tools and an audio modality (see triggerResponseAtTurn). +// It announces the assistant item before tokens arrive, streams transcript +// deltas as the LLM generates, then synthesizes the whole buffered message once +// (streaming the audio chunks when the TTS backend supports it, otherwise a +// single unary delta) and emits the terminal events. It returns true when it has +// fully handled the response so the caller can return; callers must only invoke +// it for turns with no tools and an audio modality (see triggerResponseAtTurn). func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool { // Announce the assistant item up front so streamed deltas target a known item. item := types.MessageItemUnion{ @@ -150,7 +119,7 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation } thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) - streamer := newSpeechStreamer(ctx, t, session, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) + streamer := newTranscriptStreamer(ctx, t, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) cb := func(token string, _ backend.TokenUsage) bool { if ctx.Err() != nil { return false @@ -177,8 +146,16 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation return true } - content, audio, err := streamer.finish() + // Buffer the whole message, then synthesize it once. emitSpeech streams the + // audio chunks when the TTS backend supports TTSStream, otherwise it sends a + // single unary delta — no per-sentence segmentation either way. + content := streamer.content() + audio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, content) if err != nil { + if ctx.Err() != nil { + cancel() + return true + } sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) return true } diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go index d8697c331..ccdd31cca 100644 --- a/core/http/endpoints/openai/realtime_stream_test.go +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -6,86 +6,77 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/pkg/reasoning" ) -// speechStreamer consumes streamed LLM tokens: it strips reasoning, emits a -// transcript delta per content fragment, and sentence-pipes content into TTS so -// audio starts before the full reply is generated. -var _ = Describe("speechStreamer", func() { - It("emits a transcript delta per token and speaks each completed sentence", func() { - on := true - m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} - session := &Session{ - OutputSampleRate: 24000, - ModelInterface: m, - ModelConfig: &config.ModelConfig{ - Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &on}}, - }, - } +// transcriptStreamer turns streamed LLM tokens into incremental transcript +// deltas, stripping reasoning. Audio is synthesized once from the full message +// by the caller, so there is no per-sentence segmentation. +var _ = Describe("transcriptStreamer", func() { + It("emits one transcript delta per content token", func() { t := &fakeTransport{} - s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{}) + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{}) for _, tok := range []string{"Hello", " world.", " Bye"} { s.onToken(tok) } - content, audio, err := s.finish() - Expect(err).ToNot(HaveOccurred()) - Expect(content).To(Equal("Hello world. Bye")) - // One transcript delta per (non-empty) token. + Expect(s.content()).To(Equal("Hello world. Bye")) Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3)) - // Two sentences spoken: "Hello world." mid-stream + "Bye" on flush; one - // chunk each. - Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2)) - Expect(audio).To(Equal([]byte{7, 7})) + Expect(t.transcriptDeltaText()).To(Equal("Hello world. Bye")) }) It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() { - // disable_thinking maps to ReasoningConfig.DisableReasoning=true (it tells - // the backend enable_thinking=false). When the model ignores that and emits - // thinking anyway, the spoken stream must still not leak it: the streamer is - // the last line of defence and always strips reasoning from spoken content. + // disable_thinking maps to DisableReasoning=true (enable_thinking=false to + // the backend). If the model emits thinking anyway, the transcript must + // still not leak it: stripping always runs for spoken output. disable := true - session := &Session{ - OutputSampleRate: 24000, - ModelInterface: &fakeModel{}, - ModelConfig: &config.ModelConfig{}, // streaming.tts off - } t := &fakeTransport{} - s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{DisableReasoning: &disable}) s.onToken("secret plan") s.onToken("The answer is 42.") - content, _, err := s.finish() - Expect(err).ToNot(HaveOccurred()) - Expect(content).To(Equal("The answer is 42.")) - Expect(content).ToNot(ContainSubstring("secret plan")) - // The text streamed to the client must not carry the reasoning either. + Expect(s.content()).To(Equal("The answer is 42.")) + Expect(s.content()).ToNot(ContainSubstring("secret plan")) Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan")) }) +}) - It("does not synthesize audio when TTS streaming is disabled", func() { - m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} +// streamLLMResponse drives a full streamed realtime turn: live transcript +// deltas while the LLM generates, then the whole message is synthesized once. +var _ = Describe("streamLLMResponse", func() { + It("streams transcript deltas then synthesizes the whole message once", func() { + on := true + m := &fakeModel{ + predictTokens: []string{"Hello", " world.", " How are you?"}, + predictResp: backend.LLMResponse{Response: "Hello world. How are you?"}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } session := &Session{ OutputSampleRate: 24000, ModelInterface: m, - ModelConfig: &config.ModelConfig{}, // streaming.tts off + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, } + conv := &Conversation{} t := &fakeTransport{} - s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{}) + llmCfg := &config.ModelConfig{} - s.onToken("Hello world.") - content, audio, err := s.finish() + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg) - Expect(err).ToNot(HaveOccurred()) - Expect(content).To(Equal("Hello world.")) - Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(1)) - Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(0)) - Expect(audio).To(BeEmpty()) + Expect(handled).To(BeTrue()) + // One live transcript delta per streamed token. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3)) + // The whole message is synthesized ONCE (not per sentence): a single + // emitSpeech replays the one TTS stream chunk. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) + Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) }) })