From 16d7704a692f6f0d02d09f7345764e351c33ca8a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:03:03 +0000 Subject: [PATCH] feat(realtime): pipeline streaming + disable_thinking config Add a nested pipeline.streaming.{llm,tts,transcription} block plus pipeline.disable_thinking, with StreamLLM/StreamTTS/StreamTranscription/ ThinkingDisabled helpers. Pointer-bools so unset keeps the unary path; existing configs are unaffected. Wiring into the realtime handler follows. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/config/model_config.go | 33 ++++++++++++++++ core/config/pipeline_streaming_test.go | 54 ++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 core/config/pipeline_streaming_test.go diff --git a/core/config/model_config.go b/core/config/model_config.go index 9980c92e8..241ed2d49 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -499,6 +499,16 @@ type Pipeline struct { // the pipeline's LLM without editing the LLM model config. Overrides the LLM's // own reasoning_effort. Unset leaves the LLM model config in charge. ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"` + + // Streaming opts each pipeline stage into incremental delivery (LLM tokens, + // TTS audio chunks, transcription text). Unset stages keep the blocking + // unary path, so existing configs are unaffected. + Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"` + + // DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps + // to enable_thinking=false backend metadata) without editing the underlying + // LLM model config. Unset leaves the LLM model config in charge. 
+ DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"` } // ApplyReasoningEffort resolves the effective reasoning effort — a per-request @@ -530,6 +540,29 @@ func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) { } } +// @Description PipelineStreaming toggles incremental delivery per realtime stage. +type PipelineStreaming struct { + LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"` + TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"` + Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"` +} + +// StreamLLM reports whether LLM tokens should be streamed for this pipeline; an unset streaming.llm reports false. +func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM } + +// StreamTTS reports whether TTS audio should be streamed for this pipeline; an unset streaming.tts reports false. +func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS } + +// StreamTranscription reports whether transcription text should be streamed for this pipeline; an unset streaming.transcription reports false. +func (p Pipeline) StreamTranscription() bool { + return p.Streaming.Transcription != nil && *p.Streaming.Transcription +} + +// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off; an unset disable_thinking reports false. +func (p Pipeline) ThinkingDisabled() bool { + return p.DisableThinking != nil && *p.DisableThinking +} + // @Description File configuration for model downloads type File struct { Filename string `yaml:"filename,omitempty" json:"filename,omitempty"` diff --git a/core/config/pipeline_streaming_test.go b/core/config/pipeline_streaming_test.go new file mode 100644 index 000000000..a6bec5ee4 --- /dev/null +++ b/core/config/pipeline_streaming_test.go @@ -0,0 +1,54 @@ +package config + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "gopkg.in/yaml.v3" +) + +// The realtime pipeline can stream each stage (LLM tokens, TTS audio, +// transcription text) and can disable model "thinking" for the LLM. 
These are +// opt-in per pipeline; everything defaults to off so existing configs keep the +// blocking unary (non-streaming) behaviour. +var _ = Describe("Pipeline streaming config", func() { + It("defaults every streaming + thinking helper to false when unset", func() { + var p Pipeline + Expect(p.StreamLLM()).To(BeFalse()) + Expect(p.StreamTTS()).To(BeFalse()) + Expect(p.StreamTranscription()).To(BeFalse()) + Expect(p.ThinkingDisabled()).To(BeFalse()) + }) + + It("parses the nested streaming block and disable_thinking from YAML", func() { + var c ModelConfig + err := yaml.Unmarshal([]byte(` +name: gpt-realtime +pipeline: + llm: my-llm + tts: my-tts + transcription: my-stt + streaming: + llm: true + tts: true + transcription: true + disable_thinking: true +`), &c) + Expect(err).ToNot(HaveOccurred()) + Expect(c.Pipeline.StreamLLM()).To(BeTrue()) + Expect(c.Pipeline.StreamTTS()).To(BeTrue()) + Expect(c.Pipeline.StreamTranscription()).To(BeTrue()) + Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue()) + }) + + It("treats an explicit false in the streaming block as disabled", func() { + var c ModelConfig + err := yaml.Unmarshal([]byte(` +name: gpt-realtime +pipeline: + streaming: + tts: false +`), &c) + Expect(err).ToNot(HaveOccurred()) + Expect(c.Pipeline.StreamTTS()).To(BeFalse()) + }) +})