mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-10 17:56:49 -04:00
feat(realtime): script-aware clause chunking + streamed-reply fixes
Opt-in pipeline.streaming.clause_chunking splits the streamed LLM reply
into speakable clauses and synthesizes each as soon as it completes,
lowering time-to-first-audio instead of buffering the whole message. The
splitter is script-aware (rivo/uniseg, pure Go): UAX#29 sentence
segmentation handles CJK 。!? with no whitespace, CJK clause
punctuation (,、;:) and Thai/Lao spaces give finer cuts, and a UAX#14
line-break cap bounds an over-long punctuation-less run. Unlike the old
ASCII .!?/newline segmenter (dropped in 076dcdbe) it does not degrade to
whole-message buffering for CJK/Thai; scripts needing a dictionary
(Khmer/Burmese) stay buffered until a space or end-of-message. Clauses
are synthesized synchronously in the token callback (the LLM keeps
generating into the gRPC stream meanwhile), so audio still starts
mid-generation. Off by default — the whole-message path is unchanged.
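
A rough illustration of the hold-until-confirmed rule described above,
using only the standard library. This is a sketch, not the committed
splitter: sketchChunker, isTerminator and confirmedCut are hypothetical
names, the terminator set is hand-picked rather than taken from UAX#29
via rivo/uniseg, and the only Latin guard is that an ASCII terminator
must be followed by whitespace (so "3.14" is not split). Clause
punctuation, Thai/Lao spaces and the length cap are left out.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
	"unicode/utf8"
)

// isTerminator is a hand-picked stand-in for UAX #29 sentence terminators:
// ASCII . ! ? plus ideographic full stop and fullwidth ! ?.
func isTerminator(r rune) bool {
	switch r {
	case '.', '!', '?', '\u3002', '\uFF01', '\uFF1F':
		return true
	}
	return false
}

// sketchChunker buffers streamed text and releases confirmed sentences.
type sketchChunker struct{ buf string }

// push appends text and returns the sentences that following content has
// confirmed as complete; the unconfirmed tail stays buffered.
func (c *sketchChunker) push(text string) []string {
	c.buf += text
	return c.drain(false)
}

// flush treats end-of-input as a hard boundary and empties the buffer.
func (c *sketchChunker) flush() []string { return c.drain(true) }

func (c *sketchChunker) drain(final bool) []string {
	var out []string
	for {
		cut := confirmedCut(c.buf)
		if cut < 0 {
			break
		}
		if seg := strings.TrimSpace(c.buf[:cut]); seg != "" {
			out = append(out, seg)
		}
		c.buf = c.buf[cut:]
	}
	if final {
		if seg := strings.TrimSpace(c.buf); seg != "" {
			out = append(out, seg)
		}
		c.buf = ""
	}
	return out
}

// confirmedCut returns the byte offset just past the first terminator (and
// any spaces after it) that is followed by real content, or -1 if the buffer
// holds no confirmed boundary yet.
func confirmedCut(s string) int {
	for i, r := range s {
		if !isTerminator(r) {
			continue
		}
		end := i + utf8.RuneLen(r)
		if end == len(s) {
			return -1 // terminator is the last rune: wait for the next token
		}
		next, _ := utf8.DecodeRuneInString(s[end:])
		if r < utf8.RuneSelf && !unicode.IsSpace(next) {
			continue // ASCII terminator inside a token, e.g. "3.14"
		}
		for end < len(s) {
			nr, sz := utf8.DecodeRuneInString(s[end:])
			if !unicode.IsSpace(nr) {
				break
			}
			end += sz
		}
		if end < len(s) {
			return end
		}
		return -1
	}
	return -1
}

func main() {
	c := &sketchChunker{}
	for _, tok := range []string{"Hello", " world.", " How", " are", " you?"} {
		for _, clause := range c.push(tok) {
			fmt.Printf("speak mid-stream: %q\n", clause)
		}
	}
	for _, clause := range c.flush() {
		fmt.Printf("speak on flush: %q\n", clause)
	}
}
```

The trailing sentence is only released by flush because, until the
stream ends, the next token could still extend it.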
Also fix the streamed-reply path and the Talk page:
- Don't swallow streamed autoparser content as reasoning: the
tokenizer-template path already delivers reasoning-free content via
ChatDeltas, so prefilling the thinking start token re-tagged it as an
unclosed reasoning block, leaving no spoken reply. Disable the prefill
on that path; closed tag pairs are still stripped (#9985).
- Generate collision-free realtime IDs (16 random bytes) instead of a
constant, so per-item bookkeeping (cancel, conversation.item.retrieve)
works.
- Key the Talk transcript by the server item_id and upsert entries.
Realtime events arrive over a WebRTC data channel — outside React's
event system — so React defers the setTranscript updaters while
synchronous ref writes in handler bodies run first; the old
index-tracking ref rendered a duplicate assistant bubble on
completion. Upserts by item_id are idempotent and order-independent.
- Drop the partial assistant bubble on a cancelled response (barge-in):
the server discards the interrupted item and sends response.done with
status "cancelled"; mirror that in the UI so the regenerated reply
isn't rendered as a second assistant message.
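
To make the ID bullet concrete, here is a small sketch of why per-item
bookkeeping needs distinct IDs. The names newRandomID and items are
hypothetical, and unlike the committed helper this version returns the
crypto/rand error instead of discarding it.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newRandomID returns 16 random bytes, hex-encoded (32 characters).
func newRandomID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return hex.EncodeToString(b[:]), nil
}

func main() {
	// Track two conversation items by ID, then cancel only the first one.
	items := map[string]string{}
	var ids []string
	for _, text := range []string{"interrupted reply", "regenerated reply"} {
		id, err := newRandomID()
		if err != nil {
			panic(err)
		}
		items[id] = text
		ids = append(ids, id)
	}
	delete(items, ids[0])

	// With a constant ID both entries would share one key, so removing the
	// interrupted item would also remove the regenerated one.
	fmt.Println("items left:", len(items))
}
```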
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Assisted-by: Claude:claude-fable-5 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>
@@ -336,6 +336,13 @@ func DefaultRegistry() map[string]FieldMetaOverride {
|
||||
Component: "toggle",
|
||||
Order: 68,
|
||||
},
|
||||
"pipeline.streaming.clause_chunking": {
|
||||
Section: "pipeline",
|
||||
Label: "Clause Chunking",
|
||||
Description: "Split the streamed reply into speakable clauses and synthesize each as soon as it completes, instead of buffering the whole message before TTS — lower time-to-first-audio. Script-aware (handles CJK 。!? and Thai/Lao spaces), so it does not whitespace-split. Requires Stream LLM; off buffers the whole message.",
|
||||
Component: "toggle",
|
||||
Order: 69,
|
||||
},
|
||||
|
||||
// --- Functions ---
|
||||
"function.grammar.parallel_calls": {
|
||||
|
||||
@@ -545,6 +545,12 @@ type PipelineStreaming struct {
|
||||
LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"`
|
||||
TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"`
|
||||
Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"`
|
||||
// ClauseChunking splits the streamed LLM reply into speakable clauses and
|
||||
// synthesizes each as soon as it completes, instead of buffering the whole
|
||||
// message before TTS. Script-aware (CJK/Thai), so it does not rely on
|
||||
// whitespace sentence boundaries. Requires LLM streaming; unset buffers the
|
||||
// whole message (today's default).
|
||||
ClauseChunking *bool `yaml:"clause_chunking,omitempty" json:"clause_chunking,omitempty"`
|
||||
}
|
||||
|
||||
// StreamLLM reports whether LLM tokens should be streamed for this pipeline.
|
||||
@@ -558,6 +564,12 @@ func (p Pipeline) StreamTranscription() bool {
|
||||
return p.Streaming.Transcription != nil && *p.Streaming.Transcription
|
||||
}
|
||||
|
||||
// ChunkClauses reports whether the streamed reply should be split into
|
||||
// script-aware clauses and synthesized incrementally rather than buffered whole.
|
||||
func (p Pipeline) ChunkClauses() bool {
|
||||
return p.Streaming.ClauseChunking != nil && *p.Streaming.ClauseChunking
|
||||
}
|
||||
|
||||
// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off.
|
||||
func (p Pipeline) ThinkingDisabled() bool {
|
||||
return p.DisableThinking != nil && *p.DisableThinking
|
||||
|
||||
@@ -16,6 +16,7 @@ var _ = Describe("Pipeline streaming config", func() {
|
||||
Expect(p.StreamLLM()).To(BeFalse())
|
||||
Expect(p.StreamTTS()).To(BeFalse())
|
||||
Expect(p.StreamTranscription()).To(BeFalse())
|
||||
Expect(p.ChunkClauses()).To(BeFalse())
|
||||
Expect(p.ThinkingDisabled()).To(BeFalse())
|
||||
})
|
||||
|
||||
@@ -31,12 +32,14 @@ pipeline:
|
||||
llm: true
|
||||
tts: true
|
||||
transcription: true
|
||||
clause_chunking: true
|
||||
disable_thinking: true
|
||||
`), &c)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(c.Pipeline.StreamLLM()).To(BeTrue())
|
||||
Expect(c.Pipeline.StreamTTS()).To(BeTrue())
|
||||
Expect(c.Pipeline.StreamTranscription()).To(BeTrue())
|
||||
Expect(c.Pipeline.ChunkClauses()).To(BeTrue())
|
||||
Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue())
|
||||
})
|
||||
|
||||
|
||||
@@ -2,8 +2,10 @@ package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
@@ -1983,8 +1985,11 @@ func generateItemID() string {
|
||||
}
|
||||
|
||||
 func generateUniqueID() string {
-	// Generate a unique ID string
-	// For simplicity, use a counter or UUID
-	// Implement as needed
-	return "unique_id"
+	// 16 random bytes, hex-encoded. Must be collision-free: session, item,
+	// response and call IDs build on this, and the conversation tracks/removes
+	// items by ID (e.g. cancel() in realtime_stream.go, conversation.item.retrieve).
+	// A constant would make every ID alias and corrupt that bookkeeping.
+	var b [16]byte
+	_, _ = rand.Read(b[:])
+	return hex.EncodeToString(b[:])
 }
|
||||
|
||||
200
core/http/endpoints/openai/realtime_chunker.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/rivo/uniseg"
|
||||
)
|
||||
|
||||
// Default clause-chunker bounds (in runes). minRunes gates only sub-sentence
|
||||
// (clause-mark / Thai-space) cuts so we don't synthesize tiny choppy fragments;
|
||||
// full sentences always flush regardless of length. maxRunes caps an
|
||||
// unterminated run so a long punctuation-less span doesn't buffer unbounded.
|
||||
const (
|
||||
defaultClauseMinRunes = 12
|
||||
defaultClauseMaxRunes = 200
|
||||
)
|
||||
|
||||
// clauseChunker splits streamed LLM content into speakable clauses for
|
||||
// incremental TTS, in a SCRIPT-AWARE way so it works for languages without
|
||||
// whitespace word boundaries. It leans on UAX #29 sentence segmentation (which
|
||||
// natively terminates on CJK 。!? as well as Latin .!?), adds CJK clause
// punctuation (,、;:) and Thai/Lao spaces as finer boundaries, and caps an
|
||||
// over-long unterminated run via UAX #14 line-break opportunities.
|
||||
//
|
||||
// Unlike the old ASCII .!?/newline segmenter (dropped in 076dcdbe), it does not
|
||||
// degrade to whole-message buffering for CJK (handled natively) or Thai/Lao
|
||||
// (handled via spaces, which Thai uses at clause/sentence boundaries). Scripts
|
||||
// that genuinely need a dictionary (Khmer/Myanmar) simply stay buffered until a
|
||||
// space or end-of-message — no worse than the buffered default.
|
||||
//
|
||||
// It is not safe for concurrent use; callers feed it from a single goroutine
|
||||
// (the LLM token callback).
|
||||
type clauseChunker struct {
|
||||
buf strings.Builder
|
||||
minRunes int
|
||||
maxRunes int
|
||||
}
|
||||
|
||||
func newClauseChunker(minRunes, maxRunes int) *clauseChunker {
|
||||
return &clauseChunker{minRunes: minRunes, maxRunes: maxRunes}
|
||||
}
|
||||
|
||||
// push appends streamed content and returns any clauses that are now complete —
|
||||
// "complete" meaning confirmed by following content, so we never speak a clause
|
||||
// that the next token might extend. Incomplete trailing text stays buffered.
|
||||
func (c *clauseChunker) push(text string) []string {
|
||||
c.buf.WriteString(text)
|
||||
return c.drain(false)
|
||||
}
|
||||
|
||||
// flush returns the remaining buffered clauses, treating end-of-input as a hard
|
||||
// boundary, and clears the buffer.
|
||||
func (c *clauseChunker) flush() []string {
|
||||
return c.drain(true)
|
||||
}
|
||||
|
||||
func (c *clauseChunker) drain(final bool) []string {
|
||||
s := c.buf.String()
|
||||
rest := s
|
||||
var out []string
|
||||
for rest != "" {
|
||||
end, ok := c.nextBoundary(rest, final)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if seg := strings.TrimSpace(rest[:end]); seg != "" {
|
||||
out = append(out, seg)
|
||||
}
|
||||
rest = rest[end:]
|
||||
}
|
||||
// Rewriting the builder reallocates and copies the whole buffer; skip it on
|
||||
// the common per-token call where no boundary was confirmed.
|
||||
if len(rest) != len(s) {
|
||||
c.buf.Reset()
|
||||
c.buf.WriteString(rest)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// nextBoundary returns the byte offset just past the first emittable clause in
|
||||
// s, or ok=false when more input is needed (final=false) and no boundary is
|
||||
// confirmed yet.
|
||||
func (c *clauseChunker) nextBoundary(s string, final bool) (int, bool) {
|
||||
if s == "" {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// 1) UAX #29 sentence boundary. When the first sentence is followed by more
|
||||
// text it is a confirmed complete sentence (handles Latin .!? with
|
||||
// abbreviation/decimal guards, and CJK 。!? with no whitespace).
|
||||
sentence, rest, _ := uniseg.FirstSentenceInString(s, -1)
|
||||
if rest != "" {
|
||||
// Optionally cut finer inside the sentence at a clause boundary.
|
||||
if cut, ok := c.firstClauseCut(sentence); ok {
|
||||
return cut, true
|
||||
}
|
||||
return len(sentence), true
|
||||
}
|
||||
|
||||
// 2) Unterminated tail: look for a sub-sentence clause boundary (CJK
|
||||
// punctuation or a Thai/Lao space) confirmed by following content.
|
||||
if cut, ok := c.firstClauseCut(s); ok {
|
||||
return cut, true
|
||||
}
|
||||
|
||||
// 3) Over-long punctuation-less run: force a typographically legal break so
|
||||
// we don't buffer unbounded (e.g. a long CJK run with no punctuation).
|
||||
if !final && c.maxRunes > 0 && utf8.RuneCountInString(s) > c.maxRunes {
|
||||
if cut, ok := lineBreakCut(s, c.maxRunes); ok {
|
||||
return cut, true
|
||||
}
|
||||
}
|
||||
|
||||
// 4) End of input: emit whatever remains as the final clause.
|
||||
if final {
|
||||
return len(s), true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// firstClauseCut returns the byte offset just past the first sub-sentence clause
|
||||
// boundary in s — a CJK clause punctuation mark, or a space following a Thai/Lao
|
||||
// letter — provided the prefix is at least minRunes long and non-space content
|
||||
// follows. The boundary mark (and any trailing spaces) stay with the left clause.
|
||||
func (c *clauseChunker) firstClauseCut(s string) (int, bool) {
|
||||
var prev rune
|
||||
runes := 0
|
||||
for i, r := range s {
|
||||
boundary := isCJKClausePunct(r) || (unicode.IsSpace(r) && isThaiLao(prev))
|
||||
if boundary && runes+1 >= c.minRunes {
|
||||
end := i + utf8.RuneLen(r)
|
||||
for end < len(s) {
|
||||
nr, sz := utf8.DecodeRuneInString(s[end:])
|
||||
if !unicode.IsSpace(nr) {
|
||||
break
|
||||
}
|
||||
end += sz
|
||||
}
|
||||
if end < len(s) { // confirmed: real content follows the boundary
|
||||
return end, true
|
||||
}
|
||||
// Boundary sits at the end of the buffer with nothing after it yet —
|
||||
// wait for the next token to confirm it rather than emit early.
|
||||
return 0, false
|
||||
}
|
||||
prev = r
|
||||
runes++
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// lineBreakCut walks UAX #14 line segments and returns the byte offset of the
|
||||
// last legal break opportunity at or before maxRunes. Returns ok=false when the
|
||||
// run has no internal break opportunity (e.g. a space-less Thai run), leaving it
|
||||
// buffered.
|
||||
func lineBreakCut(s string, maxRunes int) (int, bool) {
|
||||
state := -1
|
||||
rest := s
|
||||
consumed := 0
|
||||
runes := 0
|
||||
for rest != "" {
|
||||
seg, rem, _, st := uniseg.FirstLineSegmentInString(rest, state)
|
||||
state = st
|
||||
runes += utf8.RuneCountInString(seg)
|
||||
consumed += len(seg)
|
||||
rest = rem
|
||||
if runes >= maxRunes {
|
||||
if consumed < len(s) {
|
||||
return consumed, true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// isCJKClausePunct reports whether r is a CJK clause-level separator worth a
// soft TTS break. Sentence terminators (。!?) are intentionally excluded; UAX
// #29 sentence segmentation already handles those.
func isCJKClausePunct(r rune) bool {
	switch r {
	case ',', // U+FF0C fullwidth comma
		'、', // U+3001 ideographic comma
		';', // U+FF1B fullwidth semicolon
		':', // U+FF1A fullwidth colon
		'・', // U+30FB katakana middle dot
		'・': // U+FF65 halfwidth katakana middle dot
		return true
	}
	return false
}
|
||||
|
||||
// isThaiLao reports whether r is a Thai or Lao letter. Those scripts have no
|
||||
// inter-word spaces; an ASCII space inside such a run marks a clause/sentence
|
||||
// boundary, which is the only no-dictionary segmentation signal available.
|
||||
func isThaiLao(r rune) bool {
|
||||
return unicode.Is(unicode.Thai, r) || unicode.Is(unicode.Lao, r)
|
||||
}
|
||||
103
core/http/endpoints/openai/realtime_chunker_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// clauseChunker splits streamed LLM content into speakable clauses in a
|
||||
// script-aware way: UAX#29 sentences (Latin .!? and CJK 。!?), CJK clause
|
||||
// punctuation, and Thai/Lao spaces — never whitespace-splitting CJK.
|
||||
var _ = Describe("clauseChunker", func() {
|
||||
Context("Latin sentences", func() {
|
||||
It("emits a sentence only once following content confirms it is complete", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
Expect(c.push("Hello world. How are you?")).To(Equal([]string{"Hello world."}))
|
||||
// The trailing sentence is held until flush (the next token might extend it).
|
||||
Expect(c.flush()).To(Equal([]string{"How are you?"}))
|
||||
})
|
||||
|
||||
It("assembles a sentence across many small tokens", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
var got []string
|
||||
for _, tok := range []string{"Hello", " world.", " How", " are", " you?"} {
|
||||
got = append(got, c.push(tok)...)
|
||||
}
|
||||
got = append(got, c.flush()...)
|
||||
Expect(got).To(Equal([]string{"Hello world.", "How are you?"}))
|
||||
})
|
||||
|
||||
It("does not split decimals or abbreviations (UAX#29 SB6)", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
got := c.push("Pi is 3.14 and e is 2.72. Done")
|
||||
Expect(got).To(Equal([]string{"Pi is 3.14 and e is 2.72."}))
|
||||
Expect(c.flush()).To(Equal([]string{"Done"}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("CJK (no whitespace)", func() {
|
||||
It("splits Chinese on the ideographic full stop", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
Expect(c.push("你好世界。今天天气很好。")).To(Equal([]string{"你好世界。"}))
|
||||
Expect(c.flush()).To(Equal([]string{"今天天气很好。"}))
|
||||
})
|
||||
|
||||
It("splits Japanese on the ideographic full stop", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
Expect(c.push("こんにちは。元気ですか。")).To(Equal([]string{"こんにちは。"}))
|
||||
Expect(c.flush()).To(Equal([]string{"元気ですか。"}))
|
||||
})
|
||||
|
||||
It("splits on CJK clause punctuation for lower latency", func() {
|
||||
c := newClauseChunker(2, 200) // small min so short test clauses cut
|
||||
Expect(c.push("你好,世界。再见")).To(Equal([]string{"你好,", "世界。"}))
|
||||
Expect(c.flush()).To(Equal([]string{"再见"}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("Thai (spaces mark clauses, not words)", func() {
|
||||
It("splits a Thai run on the inter-clause space", func() {
|
||||
c := newClauseChunker(2, 200)
|
||||
Expect(c.push("สวัสดีครับ กินข้าวไหม")).To(Equal([]string{"สวัสดีครับ"}))
|
||||
Expect(c.flush()).To(Equal([]string{"กินข้าวไหม"}))
|
||||
})
|
||||
|
||||
It("never shatters a space-less Thai run into characters", func() {
|
||||
c := newClauseChunker(2, 200)
|
||||
Expect(c.push("สวัสดีครับ")).To(BeEmpty()) // held, no boundary
|
||||
Expect(c.flush()).To(Equal([]string{"สวัสดีครับ"}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("length cap (UAX#14 fallback)", func() {
|
||||
It("force-breaks an over-long punctuation-less CJK run at legal points", func() {
|
||||
c := newClauseChunker(4, 10) // maxRunes = 10
|
||||
run := strings.Repeat("字", 25)
|
||||
got := c.push(run)
|
||||
got = append(got, c.flush()...)
|
||||
total := 0
|
||||
for _, seg := range got {
|
||||
n := utf8.RuneCountInString(seg)
|
||||
Expect(n).To(BeNumerically("<=", 10)) // never exceeds the cap
|
||||
total += n
|
||||
}
|
||||
Expect(total).To(Equal(25)) // nothing dropped
|
||||
Expect(len(got)).To(BeNumerically(">=", 3)) // 10 + 10 + 5
|
||||
})
|
||||
})
|
||||
|
||||
Context("buffer lifecycle", func() {
|
||||
It("flush clears the buffer so the chunker is reusable", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
// "First one." is confirmed by the following "Second", so push drains it;
|
||||
// only the unterminated tail remains for flush.
|
||||
Expect(c.push("First one. Second")).To(Equal([]string{"First one."}))
|
||||
Expect(c.flush()).To(Equal([]string{"Second"}))
|
||||
Expect(c.flush()).To(BeEmpty())
|
||||
Expect(c.push("Again. More")).To(Equal([]string{"Again."}))
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -45,13 +45,15 @@ func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID,
|
||||
}
|
||||
|
||||
 // onToken handles one streamed unit of model output, sending a transcript delta
-// for the new content (reasoning stripped). For plain-content models the unit is
-// the raw text token; for autoparser tool turns the backend clears the text and
-// delivers content via ChatDeltas, so the caller passes that content here.
-func (s *transcriptStreamer) onToken(token string) {
+// for the new content (reasoning stripped) and returning that content delta so
+// the caller can also feed it to the clause chunker. For plain-content models
+// the unit is the raw text token; for autoparser tool turns the backend clears
+// the text and delivers content via ChatDeltas, so the caller passes that
+// content here. Returns "" when the token produced no new spoken content.
+func (s *transcriptStreamer) onToken(token string) string {
 	_, content := s.extractor.ProcessToken(token)
 	if content == "" {
-		return
+		return ""
 	}
|
||||
if !s.announced {
|
||||
s.announced = true
|
||||
@@ -67,6 +69,7 @@ func (s *transcriptStreamer) onToken(token string) {
|
||||
ContentIndex: 0,
|
||||
Delta: content,
|
||||
})
|
||||
return content
|
||||
}
|
||||
|
||||
// content returns the full transcript so far with reasoning stripped.
|
||||
@@ -143,8 +146,52 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation
|
||||
}
|
||||
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)
|
||||
|
||||
streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, llmCfg.ReasoningConfig)
|
||||
// The autoparser (tokenizer-template path) already delivers reasoning-free
|
||||
// content. Prefilling the thinking start token here would re-tag that clean
|
||||
// content as an unclosed reasoning block, leaving CleanedContent() empty —
|
||||
// no spoken reply, no TTS. Disable the prefill; closed tag pairs are still
|
||||
// stripped (PEG-fallback case, #9985).
|
||||
reasoningCfg := llmCfg.ReasoningConfig
|
||||
if llmCfg.TemplateConfig.UseTokenizerTemplate {
|
||||
disablePrefill := true
|
||||
reasoningCfg.DisableReasoningTagPrefill = &disablePrefill
|
||||
}
|
||||
|
||||
streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, reasoningCfg)
|
||||
streamer.announce = announce
|
||||
|
||||
// Clause chunking (opt-in): synthesize each clause as soon as it completes
|
||||
// instead of buffering the whole reply. streamedAudio accumulates the PCM
|
||||
// across clauses for the conversation item record; ttsErr captures the first
|
||||
// synthesis failure so the token callback can stop the prediction. emitSpeech
|
||||
// runs synchronously here — the LLM keeps generating into the gRPC stream
|
||||
// while a clause is synthesized, so audio still starts mid-generation.
|
||||
var chunker *clauseChunker
|
||||
if session.ModelConfig != nil && session.ModelConfig.Pipeline.ChunkClauses() {
|
||||
chunker = newClauseChunker(defaultClauseMinRunes, defaultClauseMaxRunes)
|
||||
}
|
||||
var streamedAudio []byte
|
||||
var ttsErr error
|
||||
speakClause := func(clause string) error {
|
||||
a, err := emitSpeech(ctx, t, session, responseID, itemID, clause)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
streamedAudio = append(streamedAudio, a...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// fail reports a mid-stream failure. A cancelled context means the client
|
||||
// interrupted (barge-in), so roll the turn back instead of erroring.
|
||||
fail := func(code, msg string, err error) bool {
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
} else {
|
||||
sendError(t, code, fmt.Sprintf("%s: %v", msg, err), "", itemID)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
cb := func(token string, usage backend.TokenUsage) bool {
|
||||
if ctx.Err() != nil {
|
||||
return false
|
||||
@@ -157,7 +204,14 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation
|
||||
 		if len(usage.ChatDeltas) > 0 {
 			text = functions.ContentFromChatDeltas(usage.ChatDeltas)
 		}
-		streamer.onToken(text)
+		delta := streamer.onToken(text)
+		if chunker != nil && delta != "" {
+			for _, clause := range chunker.push(delta) {
+				if ttsErr = speakClause(clause); ttsErr != nil {
+					return false // stop the prediction; reported after predFunc returns
+				}
+			}
+		}
 		return true
 	}
|
||||
|
||||
@@ -167,13 +221,13 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation
|
||||
return true
|
||||
}
|
||||
 	pred, err := predFunc()
+	// A clause synthesis failed mid-stream (the callback stopped the prediction);
+	// report it as a TTS error rather than a prediction error.
+	if ttsErr != nil {
+		return fail("tts_error", "TTS generation failed", ttsErr)
+	}
 	if err != nil {
-		if ctx.Err() != nil {
-			cancel()
-			return true
-		}
-		sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
-		return true
+		return fail("prediction_failed", "backend error", err)
 	}
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
@@ -189,17 +243,25 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation
|
||||
 	if !announced {
 		announce()
 	}
-	// Buffer the whole message, then synthesize it once. emitSpeech streams
-	// the audio chunks when the TTS backend supports TTSStream, otherwise it
-	// sends a single unary delta — no per-sentence segmentation either way.
-	audio, err := emitSpeech(ctx, t, session, responseID, itemID, content)
-	if err != nil {
-		if ctx.Err() != nil {
-			cancel()
-			return true
+
+	// Synthesize the audio. With clause chunking the completed clauses were
+	// already spoken inside the token callback; flush the trailing clause(s)
+	// the segmenter was still holding. Otherwise buffer the whole message and
+	// synthesize it once. emitSpeech streams the audio chunks when the TTS
+	// backend supports TTSStream, otherwise it sends a single unary delta.
+	var audio []byte
+	if chunker != nil {
+		for _, clause := range chunker.flush() {
+			if ttsErr = speakClause(clause); ttsErr != nil {
+				break
+			}
 		}
-		sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", itemID)
-		return true
+		audio = streamedAudio
+	} else {
+		audio, ttsErr = emitSpeech(ctx, t, session, responseID, itemID, content)
 	}
+	if ttsErr != nil {
+		return fail("tts_error", "TTS generation failed", ttsErr)
+	}
|
||||
|
||||
_, isWebRTC := t.(*WebRTCTransport)
|
||||
|
||||
@@ -46,6 +46,38 @@ var _ = Describe("transcriptStreamer", func() {
|
||||
Expect(s.content()).ToNot(ContainSubstring("secret plan"))
|
||||
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan"))
|
||||
})
|
||||
|
||||
It("does not swallow autoparser content when the template has a thinking start token (tokenizer-template path)", func() {
|
||||
// Regression: with tag prefill on, the detected <think> token is
|
||||
// prepended to the autoparser's already-clean content, swallowing the
|
||||
// whole reply (empty transcript → no TTS). streamLLMResponse disables
|
||||
// the prefill for the tokenizer-template path.
|
||||
disablePrefill := true
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "<think>",
|
||||
reasoning.Config{DisableReasoningTagPrefill: &disablePrefill})
|
||||
|
||||
s.onToken("Hello")
|
||||
s.onToken(" there.")
|
||||
|
||||
Expect(s.content()).To(Equal("Hello there."))
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello there."))
|
||||
})
|
||||
|
||||
It("still strips embedded closed reasoning tags with prefill disabled (PEG-fallback safety, #9985)", func() {
|
||||
// Disabling prefill must not stop stripping closed <think>…</think>
|
||||
// pairs the PEG fallback can leave in autoparser content.
|
||||
disablePrefill := true
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "<think>",
|
||||
reasoning.Config{DisableReasoningTagPrefill: &disablePrefill})
|
||||
|
||||
s.onToken("<think>secret</think>")
|
||||
s.onToken("The answer is 42.")
|
||||
|
||||
Expect(s.content()).To(Equal("The answer is 42."))
|
||||
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret"))
|
||||
})
|
||||
})
|
||||
|
||||
// streamLLMResponse drives a full streamed realtime turn: live transcript
|
||||
@@ -81,6 +113,37 @@ var _ = Describe("streamLLMResponse", func() {
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
|
||||
})
|
||||
|
||||
It("synthesizes each clause as it completes when clause chunking is enabled", func() {
|
||||
on := true
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{"Hello world.", " How are you?"},
|
||||
predictResp: backend.LLMResponse{Response: "Hello world. How are you?"},
|
||||
ttsStreamChunks: [][]byte{{9}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on, ClauseChunking: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// Two clauses ("Hello world." mid-stream, "How are you?" on flush) → two
|
||||
// emitSpeech calls → two audio deltas, vs one for whole-message buffering.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2))
|
||||
// The full transcript still streams verbatim.
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
|
||||
// Exactly one terminal response.done.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("streams content deltas and emits tool-call items (autoparser tool turn)", func() {
|
||||
on := true
|
||||
// Autoparser path: reply.Message is empty; content + tool calls arrive via
|
||||
|
||||
@@ -17,6 +17,24 @@ const STATUS_STYLES = {
|
||||
error: { icon: 'fa-solid fa-circle', color: 'var(--color-error)', bg: 'var(--color-error-light)' },
|
||||
}
|
||||
|
||||
// upsertAssistant merges a streamed transcript fragment into the assistant entry
|
||||
// identified by the server's item_id, or appends a new entry if none exists yet.
|
||||
// Keying by item_id (not a mutable index tracked across handler/updater
|
||||
// boundaries) makes streamed deltas idempotent and order-independent, so React's
|
||||
// batching of non-React data-channel events cannot produce a duplicate bubble.
|
||||
// mode 'append' adds to the running text; 'replace' sets the final transcript.
|
||||
function upsertAssistant(prev, itemId, text, mode) {
|
||||
// Only assistant entries carry an id, and the streaming entry is almost
|
||||
// always the newest — search from the tail so per-delta cost stays constant.
|
||||
const i = prev.findLastIndex(e => e.id === itemId)
|
||||
if (i === -1) {
|
||||
return [...prev, { role: 'assistant', id: itemId, text }]
|
||||
}
|
||||
const next = [...prev]
|
||||
next[i] = { ...next[i], text: mode === 'append' ? next[i].text + text : text }
|
||||
return next
|
||||
}
|
||||
|
||||
export default function Talk() {
|
||||
const { addToast } = useOutletContext()
|
||||
const navigate = useNavigate()
|
||||
@@ -34,7 +52,10 @@ export default function Talk() {
|
||||
|
||||
// Transcript
|
||||
   const [transcript, setTranscript] = useState([])
-  const streamingRef = useRef(null) // tracks the index of the in-progress assistant message
+  // item_id of the assistant message currently streaming — used only to remove
+  // its partial bubble when a response is cancelled (barge-in). The transcript
+  // itself is keyed by item_id via upsertAssistant, not by this ref.
+  const inProgressIdRef = useRef(null)
|
||||
|
||||
// Session settings
|
||||
const [instructions, setInstructions] = useState(
|
||||
@@ -227,39 +248,21 @@ export default function Talk() {
|
||||
break
|
||||
       case 'conversation.item.input_audio_transcription.completed':
         if (event.transcript) {
-          streamingRef.current = null
           setTranscript(prev => [...prev, { role: 'user', text: event.transcript }])
         }
         updateStatus('thinking', 'Generating response...')
         break
       case 'response.output_audio_transcript.delta':
         if (event.delta) {
-          setTranscript(prev => {
-            if (streamingRef.current !== null) {
-              const updated = [...prev]
-              updated[streamingRef.current] = {
-                ...updated[streamingRef.current],
-                text: updated[streamingRef.current].text + event.delta,
-              }
-              return updated
-            }
-            streamingRef.current = prev.length
-            return [...prev, { role: 'assistant', text: event.delta }]
-          })
+          inProgressIdRef.current = event.item_id
+          setTranscript(prev => upsertAssistant(prev, event.item_id, event.delta, 'append'))
         }
         break
       case 'response.output_audio_transcript.done':
         if (event.transcript) {
-          setTranscript(prev => {
-            if (streamingRef.current !== null) {
-              const updated = [...prev]
-              updated[streamingRef.current] = { ...updated[streamingRef.current], text: event.transcript }
-              return updated
-            }
-            return [...prev, { role: 'assistant', text: event.transcript }]
-          })
+          setTranscript(prev => upsertAssistant(prev, event.item_id, event.transcript, 'replace'))
         }
-        streamingRef.current = null
+        inProgressIdRef.current = null
         break
|
||||
case 'response.output_audio.delta':
|
||||
updateStatus('speaking', 'Speaking...')
|
||||
@@ -281,7 +284,7 @@ export default function Talk() {
           // Pretty-print JSON for readability; fall back to raw string.
           try { preview = JSON.stringify(JSON.parse(preview), null, 2) } catch (_) { /* keep raw */ }
           setTranscript(prev => [...prev, { role: 'tool_result', text: preview }])
-          streamingRef.current = null // tool result ends the current assistant text run
+          inProgressIdRef.current = null // tool result ends the current assistant text run
         }
         break
       }
@@ -290,9 +293,20 @@ export default function Talk() {
         // conversation.item.create + response.create when it's done.
         handleFunctionCall(event)
         break
-      case 'response.done':
+      case 'response.done': {
+        // A cancelled response (barge-in / interruption) leaves a partial,
+        // incrementally-streamed assistant bubble behind. The server discards
+        // the interrupted item from history; mirror that here (remove the
+        // in-progress assistant entry by item_id) so the regenerated reply
+        // doesn't show up as a second assistant message.
+        if (event.response?.status === 'cancelled' && inProgressIdRef.current) {
+          const id = inProgressIdRef.current
+          inProgressIdRef.current = null
+          setTranscript(prev => prev.filter(e => e.id !== id))
+        }
         updateStatus('listening', 'Listening...')
         break
+      }
       case 'error':
         hasErrorRef.current = true
         updateStatus('error', 'Error: ' + (event.error?.message || 'Unknown error'))
@@ -789,7 +803,7 @@ export default function Talk() {
               const iconColor = isToolCall || isToolResult ? 'var(--color-text-secondary)'
                 : isUser ? 'var(--color-primary)' : 'var(--color-accent)'
               return (
-                <div key={i} style={{ display: 'flex', alignItems: 'flex-start', gap: 'var(--spacing-xs)' }}>
+                <div key={entry.id || i} style={{ display: 'flex', alignItems: 'flex-start', gap: 'var(--spacing-xs)' }}>
                   <i className={iconClass} style={{ color: iconColor, marginTop: 3, flexShrink: 0, fontSize: '0.75rem' }} />
                   <p style={{
                     margin: 0,
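To see why keying the transcript by `item_id` makes repeated or late events harmless, here is a minimal sketch that replays a delta/done/cancel sequence outside React. The body of `upsertAssistant` mirrors the lines in the diff above, but its signature is inferred from the call sites, and `removeCancelled` and `demoTranscript` are hypothetical helpers added only for illustration. It assumes a runtime that provides `Array.prototype.findLastIndex` (Node 18+ or a current browser).

```javascript
// Signature inferred from the call sites in the diff (an assumption).
function upsertAssistant(prev, itemId, text, mode) {
  // Search from the tail: the streaming entry is almost always the newest.
  const i = prev.findLastIndex(e => e.id === itemId)
  if (i === -1) {
    // First event seen for this item: create the entry.
    return [...prev, { role: 'assistant', id: itemId, text }]
  }
  const next = [...prev]
  next[i] = { ...next[i], text: mode === 'append' ? next[i].text + text : text }
  return next
}

// Hypothetical helper: same filter the 'response.done' handler applies
// when a response is cancelled. Entries without an id (user, tool) are kept.
function removeCancelled(prev, itemId) {
  return prev.filter(e => e.id !== itemId)
}

// Hypothetical replay of one assistant turn.
function demoTranscript() {
  let t = [{ role: 'user', text: 'hi' }]
  t = upsertAssistant(t, 'item_1', 'Hel', 'append')     // first delta creates the entry
  t = upsertAssistant(t, 'item_1', 'lo', 'append')      // later deltas extend it
  t = upsertAssistant(t, 'item_1', 'Hello!', 'replace') // done overwrites with the final text
  t = upsertAssistant(t, 'item_1', 'Hello!', 'replace') // a repeated done adds no second bubble
  return t
}
```

Only the `'replace'` mode is idempotent; `'append'` still depends on each delta being delivered once. The point of the sketch is that a `done` event never needs to know whether a delta arrived first, which is what the old index-tracking ref got wrong.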
@@ -43,14 +43,16 @@ pipeline:
   llm: qwen3-4b
   tts: tts-1
   streaming:
-    llm: true           # stream LLM tokens as transcript deltas
-    tts: true           # emit audio deltas per synthesized chunk
-    transcription: true # stream transcript text deltas of the user's speech
+    llm: true             # stream LLM tokens as transcript deltas
+    tts: true             # emit audio deltas per synthesized chunk
+    transcription: true   # stream transcript text deltas of the user's speech
+    clause_chunking: true # synthesize each clause as soon as it completes
 ```
 
 - **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces (requires a backend that supports streaming synthesis), instead of one delta for the whole utterance. Falls back to a single unary delta otherwise.
 - **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming).
 - **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events. The full reply is buffered and synthesized once it is complete — streamed as audio chunks when `streaming.tts` is enabled (and the TTS backend supports it), otherwise as a single unary delta. Reasoning/thinking is always stripped from the spoken transcript. Tool calls are supported while streaming when the LLM uses its tokenizer template (`use_tokenizer_template: true`): the backend's autoparser then delivers content and tool calls separately, so the spoken transcript never leaks tool-call tokens. Grammar-based function calling keeps the buffered path.
+- **streaming.clause_chunking**: instead of buffering the whole reply before TTS, split it into speakable clauses and synthesize each as soon as it completes, lowering the time-to-first-audio. The splitter is script-aware: it uses Unicode sentence segmentation (so it handles CJK `。!?` with no whitespace), CJK clause punctuation (`,、;:`), and Thai/Lao spaces — it does **not** rely on whitespace sentence boundaries, so it works for languages such as Chinese, Japanese and Thai where the old per-sentence approach degraded to whole-message buffering. Requires `streaming.llm`; scripts that genuinely need a dictionary (e.g. Khmer, Burmese) simply stay buffered until a space or end-of-message. Off by default.
 
 All streaming flags are off by default, so existing pipelines are unaffected.
 
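The flush-as-you-go pattern behind `streaming.clause_chunking` can be sketched in a few lines. This is a deliberately simplified illustration, not the uniseg-based Go splitter from the commit: it cuts on a fixed punctuation set instead of UAX#29 sentence boundaries, has no Thai/Lao space handling and no line-break cap, and would wrongly split on the `.` in a decimal such as `3.14`. Every name in it (`CLAUSE_END`, `createClauseChunker`, `onClause`, `demoChunker`) is hypothetical.

```javascript
// Assumed punctuation set: ASCII sentence enders, newline, and the
// fullwidth/CJK sentence and clause marks. Not the real splitter's rules.
const CLAUSE_END = new Set(['.', '!', '?', '\n', '。', '!', '?', ',', '、', ';', ':'])

function createClauseChunker(onClause) {
  let buf = ''
  return {
    // Feed one streamed token; emits every clause that the token completes.
    push(token) {
      for (const ch of token) { // iterates by code point, not UTF-16 unit
        buf += ch
        if (CLAUSE_END.has(ch)) {
          const clause = buf.trim()
          buf = ''
          if (clause) onClause(clause)
        }
      }
    },
    // Call at end-of-message to emit whatever is still buffered.
    flush() {
      const rest = buf.trim()
      buf = ''
      if (rest) onClause(rest)
    },
  }
}

// Hypothetical replay: tokens arrive with boundaries in arbitrary places.
function demoChunker() {
  const spoken = []
  const chunker = createClauseChunker(clause => spoken.push(clause))
  for (const token of ['你好', '。今天', '天气', '很好,', '我们走吧']) {
    chunker.push(token)
  }
  chunker.flush()
  return spoken
}
```

In a real pipeline the `onClause` callback would hand each clause to TTS, so the first clause can be synthesized while later tokens are still arriving; here it only collects them so the cut points are visible.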
2
go.mod
@@ -465,7 +465,7 @@ require (
 	github.com/quic-go/qpack v0.6.0 // indirect
 	github.com/quic-go/quic-go v0.59.0 // indirect
 	github.com/quic-go/webtransport-go v0.10.0 // indirect
-	github.com/rivo/uniseg v0.4.7 // indirect
+	github.com/rivo/uniseg v0.4.7
 	github.com/shoenig/go-m1cpu v0.1.6 // indirect
 	github.com/shopspring/decimal v1.4.0 // indirect
 	github.com/sirupsen/logrus v1.9.4 // indirect