refactor(realtime): buffer whole message for TTS, drop sentence segmenter

Per review (richiejp): the sentence segmenter pipelined unary TTS by
splitting on ASCII .!?/newline, which does nothing for languages without
those boundaries (CJK/Thai) — there it already degraded to buffering the
whole message anyway.

Replace it with a uniform model: stream the LLM transcript live, buffer the
full message, then synthesize it once. emitSpeech already streams the audio
chunks when the backend implements TTSStream and falls back to a single
unary delta otherwise, so this is real streaming TTS where supported and a
clean whole-message synthesis elsewhere — no per-sentence emulation, no
language assumptions. speechStreamer becomes transcriptStreamer (transcript
deltas only); the whole-message synthesis moves into streamLLMResponse.

Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-05 07:38:38 +00:00
parent 9ec1456ec6
commit 076dcdbed8
5 changed files with 102 additions and 212 deletions

View File

@@ -74,6 +74,15 @@ type fakeModel struct {
transcribeDeltas []string
transcribeFinal *schema.TranscriptionResult
// Predict streaming: predictTokens are replayed through the token callback
// (simulating streamed LLM output); predictResp/predictErr are returned by
// the deferred predict function. predictChunkDeltas, when set, are delivered
// per-token via TokenUsage.ChatDeltas to exercise the autoparser path.
predictTokens []string
predictChunkDeltas [][]*proto.ChatDelta
predictResp backend.LLMResponse
predictErr error
}
func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) {
@@ -84,8 +93,23 @@ func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, stri
return m.transcribeFinal, nil
}
func (m *fakeModel) Predict(context.Context, schema.Messages, []string, []string, []string, func(string, backend.TokenUsage) bool, []types.ToolUnion, *types.ToolChoiceUnion, *int, *int, map[string]float64) (func() (backend.LLMResponse, error), error) {
return nil, nil
func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) {
if m.predictErr != nil {
return nil, m.predictErr
}
return func() (backend.LLMResponse, error) {
for i, tok := range m.predictTokens {
if cb == nil {
continue
}
usage := backend.TokenUsage{}
if i < len(m.predictChunkDeltas) {
usage.ChatDeltas = m.predictChunkDeltas[i]
}
cb(tok, usage)
}
return m.predictResp, nil
}, nil
}
func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) {

View File

@@ -1,61 +0,0 @@
package openai
import "strings"
// streamSegmenter accumulates streamed LLM text and emits complete utterance
// segments (sentence/clause boundaries) so the realtime pipeline can hand each
// segment to TTS as soon as it's complete, overlapping generation, synthesis
// and playback instead of waiting for the whole reply.
//
// A segment is committed when a sentence terminator (. ! ?) is followed by
// whitespace, or at a newline. Terminators not followed by whitespace (e.g.
// decimals like "3.14" mid-stream) stay buffered until more text arrives or the
// stream is flushed.
type streamSegmenter struct {
buf strings.Builder
}
func isSentenceTerminator(b byte) bool {
return b == '.' || b == '!' || b == '?'
}
func isSpace(b byte) bool {
return b == ' ' || b == '\t' || b == '\n' || b == '\r'
}
// Push appends text to the buffer and returns any newly-completed segments,
// trimmed of surrounding whitespace. Incomplete trailing text stays buffered.
func (s *streamSegmenter) Push(text string) []string {
s.buf.WriteString(text)
cur := s.buf.String()
var segments []string
start := 0
for i := 0; i < len(cur); i++ {
cut := -1
switch {
case cur[i] == '\n':
cut = i // segment excludes the newline
case isSentenceTerminator(cur[i]) && i+1 < len(cur) && isSpace(cur[i+1]):
cut = i + 1 // segment includes the terminator
}
if cut >= 0 {
if seg := strings.TrimSpace(cur[start:cut]); seg != "" {
segments = append(segments, seg)
}
start = cut
}
}
rem := cur[start:]
s.buf.Reset()
s.buf.WriteString(rem)
return segments
}
// Flush returns the remaining buffered text (trimmed) and clears the buffer.
func (s *streamSegmenter) Flush() string {
seg := strings.TrimSpace(s.buf.String())
s.buf.Reset()
return seg
}

View File

@@ -1,41 +0,0 @@
package openai
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// streamSegmenter turns a stream of LLM token text into complete sentence/clause
// segments so TTS can start synthesizing before the full reply is generated.
var _ = Describe("streamSegmenter", func() {
It("buffers partial text until a sentence terminator followed by space", func() {
var s streamSegmenter
Expect(s.Push("Hello")).To(BeEmpty())
Expect(s.Push(" world")).To(BeEmpty())
Expect(s.Push(". ")).To(Equal([]string{"Hello world."}))
})
It("emits each complete sentence and keeps the trailing partial buffered", func() {
var s streamSegmenter
Expect(s.Push("One. Two! Three")).To(Equal([]string{"One.", "Two!"}))
Expect(s.Flush()).To(Equal("Three"))
})
It("splits on newlines", func() {
var s streamSegmenter
Expect(s.Push("Line one\nLine two")).To(Equal([]string{"Line one"}))
Expect(s.Flush()).To(Equal("Line two"))
})
It("does not split decimals or mid-token punctuation", func() {
var s streamSegmenter
Expect(s.Push("Pi is 3.14 today")).To(BeEmpty())
Expect(s.Flush()).To(Equal("Pi is 3.14 today"))
})
It("flushes to empty when the buffer holds only consumed text", func() {
var s streamSegmenter
s.Push("Done. ")
Expect(s.Flush()).To(Equal(""))
})
})

View File

@@ -12,46 +12,36 @@ import (
"github.com/mudler/LocalAI/pkg/reasoning"
)
// speechStreamer consumes streamed LLM tokens and drives the realtime output:
// it strips reasoning incrementally, emits a transcript text delta for each
// content fragment, and — when the pipeline streams TTS — sentence-pipes the
// content so each completed sentence is synthesized as soon as it's ready,
// overlapping generation, synthesis and playback.
//
// It is used only for plain-content turns (no tools): tool-call output can't be
// safely spoken mid-stream, so those turns keep the buffered path.
type speechStreamer struct {
// transcriptStreamer turns streamed LLM tokens into the assistant's spoken
// transcript: it strips reasoning incrementally and sends one
// response.output_audio_transcript.delta per content fragment. It does NOT
// synthesize audio — the caller buffers the full message and synthesizes it
// once (streaming the audio chunks when the TTS backend supports TTSStream),
// which works uniformly for streaming and non-streaming TTS and for languages
// without sentence or word boundaries.
type transcriptStreamer struct {
ctx context.Context
t Transport
session *Session
responseID string
itemID string
extractor *reasoning.ReasoningExtractor
seg streamSegmenter
audio []byte
streamTTS bool
err error
extractor *reasoning.ReasoningExtractor
}
func newSpeechStreamer(ctx context.Context, t Transport, session *Session, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *speechStreamer {
// Spoken output must never contain reasoning, even when disable_thinking set
// DisableReasoning (which would otherwise turn the extractor's stripping off).
reasoningCfg = spokenReasoningConfig(reasoningCfg)
return &speechStreamer{
func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer {
return &transcriptStreamer{
ctx: ctx,
t: t,
session: session,
responseID: responseID,
itemID: itemID,
extractor: reasoning.NewReasoningExtractor(thinkingStartToken, reasoningCfg),
streamTTS: session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS(),
extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)),
}
}
// onToken handles one streamed LLM token. It is shaped to be used directly as
// the backend token callback's text sink.
func (s *speechStreamer) onToken(token string) {
// onToken handles one streamed unit of model output, sending a transcript delta
// for the new content (reasoning stripped). For plain-content models the unit is
// the raw text token; for autoparser tool turns the backend clears the text and
// delivers content via ChatDeltas, so the caller passes that content here.
func (s *transcriptStreamer) onToken(token string) {
_, content := s.extractor.ProcessToken(token)
if content == "" {
return
@@ -64,41 +54,20 @@ func (s *speechStreamer) onToken(token string) {
ContentIndex: 0,
Delta: content,
})
if s.streamTTS {
for _, segment := range s.seg.Push(content) {
s.speak(segment)
}
}
}
func (s *speechStreamer) speak(text string) {
pcm, err := emitSpeech(s.ctx, s.t, s.session, s.responseID, s.itemID, text)
if err != nil {
if s.err == nil {
s.err = err
}
return
}
s.audio = append(s.audio, pcm...)
}
// finish flushes any buffered sentence to TTS and returns the full cleaned
// content, the accumulated PCM audio, and the first error encountered (if any).
func (s *speechStreamer) finish() (content string, audio []byte, err error) {
if s.streamTTS {
if rem := s.seg.Flush(); rem != "" {
s.speak(rem)
}
}
return s.extractor.CleanedContent(), s.audio, s.err
// content returns the full transcript so far with reasoning stripped.
func (s *transcriptStreamer) content() string {
return s.extractor.CleanedContent()
}
// streamLLMResponse drives a streamed, plain-content (no tools) realtime reply.
// It announces the assistant item before tokens arrive, feeds the LLM token
// callback through a speechStreamer (transcript deltas + sentence-piped TTS),
// then emits the terminal events. It returns true when it has fully handled the
// response so the caller can return; callers must only invoke it for turns with
// no tools and an audio modality (see triggerResponseAtTurn).
// It announces the assistant item before tokens arrive, streams transcript
// deltas as the LLM generates, then synthesizes the whole buffered message once
// (streaming the audio chunks when the TTS backend supports it, otherwise a
// single unary delta) and emits the terminal events. It returns true when it has
// fully handled the response so the caller can return; callers must only invoke
// it for turns with no tools and an audio modality (see triggerResponseAtTurn).
func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool {
// Announce the assistant item up front so streamed deltas target a known item.
item := types.MessageItemUnion{
@@ -150,7 +119,7 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation
}
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)
streamer := newSpeechStreamer(ctx, t, session, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig)
streamer := newTranscriptStreamer(ctx, t, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig)
cb := func(token string, _ backend.TokenUsage) bool {
if ctx.Err() != nil {
return false
@@ -177,8 +146,16 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation
return true
}
content, audio, err := streamer.finish()
// Buffer the whole message, then synthesize it once. emitSpeech streams the
// audio chunks when the TTS backend supports TTSStream, otherwise it sends a
// single unary delta — no per-sentence segmentation either way.
content := streamer.content()
audio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, content)
if err != nil {
if ctx.Err() != nil {
cancel()
return true
}
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return true
}

View File

@@ -6,86 +6,77 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/pkg/reasoning"
)
// speechStreamer consumes streamed LLM tokens: it strips reasoning, emits a
// transcript delta per content fragment, and sentence-pipes content into TTS so
// audio starts before the full reply is generated.
var _ = Describe("speechStreamer", func() {
It("emits a transcript delta per token and speaks each completed sentence", func() {
on := true
m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000}
session := &Session{
OutputSampleRate: 24000,
ModelInterface: m,
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &on}},
},
}
// transcriptStreamer turns streamed LLM tokens into incremental transcript
// deltas, stripping reasoning. Audio is synthesized once from the full message
// by the caller, so there is no per-sentence segmentation.
var _ = Describe("transcriptStreamer", func() {
It("emits one transcript delta per content token", func() {
t := &fakeTransport{}
s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{})
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{})
for _, tok := range []string{"Hello", " world.", " Bye"} {
s.onToken(tok)
}
content, audio, err := s.finish()
Expect(err).ToNot(HaveOccurred())
Expect(content).To(Equal("Hello world. Bye"))
// One transcript delta per (non-empty) token.
Expect(s.content()).To(Equal("Hello world. Bye"))
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
// Two sentences spoken: "Hello world." mid-stream + "Bye" on flush; one
// chunk each.
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2))
Expect(audio).To(Equal([]byte{7, 7}))
Expect(t.transcriptDeltaText()).To(Equal("Hello world. Bye"))
})
It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() {
// disable_thinking maps to ReasoningConfig.DisableReasoning=true (it tells
// the backend enable_thinking=false). When the model ignores that and emits
// thinking anyway, the spoken stream must still not leak it: the streamer is
// the last line of defence and always strips reasoning from spoken content.
// disable_thinking maps to DisableReasoning=true (enable_thinking=false to
// the backend). If the model emits thinking anyway, the transcript must
// still not leak it: stripping always runs for spoken output.
disable := true
session := &Session{
OutputSampleRate: 24000,
ModelInterface: &fakeModel{},
ModelConfig: &config.ModelConfig{}, // streaming.tts off
}
t := &fakeTransport{}
s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "",
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "",
reasoning.Config{DisableReasoning: &disable})
s.onToken("<think>secret plan</think>")
s.onToken("The answer is 42.")
content, _, err := s.finish()
Expect(err).ToNot(HaveOccurred())
Expect(content).To(Equal("The answer is 42."))
Expect(content).ToNot(ContainSubstring("secret plan"))
// The text streamed to the client must not carry the reasoning either.
Expect(s.content()).To(Equal("The answer is 42."))
Expect(s.content()).ToNot(ContainSubstring("secret plan"))
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan"))
})
})
It("does not synthesize audio when TTS streaming is disabled", func() {
m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000}
// streamLLMResponse drives a full streamed realtime turn: live transcript
// deltas while the LLM generates, then the whole message is synthesized once.
var _ = Describe("streamLLMResponse", func() {
It("streams transcript deltas then synthesizes the whole message once", func() {
on := true
m := &fakeModel{
predictTokens: []string{"Hello", " world.", " How are you?"},
predictResp: backend.LLMResponse{Response: "Hello world. How are you?"},
ttsStreamChunks: [][]byte{{9}},
ttsStreamRate: 24000,
}
session := &Session{
OutputSampleRate: 24000,
ModelInterface: m,
ModelConfig: &config.ModelConfig{}, // streaming.tts off
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
},
}
conv := &Conversation{}
t := &fakeTransport{}
s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{})
llmCfg := &config.ModelConfig{}
s.onToken("Hello world.")
content, audio, err := s.finish()
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg)
Expect(err).ToNot(HaveOccurred())
Expect(content).To(Equal("Hello world."))
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(1))
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(0))
Expect(audio).To(BeEmpty())
Expect(handled).To(BeTrue())
// One live transcript delta per streamed token.
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
// The whole message is synthesized ONCE (not per sentence): a single
// emitSpeech replays the one TTS stream chunk.
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
})
})