diff --git a/core/config/meta/registry.go b/core/config/meta/registry.go index 565bc2d4e..1b4cd8580 100644 --- a/core/config/meta/registry.go +++ b/core/config/meta/registry.go @@ -336,6 +336,13 @@ func DefaultRegistry() map[string]FieldMetaOverride { Component: "toggle", Order: 68, }, + "pipeline.streaming.clause_chunking": { + Section: "pipeline", + Label: "Clause Chunking", + Description: "Split the streamed reply into speakable clauses and synthesize each as soon as it completes, instead of buffering the whole message before TTS — lower time-to-first-audio. Script-aware (handles CJK 。!? and Thai/Lao spaces), so it does not whitespace-split. Requires Stream LLM; off buffers the whole message.", + Component: "toggle", + Order: 69, + }, // --- Functions --- "function.grammar.parallel_calls": { diff --git a/core/config/model_config.go b/core/config/model_config.go index 241ed2d49..c5b6cde4b 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -545,6 +545,12 @@ type PipelineStreaming struct { LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"` TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"` Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"` + // ClauseChunking splits the streamed LLM reply into speakable clauses and + // synthesizes each as soon as it completes, instead of buffering the whole + // message before TTS. Script-aware (CJK/Thai), so it does not rely on + // whitespace sentence boundaries. Requires LLM streaming; unset buffers the + // whole message (today's default). + ClauseChunking *bool `yaml:"clause_chunking,omitempty" json:"clause_chunking,omitempty"` } // StreamLLM reports whether LLM tokens should be streamed for this pipeline. @@ -558,6 +564,12 @@ func (p Pipeline) StreamTranscription() bool { return p.Streaming.Transcription != nil && *p.Streaming.Transcription } +// ChunkClauses reports whether the streamed reply should be split into +// script-aware clauses and synthesized incrementally rather than buffered whole. +func (p Pipeline) ChunkClauses() bool { + return p.Streaming.ClauseChunking != nil && *p.Streaming.ClauseChunking +} + // ThinkingDisabled reports whether the pipeline forces the LLM's thinking off. func (p Pipeline) ThinkingDisabled() bool { return p.DisableThinking != nil && *p.DisableThinking diff --git a/core/config/pipeline_streaming_test.go b/core/config/pipeline_streaming_test.go index a6bec5ee4..c9c5812c4 100644 --- a/core/config/pipeline_streaming_test.go +++ b/core/config/pipeline_streaming_test.go @@ -16,6 +16,7 @@ var _ = Describe("Pipeline streaming config", func() { Expect(p.StreamLLM()).To(BeFalse()) Expect(p.StreamTTS()).To(BeFalse()) Expect(p.StreamTranscription()).To(BeFalse()) + Expect(p.ChunkClauses()).To(BeFalse()) Expect(p.ThinkingDisabled()).To(BeFalse()) }) @@ -31,12 +32,14 @@ pipeline: llm: true tts: true transcription: true + clause_chunking: true disable_thinking: true `), &c) Expect(err).ToNot(HaveOccurred()) Expect(c.Pipeline.StreamLLM()).To(BeTrue()) Expect(c.Pipeline.StreamTTS()).To(BeTrue()) Expect(c.Pipeline.StreamTranscription()).To(BeTrue()) + Expect(c.Pipeline.ChunkClauses()).To(BeTrue()) Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue()) }) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index c8433cfeb..d2e2413c6 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -2,8 +2,10 @@ package openai import ( "context" + "crypto/rand" "encoding/base64" "encoding/binary" + "encoding/hex" "encoding/json" "fmt" "math" @@ -1983,8 +1985,11 @@ func generateItemID() string { } func generateUniqueID() string { - // Generate a unique ID string - // For simplicity, use a counter or UUID - // Implement as needed - return "unique_id" + // 16 random bytes, hex-encoded. Must be collision-free: session, item, + // response and call IDs build on this, and the conversation tracks/removes + // items by ID (e.g. cancel() in realtime_stream.go, conversation.item.retrieve). + // A constant would make every ID alias and corrupt that bookkeeping. + var b [16]byte + _, _ = rand.Read(b[:]) + return hex.EncodeToString(b[:]) } diff --git a/core/http/endpoints/openai/realtime_chunker.go b/core/http/endpoints/openai/realtime_chunker.go new file mode 100644 index 000000000..3738e82f8 --- /dev/null +++ b/core/http/endpoints/openai/realtime_chunker.go @@ -0,0 +1,200 @@ +package openai + +import ( + "strings" + "unicode" + "unicode/utf8" + + "github.com/rivo/uniseg" +) + +// Default clause-chunker bounds (in runes). minRunes gates only sub-sentence +// (clause-mark / Thai-space) cuts so we don't synthesize tiny choppy fragments; +// full sentences always flush regardless of length. maxRunes caps an +// unterminated run so a long punctuation-less span doesn't buffer unbounded. +const ( + defaultClauseMinRunes = 12 + defaultClauseMaxRunes = 200 +) + +// clauseChunker splits streamed LLM content into speakable clauses for +// incremental TTS, in a SCRIPT-AWARE way so it works for languages without +// whitespace word boundaries. It leans on UAX #29 sentence segmentation (which +// natively terminates on CJK 。!? as well as Latin .!?), adds CJK clause +// punctuation (,、;:) and Thai/Lao spaces as finer boundaries, and caps an +// over-long unterminated run via UAX #14 line-break opportunities. +// +// Unlike the old ASCII .!?/newline segmenter (dropped in 076dcdbe), it does not +// degrade to whole-message buffering for CJK (handled natively) or Thai/Lao +// (handled via spaces, which Thai uses at clause/sentence boundaries). Scripts +// that genuinely need a dictionary (Khmer/Myanmar) simply stay buffered until a +// space or end-of-message — no worse than the buffered default. +// +// It is not safe for concurrent use; callers feed it from a single goroutine +// (the LLM token callback). +type clauseChunker struct { + buf strings.Builder + minRunes int + maxRunes int +} + +func newClauseChunker(minRunes, maxRunes int) *clauseChunker { + return &clauseChunker{minRunes: minRunes, maxRunes: maxRunes} +} + +// push appends streamed content and returns any clauses that are now complete — +// "complete" meaning confirmed by following content, so we never speak a clause +// that the next token might extend. Incomplete trailing text stays buffered. +func (c *clauseChunker) push(text string) []string { + c.buf.WriteString(text) + return c.drain(false) +} + +// flush returns the remaining buffered clauses, treating end-of-input as a hard +// boundary, and clears the buffer. +func (c *clauseChunker) flush() []string { + return c.drain(true) +} + +func (c *clauseChunker) drain(final bool) []string { + s := c.buf.String() + rest := s + var out []string + for rest != "" { + end, ok := c.nextBoundary(rest, final) + if !ok { + break + } + if seg := strings.TrimSpace(rest[:end]); seg != "" { + out = append(out, seg) + } + rest = rest[end:] + } + // Rewriting the builder reallocates and copies the whole buffer; skip it on + // the common per-token call where no boundary was confirmed. + if len(rest) != len(s) { + c.buf.Reset() + c.buf.WriteString(rest) + } + return out +} + +// nextBoundary returns the byte offset just past the first emittable clause in +// s, or ok=false when more input is needed (final=false) and no boundary is +// confirmed yet. +func (c *clauseChunker) nextBoundary(s string, final bool) (int, bool) { + if s == "" { + return 0, false + } + + // 1) UAX #29 sentence boundary. When the first sentence is followed by more + // text it is a confirmed complete sentence (handles Latin .!? with + // abbreviation/decimal guards, and CJK 。!? with no whitespace). + sentence, rest, _ := uniseg.FirstSentenceInString(s, -1) + if rest != "" { + // Optionally cut finer inside the sentence at a clause boundary. + if cut, ok := c.firstClauseCut(sentence); ok { + return cut, true + } + return len(sentence), true + } + + // 2) Unterminated tail: look for a sub-sentence clause boundary (CJK + // punctuation or a Thai/Lao space) confirmed by following content. + if cut, ok := c.firstClauseCut(s); ok { + return cut, true + } + + // 3) Over-long punctuation-less run: force a typographically legal break so + // we don't buffer unbounded (e.g. a long CJK run with no punctuation). + if !final && c.maxRunes > 0 && utf8.RuneCountInString(s) > c.maxRunes { + if cut, ok := lineBreakCut(s, c.maxRunes); ok { + return cut, true + } + } + + // 4) End of input: emit whatever remains as the final clause. + if final { + return len(s), true + } + return 0, false +} + +// firstClauseCut returns the byte offset just past the first sub-sentence clause +// boundary in s — a CJK clause punctuation mark, or a space following a Thai/Lao +// letter — provided the prefix is at least minRunes long and non-space content +// follows. The boundary mark (and any trailing spaces) stay with the left clause. +func (c *clauseChunker) firstClauseCut(s string) (int, bool) { + var prev rune + runes := 0 + for i, r := range s { + boundary := isCJKClausePunct(r) || (unicode.IsSpace(r) && isThaiLao(prev)) + if boundary && runes+1 >= c.minRunes { + end := i + utf8.RuneLen(r) + for end < len(s) { + nr, sz := utf8.DecodeRuneInString(s[end:]) + if !unicode.IsSpace(nr) { + break + } + end += sz + } + if end < len(s) { // confirmed: real content follows the boundary + return end, true + } + // Boundary sits at the end of the buffer with nothing after it yet — + // wait for the next token to confirm it rather than emit early. + return 0, false + } + prev = r + runes++ + } + return 0, false +} + +// lineBreakCut walks UAX #14 line segments and returns the byte offset of the +// last legal break opportunity at or before maxRunes. Returns ok=false when the +// run has no internal break opportunity (e.g. a space-less Thai run), leaving it +// buffered. +func lineBreakCut(s string, maxRunes int) (int, bool) { + state := -1 + rest := s + consumed := 0 + runes := 0 + for rest != "" { + seg, rem, _, st := uniseg.FirstLineSegmentInString(rest, state) + state = st + runes += utf8.RuneCountInString(seg) + consumed += len(seg) + rest = rem + if runes >= maxRunes { + if consumed < len(s) { + return consumed, true + } + return 0, false + } + } + return 0, false +} + +// isCJKClausePunct reports whether r is a CJK clause-level separator worth a +// soft TTS break. Sentence terminators (。!?) are intentionally excluded — UAX +// #29 sentence segmentation already handles those. +func isCJKClausePunct(r rune) bool { + switch r { + case ',', // , fullwidth comma + '、', // 、 ideographic comma + ';', // ; fullwidth semicolon + ':', // : fullwidth colon + '・', // ・ katakana middle dot + '・': // ・ halfwidth katakana middle dot + return true + } + return false +} + +// isThaiLao reports whether r is a Thai or Lao letter. Those scripts have no +// inter-word spaces; an ASCII space inside such a run marks a clause/sentence +// boundary, which is the only no-dictionary segmentation signal available. +func isThaiLao(r rune) bool { + return unicode.Is(unicode.Thai, r) || unicode.Is(unicode.Lao, r) +} diff --git a/core/http/endpoints/openai/realtime_chunker_test.go b/core/http/endpoints/openai/realtime_chunker_test.go new file mode 100644 index 000000000..4446b2a11 --- /dev/null +++ b/core/http/endpoints/openai/realtime_chunker_test.go @@ -0,0 +1,103 @@ +package openai + +import ( + "strings" + "unicode/utf8" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// clauseChunker splits streamed LLM content into speakable clauses in a +// script-aware way: UAX#29 sentences (Latin .!? and CJK 。!?), CJK clause +// punctuation, and Thai/Lao spaces — never whitespace-splitting CJK. +var _ = Describe("clauseChunker", func() { + Context("Latin sentences", func() { + It("emits a sentence only once following content confirms it is complete", func() { + c := newClauseChunker(12, 200) + Expect(c.push("Hello world. How are you?")).To(Equal([]string{"Hello world."})) + // The trailing sentence is held until flush (the next token might extend it). + Expect(c.flush()).To(Equal([]string{"How are you?"})) + }) + + It("assembles a sentence across many small tokens", func() { + c := newClauseChunker(12, 200) + var got []string + for _, tok := range []string{"Hello", " world.", " How", " are", " you?"} { + got = append(got, c.push(tok)...) + } + got = append(got, c.flush()...) + Expect(got).To(Equal([]string{"Hello world.", "How are you?"})) + }) + + It("does not split decimals or abbreviations (UAX#29 SB6)", func() { + c := newClauseChunker(12, 200) + got := c.push("Pi is 3.14 and e is 2.72. Done") + Expect(got).To(Equal([]string{"Pi is 3.14 and e is 2.72."})) + Expect(c.flush()).To(Equal([]string{"Done"})) + }) + }) + + Context("CJK (no whitespace)", func() { + It("splits Chinese on the ideographic full stop", func() { + c := newClauseChunker(12, 200) + Expect(c.push("你好世界。今天天气很好。")).To(Equal([]string{"你好世界。"})) + Expect(c.flush()).To(Equal([]string{"今天天气很好。"})) + }) + + It("splits Japanese on the ideographic full stop", func() { + c := newClauseChunker(12, 200) + Expect(c.push("こんにちは。元気ですか。")).To(Equal([]string{"こんにちは。"})) + Expect(c.flush()).To(Equal([]string{"元気ですか。"})) + }) + + It("splits on CJK clause punctuation for lower latency", func() { + c := newClauseChunker(2, 200) // small min so short test clauses cut + Expect(c.push("你好,世界。再见")).To(Equal([]string{"你好,", "世界。"})) + Expect(c.flush()).To(Equal([]string{"再见"})) + }) + }) + + Context("Thai (spaces mark clauses, not words)", func() { + It("splits a Thai run on the inter-clause space", func() { + c := newClauseChunker(2, 200) + Expect(c.push("สวัสดีครับ กินข้าวไหม")).To(Equal([]string{"สวัสดีครับ"})) + Expect(c.flush()).To(Equal([]string{"กินข้าวไหม"})) + }) + + It("never shatters a space-less Thai run into characters", func() { + c := newClauseChunker(2, 200) + Expect(c.push("สวัสดีครับ")).To(BeEmpty()) // held, no boundary + Expect(c.flush()).To(Equal([]string{"สวัสดีครับ"})) + }) + }) + + Context("length cap (UAX#14 fallback)", func() { + It("force-breaks an over-long punctuation-less CJK run at legal points", func() { + c := newClauseChunker(4, 10) // maxRunes = 10 + run := strings.Repeat("字", 25) + got := c.push(run) + got = append(got, c.flush()...) + total := 0 + for _, seg := range got { + n := utf8.RuneCountInString(seg) + Expect(n).To(BeNumerically("<=", 10)) // never exceeds the cap + total += n + } + Expect(total).To(Equal(25)) // nothing dropped + Expect(len(got)).To(BeNumerically(">=", 3)) // 10 + 10 + 5 + }) + }) + + Context("buffer lifecycle", func() { + It("flush clears the buffer so the chunker is reusable", func() { + c := newClauseChunker(12, 200) + // "First one." is confirmed by the following "Second", so push drains it; + // only the unterminated tail remains for flush. + Expect(c.push("First one. Second")).To(Equal([]string{"First one."})) + Expect(c.flush()).To(Equal([]string{"Second"})) + Expect(c.flush()).To(BeEmpty()) + Expect(c.push("Again. More")).To(Equal([]string{"Again."})) + }) + }) +}) diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index f6c70e82d..909fc50dc 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -45,13 +45,15 @@ func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, } // onToken handles one streamed unit of model output, sending a transcript delta -// for the new content (reasoning stripped). For plain-content models the unit is -// the raw text token; for autoparser tool turns the backend clears the text and -// delivers content via ChatDeltas, so the caller passes that content here. -func (s *transcriptStreamer) onToken(token string) { +// for the new content (reasoning stripped) and returning that content delta so +// the caller can also feed it to the clause chunker. For plain-content models +// the unit is the raw text token; for autoparser tool turns the backend clears +// the text and delivers content via ChatDeltas, so the caller passes that +// content here. Returns "" when the token produced no new spoken content. +func (s *transcriptStreamer) onToken(token string) string { _, content := s.extractor.ProcessToken(token) if content == "" { - return + return "" } if !s.announced { s.announced = true @@ -67,6 +69,7 @@ func (s *transcriptStreamer) onToken(token string) { ContentIndex: 0, Delta: content, }) + return content } // content returns the full transcript so far with reasoning stripped. @@ -143,8 +146,52 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation } thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) - streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, llmCfg.ReasoningConfig) + // The autoparser (tokenizer-template path) already delivers reasoning-free + // content. Prefilling the thinking start token here would re-tag that clean + // content as an unclosed reasoning block, leaving CleanedContent() empty — + // no spoken reply, no TTS. Disable the prefill; closed tag pairs are still + // stripped (PEG-fallback case, #9985). + reasoningCfg := llmCfg.ReasoningConfig + if llmCfg.TemplateConfig.UseTokenizerTemplate { + disablePrefill := true + reasoningCfg.DisableReasoningTagPrefill = &disablePrefill + } + + streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, reasoningCfg) streamer.announce = announce + + // Clause chunking (opt-in): synthesize each clause as soon as it completes + // instead of buffering the whole reply. streamedAudio accumulates the PCM + // across clauses for the conversation item record; ttsErr captures the first + // synthesis failure so the token callback can stop the prediction. emitSpeech + // runs synchronously here — the LLM keeps generating into the gRPC stream + // while a clause is synthesized, so audio still starts mid-generation. + var chunker *clauseChunker + if session.ModelConfig != nil && session.ModelConfig.Pipeline.ChunkClauses() { + chunker = newClauseChunker(defaultClauseMinRunes, defaultClauseMaxRunes) + } + var streamedAudio []byte + var ttsErr error + speakClause := func(clause string) error { + a, err := emitSpeech(ctx, t, session, responseID, itemID, clause) + if err != nil { + return err + } + streamedAudio = append(streamedAudio, a...) + return nil + } + + // fail reports a mid-stream failure. A cancelled context means the client + // interrupted (barge-in), so roll the turn back instead of erroring. + fail := func(code, msg string, err error) bool { + if ctx.Err() != nil { + cancel() + } else { + sendError(t, code, fmt.Sprintf("%s: %v", msg, err), "", itemID) + } + return true + } + cb := func(token string, usage backend.TokenUsage) bool { if ctx.Err() != nil { return false @@ -157,7 +204,14 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation if len(usage.ChatDeltas) > 0 { text = functions.ContentFromChatDeltas(usage.ChatDeltas) } - streamer.onToken(text) + delta := streamer.onToken(text) + if chunker != nil && delta != "" { + for _, clause := range chunker.push(delta) { + if ttsErr = speakClause(clause); ttsErr != nil { + return false // stop the prediction; reported after predFunc returns + } + } + } return true } @@ -167,13 +221,13 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation return true } pred, err := predFunc() + // A clause synthesis failed mid-stream (the callback stopped the prediction); + // report it as a TTS error rather than a prediction error. + if ttsErr != nil { + return fail("tts_error", "TTS generation failed", ttsErr) + } if err != nil { - if ctx.Err() != nil { - cancel() - return true - } - sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", itemID) - return true + return fail("prediction_failed", "backend error", err) } if ctx.Err() != nil { cancel() @@ -189,17 +243,25 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation if !announced { announce() } - // Buffer the whole message, then synthesize it once. emitSpeech streams - // the audio chunks when the TTS backend supports TTSStream, otherwise it - // sends a single unary delta — no per-sentence segmentation either way. - audio, err := emitSpeech(ctx, t, session, responseID, itemID, content) - if err != nil { - if ctx.Err() != nil { - cancel() - return true + + // Synthesize the audio. With clause chunking the completed clauses were + // already spoken inside the token callback; flush the trailing clause(s) + // the segmenter was still holding. Otherwise buffer the whole message and + // synthesize it once. emitSpeech streams the audio chunks when the TTS + // backend supports TTSStream, otherwise it sends a single unary delta. + var audio []byte + if chunker != nil { + for _, clause := range chunker.flush() { + if ttsErr = speakClause(clause); ttsErr != nil { + break + } } - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", itemID) - return true + audio = streamedAudio + } else { + audio, ttsErr = emitSpeech(ctx, t, session, responseID, itemID, content) + } + if ttsErr != nil { + return fail("tts_error", "TTS generation failed", ttsErr) } _, isWebRTC := t.(*WebRTCTransport) diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go index f7042f772..5150feb21 100644 --- a/core/http/endpoints/openai/realtime_stream_test.go +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -46,6 +46,38 @@ var _ = Describe("transcriptStreamer", func() { Expect(s.content()).ToNot(ContainSubstring("secret plan")) Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan")) }) + + It("does not swallow autoparser content when the template has a thinking start token (tokenizer-template path)", func() { + // Regression: with tag prefill on, the detected token is + // prepended to the autoparser's already-clean content, swallowing the + // whole reply (empty transcript → no TTS). streamLLMResponse disables + // the prefill for the tokenizer-template path. + disablePrefill := true + t := &fakeTransport{} + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", + reasoning.Config{DisableReasoningTagPrefill: &disablePrefill}) + + s.onToken("Hello") + s.onToken(" there.") + + Expect(s.content()).To(Equal("Hello there.")) + Expect(t.transcriptDeltaText()).To(Equal("Hello there.")) + }) + + It("still strips embedded closed reasoning tags with prefill disabled (PEG-fallback safety, #9985)", func() { + // Disabling prefill must not stop stripping closed + // pairs the PEG fallback can leave in autoparser content. + disablePrefill := true + t := &fakeTransport{} + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", + reasoning.Config{DisableReasoningTagPrefill: &disablePrefill}) + + s.onToken("secret") + s.onToken("The answer is 42.") + + Expect(s.content()).To(Equal("The answer is 42.")) + Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret")) + }) }) // streamLLMResponse drives a full streamed realtime turn: live transcript @@ -81,6 +113,37 @@ var _ = Describe("streamLLMResponse", func() { Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) }) + It("synthesizes each clause as it completes when clause chunking is enabled", func() { + on := true + m := &fakeModel{ + predictTokens: []string{"Hello world.", " How are you?"}, + predictResp: backend.LLMResponse{Response: "Hello world. How are you?"}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on, ClauseChunking: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // Two clauses ("Hello world." mid-stream, "How are you?" on flush) → two + // emitSpeech calls → two audio deltas, vs one for whole-message buffering. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2)) + // The full transcript still streams verbatim. + Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) + // Exactly one terminal response.done. + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) + It("streams content deltas and emits tool-call items (autoparser tool turn)", func() { on := true // Autoparser path: reply.Message is empty; content + tool calls arrive via diff --git a/core/http/react-ui/src/pages/Talk.jsx b/core/http/react-ui/src/pages/Talk.jsx index cf92102ac..04af7eb4e 100644 --- a/core/http/react-ui/src/pages/Talk.jsx +++ b/core/http/react-ui/src/pages/Talk.jsx @@ -17,6 +17,24 @@ const STATUS_STYLES = { error: { icon: 'fa-solid fa-circle', color: 'var(--color-error)', bg: 'var(--color-error-light)' }, } +// upsertAssistant merges a streamed transcript fragment into the assistant entry +// identified by the server's item_id, or appends a new entry if none exists yet. +// Keying by item_id (not a mutable index tracked across handler/updater +// boundaries) makes streamed deltas idempotent and order-independent, so React's +// batching of non-React data-channel events cannot produce a duplicate bubble. +// mode 'append' adds to the running text; 'replace' sets the final transcript. +function upsertAssistant(prev, itemId, text, mode) { + // Only assistant entries carry an id, and the streaming entry is almost + // always the newest — search from the tail so per-delta cost stays constant. + const i = prev.findLastIndex(e => e.id === itemId) + if (i === -1) { + return [...prev, { role: 'assistant', id: itemId, text }] + } + const next = [...prev] + next[i] = { ...next[i], text: mode === 'append' ? next[i].text + text : text } + return next +} + export default function Talk() { const { addToast } = useOutletContext() const navigate = useNavigate() @@ -34,7 +52,10 @@ export default function Talk() { // Transcript const [transcript, setTranscript] = useState([]) - const streamingRef = useRef(null) // tracks the index of the in-progress assistant message + // item_id of the assistant message currently streaming — used only to remove + // its partial bubble when a response is cancelled (barge-in). The transcript + // itself is keyed by item_id via upsertAssistant, not by this ref. + const inProgressIdRef = useRef(null) // Session settings const [instructions, setInstructions] = useState( @@ -227,39 +248,21 @@ export default function Talk() { break case 'conversation.item.input_audio_transcription.completed': if (event.transcript) { - streamingRef.current = null setTranscript(prev => [...prev, { role: 'user', text: event.transcript }]) } updateStatus('thinking', 'Generating response...') break case 'response.output_audio_transcript.delta': if (event.delta) { - setTranscript(prev => { - if (streamingRef.current !== null) { - const updated = [...prev] - updated[streamingRef.current] = { - ...updated[streamingRef.current], - text: updated[streamingRef.current].text + event.delta, - } - return updated - } - streamingRef.current = prev.length - return [...prev, { role: 'assistant', text: event.delta }] - }) + inProgressIdRef.current = event.item_id + setTranscript(prev => upsertAssistant(prev, event.item_id, event.delta, 'append')) } break case 'response.output_audio_transcript.done': if (event.transcript) { - setTranscript(prev => { - if (streamingRef.current !== null) { - const updated = [...prev] - updated[streamingRef.current] = { ...updated[streamingRef.current], text: event.transcript } - return updated - } - return [...prev, { role: 'assistant', text: event.transcript }] - }) + setTranscript(prev => upsertAssistant(prev, event.item_id, event.transcript, 'replace')) } - streamingRef.current = null + inProgressIdRef.current = null break case 'response.output_audio.delta': updateStatus('speaking', 'Speaking...') @@ -281,7 +284,7 @@ export default function Talk() { // Pretty-print JSON for readability; fall back to raw string. try { preview = JSON.stringify(JSON.parse(preview), null, 2) } catch (_) { /* keep raw */ } setTranscript(prev => [...prev, { role: 'tool_result', text: preview }]) - streamingRef.current = null // tool result ends the current assistant text run + inProgressIdRef.current = null // tool result ends the current assistant text run } break } @@ -290,9 +293,20 @@ export default function Talk() { // conversation.item.create + response.create when it's done. handleFunctionCall(event) break - case 'response.done': + case 'response.done': { + // A cancelled response (barge-in / interruption) leaves a partial, + // incrementally-streamed assistant bubble behind. The server discards + // the interrupted item from history; mirror that here (remove the + // in-progress assistant entry by item_id) so the regenerated reply + // doesn't show up as a second assistant message. + if (event.response?.status === 'cancelled' && inProgressIdRef.current) { + const id = inProgressIdRef.current + inProgressIdRef.current = null + setTranscript(prev => prev.filter(e => e.id !== id)) + } updateStatus('listening', 'Listening...') break + } case 'error': hasErrorRef.current = true updateStatus('error', 'Error: ' + (event.error?.message || 'Unknown error')) @@ -789,7 +803,7 @@ export default function Talk() { const iconColor = isToolCall || isToolResult ? 'var(--color-text-secondary)' : isUser ? 'var(--color-primary)' : 'var(--color-accent)' return ( -
+