// streamSegmenter accumulates streamed LLM text and emits complete utterance
// segments (sentence/clause boundaries) so the realtime pipeline can hand each
// segment to TTS as soon as it's complete, overlapping generation, synthesis
// and playback instead of waiting for the whole reply.
//
// A segment is committed when a sentence terminator (. ! ?) is followed by
// whitespace, or at a newline. Terminators not followed by whitespace (e.g.
// decimals like "3.14" mid-stream) stay buffered until more text arrives or the
// stream is flushed.
//
// The zero value is an empty segmenter ready for use.
type streamSegmenter struct {
	// buf holds the text received so far that has not been emitted as a
	// segment yet.
	buf strings.Builder
}

// isSentenceTerminator reports whether b is one of the ASCII sentence-ending
// bytes '.', '!' or '?'.
func isSentenceTerminator(b byte) bool {
	return b == '.' || b == '!' || b == '?'
}

// isSpace reports whether b is an ASCII space, tab, line feed or carriage
// return.
func isSpace(b byte) bool {
	return b == ' ' || b == '\t' || b == '\n' || b == '\r'
}

// Push appends text to the buffer and returns any newly-completed segments,
// trimmed of surrounding whitespace. Incomplete trailing text stays buffered.
// The result is nil when no segment was completed.
//
// Scanning is byte-wise: every boundary byte is ASCII, and ASCII bytes never
// occur inside a multi-byte UTF-8 sequence.
func (s *streamSegmenter) Push(text string) []string {
	// Text buffered by earlier calls has already been scanned. Only its final
	// byte can still turn into a boundary (a terminator that was waiting for
	// the whitespace after it), so resume there instead of rescanning from the
	// beginning on every streamed token.
	from := s.buf.Len() - 1
	if from < 0 {
		from = 0
	}

	s.buf.WriteString(text)
	cur := s.buf.String()

	var segments []string
	start := 0
	for i := from; i < len(cur); i++ {
		cut := -1
		switch {
		case cur[i] == '\n':
			cut = i // segment excludes the newline
		case isSentenceTerminator(cur[i]) && i+1 < len(cur) && isSpace(cur[i+1]):
			cut = i + 1 // segment includes the terminator
		}
		if cut < 0 {
			continue
		}
		if seg := strings.TrimSpace(cur[start:cut]); seg != "" {
			segments = append(segments, seg)
		}
		start = cut
	}

	// Leading whitespace of the remainder (including the newline or space that
	// triggered a cut) can never be part of a later segment because segments
	// are trimmed, so drop it now. Rebuild the buffer only when it actually
	// changed; otherwise keep appending to the existing builder.
	if rem := strings.TrimLeft(cur[start:], " \t\r\n"); len(rem) != len(cur) {
		s.buf.Reset()
		s.buf.WriteString(rem)
	}
	return segments
}
+func (s *streamSegmenter) Flush() string { + seg := strings.TrimSpace(s.buf.String()) + s.buf.Reset() + return seg +} diff --git a/core/http/endpoints/openai/realtime_segmenter_test.go b/core/http/endpoints/openai/realtime_segmenter_test.go new file mode 100644 index 000000000..13d9a8c78 --- /dev/null +++ b/core/http/endpoints/openai/realtime_segmenter_test.go @@ -0,0 +1,41 @@ +package openai + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// streamSegmenter turns a stream of LLM token text into complete sentence/clause +// segments so TTS can start synthesizing before the full reply is generated. +var _ = Describe("streamSegmenter", func() { + It("buffers partial text until a sentence terminator followed by space", func() { + var s streamSegmenter + Expect(s.Push("Hello")).To(BeEmpty()) + Expect(s.Push(" world")).To(BeEmpty()) + Expect(s.Push(". ")).To(Equal([]string{"Hello world."})) + }) + + It("emits each complete sentence and keeps the trailing partial buffered", func() { + var s streamSegmenter + Expect(s.Push("One. Two! Three")).To(Equal([]string{"One.", "Two!"})) + Expect(s.Flush()).To(Equal("Three")) + }) + + It("splits on newlines", func() { + var s streamSegmenter + Expect(s.Push("Line one\nLine two")).To(Equal([]string{"Line one"})) + Expect(s.Flush()).To(Equal("Line two")) + }) + + It("does not split decimals or mid-token punctuation", func() { + var s streamSegmenter + Expect(s.Push("Pi is 3.14 today")).To(BeEmpty()) + Expect(s.Flush()).To(Equal("Pi is 3.14 today")) + }) + + It("flushes to empty when the buffer holds only consumed text", func() { + var s streamSegmenter + s.Push("Done. ") + Expect(s.Flush()).To(Equal("")) + }) +})