diff --git a/core/config/meta/registry.go b/core/config/meta/registry.go index 548b21892..1b4cd8580 100644 --- a/core/config/meta/registry.go +++ b/core/config/meta/registry.go @@ -308,6 +308,41 @@ func DefaultRegistry() map[string]FieldMetaOverride { }, Order: 64, }, + "pipeline.disable_thinking": { + Section: "pipeline", + Label: "Disable Thinking", + Description: "Suppress reasoning/thinking output from the pipeline LLM (sets enable_thinking=false on the underlying model). Use for models that emit blocks you don't want spoken or streamed back to the realtime client.", + Component: "toggle", + Order: 65, + }, + "pipeline.streaming.llm": { + Section: "pipeline", + Label: "Stream LLM", + Description: "Stream LLM tokens to the realtime client as they are generated instead of waiting for the full response. Emits incremental response.output_audio_transcript.delta / text deltas.", + Component: "toggle", + Order: 66, + }, + "pipeline.streaming.tts": { + Section: "pipeline", + Label: "Stream TTS", + Description: "Stream synthesized audio chunks to the realtime client as they are produced (requires a TTS backend that implements TTSStream). Falls back to unary synthesis otherwise.", + Component: "toggle", + Order: 67, + }, + "pipeline.streaming.transcription": { + Section: "pipeline", + Label: "Stream Transcription", + Description: "Stream partial transcription text to the realtime client as the STT backend produces it (requires a transcription backend that implements AudioTranscriptionStream). Falls back to unary transcription otherwise.", + Component: "toggle", + Order: 68, + }, + "pipeline.streaming.clause_chunking": { + Section: "pipeline", + Label: "Clause Chunking", + Description: "Split the streamed reply into speakable clauses and synthesize each as soon as it completes, instead of buffering the whole message before TTS — lower time-to-first-audio. Script-aware (handles CJK 。!? and Thai/Lao spaces), so it does not whitespace-split. 
Requires Stream LLM; off buffers the whole message.", + Component: "toggle", + Order: 69, + }, // --- Functions --- "function.grammar.parallel_calls": { diff --git a/core/config/model_config.go b/core/config/model_config.go index 9980c92e8..c5b6cde4b 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -499,6 +499,16 @@ type Pipeline struct { // the pipeline's LLM without editing the LLM model config. Overrides the LLM's // own reasoning_effort. Unset leaves the LLM model config in charge. ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"` + + // Streaming opts each pipeline stage into incremental delivery (LLM tokens, + // TTS audio chunks, transcription text). Unset stages keep the blocking + // unary path, so existing configs are unaffected. + Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"` + + // DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps + // to enable_thinking=false backend metadata) without editing the underlying + // LLM model config. Unset leaves the LLM model config in charge. + DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"` } // ApplyReasoningEffort resolves the effective reasoning effort — a per-request @@ -530,6 +540,41 @@ func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) { } } +// @Description PipelineStreaming toggles incremental delivery per realtime stage. +type PipelineStreaming struct { + LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"` + TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"` + Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"` + // ClauseChunking splits the streamed LLM reply into speakable clauses and + // synthesizes each as soon as it completes, instead of buffering the whole + // message before TTS. Script-aware (CJK/Thai), so it does not rely on + // whitespace sentence boundaries. 
Requires LLM streaming; unset buffers the + // whole message (today's default). + ClauseChunking *bool `yaml:"clause_chunking,omitempty" json:"clause_chunking,omitempty"` +} + +// StreamLLM reports whether LLM tokens should be streamed for this pipeline. +func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM } + +// StreamTTS reports whether TTS audio should be streamed for this pipeline. +func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS } + +// StreamTranscription reports whether transcription text should be streamed. +func (p Pipeline) StreamTranscription() bool { + return p.Streaming.Transcription != nil && *p.Streaming.Transcription +} + +// ChunkClauses reports whether the streamed reply should be split into +// script-aware clauses and synthesized incrementally rather than buffered whole. +func (p Pipeline) ChunkClauses() bool { + return p.Streaming.ClauseChunking != nil && *p.Streaming.ClauseChunking +} + +// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off. +func (p Pipeline) ThinkingDisabled() bool { + return p.DisableThinking != nil && *p.DisableThinking +} + // @Description File configuration for model downloads type File struct { Filename string `yaml:"filename,omitempty" json:"filename,omitempty"` diff --git a/core/config/pipeline_streaming_test.go b/core/config/pipeline_streaming_test.go new file mode 100644 index 000000000..c9c5812c4 --- /dev/null +++ b/core/config/pipeline_streaming_test.go @@ -0,0 +1,57 @@ +package config + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "gopkg.in/yaml.v3" +) + +// The realtime pipeline can stream each stage (LLM tokens, TTS audio, +// transcription text) and can disable model "thinking" for the LLM. These are +// opt-in per pipeline; everything defaults to off so existing configs keep the +// unary behaviour. 
+var _ = Describe("Pipeline streaming config", func() { + It("defaults every streaming + thinking helper to false when unset", func() { + var p Pipeline + Expect(p.StreamLLM()).To(BeFalse()) + Expect(p.StreamTTS()).To(BeFalse()) + Expect(p.StreamTranscription()).To(BeFalse()) + Expect(p.ChunkClauses()).To(BeFalse()) + Expect(p.ThinkingDisabled()).To(BeFalse()) + }) + + It("parses the nested streaming block and disable_thinking from YAML", func() { + var c ModelConfig + err := yaml.Unmarshal([]byte(` +name: gpt-realtime +pipeline: + llm: my-llm + tts: my-tts + transcription: my-stt + streaming: + llm: true + tts: true + transcription: true + clause_chunking: true + disable_thinking: true +`), &c) + Expect(err).ToNot(HaveOccurred()) + Expect(c.Pipeline.StreamLLM()).To(BeTrue()) + Expect(c.Pipeline.StreamTTS()).To(BeTrue()) + Expect(c.Pipeline.StreamTranscription()).To(BeTrue()) + Expect(c.Pipeline.ChunkClauses()).To(BeTrue()) + Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue()) + }) + + It("treats an explicit false in the streaming block as disabled", func() { + var c ModelConfig + err := yaml.Unmarshal([]byte(` +name: gpt-realtime +pipeline: + streaming: + tts: false +`), &c) + Expect(err).ToNot(HaveOccurred()) + Expect(c.Pipeline.StreamTTS()).To(BeFalse()) + }) +}) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 9bd40679c..d2e2413c6 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -2,8 +2,10 @@ package openai import ( "context" + "crypto/rand" "encoding/base64" "encoding/binary" + "encoding/hex" "encoding/json" "fmt" "math" @@ -235,6 +237,12 @@ type Model interface { Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error) Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, 
toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error) TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error) + // TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM + // chunks (and the backend sample rate) as they are produced. + TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error + // TranscribeStream transcribes audio incrementally, invoking onDelta for each + // transcript text fragment and returning the final aggregated result. + TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) PredictConfig() *config.ModelConfig } @@ -1254,27 +1262,15 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co // TODO: If we have a real any-to-any model then transcription is optional var transcript string if session.InputAudioTranscription != nil { - tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt) + // emitTranscription streams transcript deltas when + // pipeline.streaming.transcription is set, otherwise emits a single + // completed event; either way it returns the final transcript text. 
+ var err error + transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name()) if err != nil { sendError(t, "transcription_failed", err.Error(), "", "event_TODO") return - } else if tr == nil { - sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO") - return } - - transcript = tr.Text - sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{ - ServerEventBase: types.ServerEventBase{ - EventID: "event_TODO", - }, - - ItemID: generateItemID(), - // ResponseID: "resp_TODO", // Not needed for transcription completed event - // OutputIndex: 0, - ContentIndex: 0, - Transcript: transcript, - }) } else { sendNotImplemented(t, "any-to-any models") return @@ -1502,6 +1498,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }, }) + // Streamed LLM path: when the pipeline opts into LLM streaming, stream the + // transcript to the client as it is generated and synthesize the buffered + // message once. Tool turns are supported only when the model uses its + // tokenizer template: the C++ autoparser then delivers content and tool + // calls via ChatDeltas (clearing the text stream), so the spoken transcript + // never leaks tool-call tokens. Grammar-based function calling emits the + // call as JSON in the token stream, so those turns keep the buffered path. 
+ if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() { + canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate + var respMods []types.Modality + if overrides != nil { + respMods = overrides.OutputModalities + } + if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { + if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) { + return + } + } + } + predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil) if err != nil { sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here @@ -1579,7 +1595,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa // ExtractReasoningWithConfig is a no-op when no tag pair matches, // so it's safe to apply unconditionally in the no-reasoning branch. 
if deltaReasoning == "" && deltaContent != "" { - deltaReasoning, deltaContent = reasoning.ExtractReasoningComplete(deltaContent, thinkingStartToken, config.ReasoningConfig) + deltaReasoning, deltaContent = reasoning.ExtractReasoningComplete(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig)) } reasoningText = deltaReasoning responseWithoutReasoning = deltaContent @@ -1587,7 +1603,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa cleanedResponse = deltaContent toolCalls = deltaToolCalls } else { - reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningComplete(rawResponse, thinkingStartToken, config.ReasoningConfig) + reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningComplete(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig)) textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig) cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig) toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig) @@ -1713,64 +1729,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa return } - audioFilePath, res, err := session.ModelInterface.TTS(ctx, finalSpeech, session.Voice, session.InputAudioTranscription.Language) - if err != nil { - if ctx.Err() != nil { - xlog.Debug("TTS cancelled (barge-in)") - sendCancelledResponse() - return - } - xlog.Error("TTS failed", "error", err) - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) - return - } - if !res.Success { - xlog.Error("TTS failed", "message", res.Message) - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %s", res.Message), "", item.Assistant.ID) - return - } - defer func() { _ = os.Remove(audioFilePath) }() - - audioBytes, err := os.ReadFile(audioFilePath) - if err != nil { - xlog.Error("failed to read TTS file", 
"error", err) - sendError(t, "tts_error", fmt.Sprintf("Failed to read TTS audio: %v", err), "", item.Assistant.ID) - return - } - - // Parse WAV header to get raw PCM and the actual sample rate from the TTS backend. - pcmData, ttsSampleRate := laudio.ParseWAV(audioBytes) - if ttsSampleRate == 0 { - ttsSampleRate = localSampleRate - } - xlog.Debug("TTS audio parsed", "raw_bytes", len(audioBytes), "pcm_bytes", len(pcmData), "sample_rate", ttsSampleRate) - - // SendAudio (WebRTC) passes PCM at the TTS sample rate directly to the - // Opus encoder, which resamples to 48kHz internally. This avoids a - // lossy intermediate resample through 16kHz. - // XXX: This is a noop in websocket mode; it's included in the JSON instead - if err := t.SendAudio(ctx, pcmData, ttsSampleRate); err != nil { - if ctx.Err() != nil { - xlog.Debug("Audio playback cancelled (barge-in)") - sendCancelledResponse() - return - } - xlog.Error("failed to send audio via transport", "error", err) - } - - // For WebSocket clients, resample to the session's output rate and - // deliver audio as base64 in JSON events. WebRTC clients already - // received audio over the RTP track, so skip the base64 payload. - if !isWebRTC { - wsPCM := pcmData - if ttsSampleRate != session.OutputSampleRate { - samples := sound.BytesToInt16sLE(pcmData) - resampled := sound.ResampleInt16(samples, ttsSampleRate, session.OutputSampleRate) - wsPCM = sound.Int16toBytesLE(resampled) - } - audioString = base64.StdEncoding.EncodeToString(wsPCM) - } - + // Transcript of the spoken reply (the audio's text). sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, @@ -1788,15 +1747,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa Transcript: finalSpeech, }) + // Synthesize and send the audio. 
With pipeline.streaming.tts enabled + // emitSpeech forwards a response.output_audio.delta per backend PCM + // chunk as it's produced; otherwise it sends the whole utterance as a + // single delta. The returned PCM is stored (base64) on the item below. + pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech) + if err != nil { + if ctx.Err() != nil { + xlog.Debug("TTS cancelled (barge-in)") + sendCancelledResponse() + return + } + xlog.Error("TTS failed", "error", err) + sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) + return + } + if !isWebRTC { + audioString = base64.StdEncoding.EncodeToString(pcmAudio) + } + if !isWebRTC { - sendEvent(t, types.ResponseOutputAudioDeltaEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Delta: audioString, - }) sendEvent(t, types.ResponseOutputAudioDoneEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, @@ -1849,17 +1819,27 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }) } - // Handle Tool Calls. Two paths: - // - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run - // server-side; we append both the call and its output to conv.Items - // and re-trigger a follow-up response so the model can speak the - // result. The client only sees observability events. - // - All other tools follow the standard OpenAI flow: emit - // function_call_arguments.done and wait for the client to send - // conversation.item.create back. - xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls)) + // Emit the parsed tool calls, the terminal response.done, and (for + // server-side assistant tools) the follow-up response. Shared with the + // streamed path so both finalize tool calls identically. 
+ emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn) +} + +// emitToolCallItems emits the realtime function_call items for the parsed tool +// calls, the terminal response.done, and — for server-side LocalAI Assistant +// tools — re-triggers a follow-up response so the model can speak the result. +// hasContent shifts the tool-call output index past the assistant content item +// when the same turn also produced spoken/text content. Two tool paths: +// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side; +// we append both the call and its output to conv.Items and re-trigger. The +// client only sees observability events. +// - All other tools follow the standard OpenAI flow: emit +// function_call_arguments.done and wait for the client to send +// conversation.item.create back. +func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) { + xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls)) executedAssistantTool := false - for i, tc := range finalToolCalls { + for i, tc := range toolCalls { toolCallID := generateItemID() callID := "call_" + generateUniqueID() // OpenAI uses call_xyz @@ -1879,7 +1859,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa conv.Lock.Unlock() outputIndex := i - if finalSpeech != "" { + if hasContent { outputIndex++ } @@ -2005,8 +1985,11 @@ func generateItemID() string { } func generateUniqueID() string { - // Generate a unique ID string - // For simplicity, use a counter or UUID - // Implement as needed - return "unique_id" + // 16 random bytes, hex-encoded. Must be collision-free: session, item, + // response and call IDs build on this, and the conversation tracks/removes + // items by ID (e.g. cancel() in realtime_stream.go, conversation.item.retrieve). 
+ // A constant would make every ID alias and corrupt that bookkeeping. + var b [16]byte + _, _ = rand.Read(b[:]) + return hex.EncodeToString(b[:]) } diff --git a/core/http/endpoints/openai/realtime_chunker.go b/core/http/endpoints/openai/realtime_chunker.go new file mode 100644 index 000000000..3738e82f8 --- /dev/null +++ b/core/http/endpoints/openai/realtime_chunker.go @@ -0,0 +1,200 @@ +package openai + +import ( + "strings" + "unicode" + "unicode/utf8" + + "github.com/rivo/uniseg" +) + +// Default clause-chunker bounds (in runes). minRunes gates only sub-sentence +// (clause-mark / Thai-space) cuts so we don't synthesize tiny choppy fragments; +// full sentences always flush regardless of length. maxRunes caps an +// unterminated run so a long punctuation-less span doesn't buffer unbounded. +const ( + defaultClauseMinRunes = 12 + defaultClauseMaxRunes = 200 +) + +// clauseChunker splits streamed LLM content into speakable clauses for +// incremental TTS, in a SCRIPT-AWARE way so it works for languages without +// whitespace word boundaries. It leans on UAX #29 sentence segmentation (which +// natively terminates on CJK 。!? as well as Latin .!?), adds CJK clause +// punctuation (,、;:) and Thai/Lao spaces as finer boundaries, and caps an +// over-long unterminated run via UAX #14 line-break opportunities. +// +// Unlike the old ASCII .!?/newline segmenter (dropped in 076dcdbe), it does not +// degrade to whole-message buffering for CJK (handled natively) or Thai/Lao +// (handled via spaces, which Thai uses at clause/sentence boundaries). Scripts +// that genuinely need a dictionary (Khmer/Myanmar) simply stay buffered until a +// space or end-of-message — no worse than the buffered default. +// +// It is not safe for concurrent use; callers feed it from a single goroutine +// (the LLM token callback). 
+type clauseChunker struct {
+	buf      strings.Builder
+	minRunes int
+	maxRunes int
+}
+
+func newClauseChunker(minRunes, maxRunes int) *clauseChunker {
+	return &clauseChunker{minRunes: minRunes, maxRunes: maxRunes}
+}
+
+// push appends streamed content and returns any clauses that are now complete —
+// "complete" meaning confirmed by following content, so we never speak a clause
+// that the next token might extend. Incomplete trailing text stays buffered.
+func (c *clauseChunker) push(text string) []string {
+	c.buf.WriteString(text)
+	return c.drain(false)
+}
+
+// flush returns the remaining buffered clauses, treating end-of-input as a hard
+// boundary, and clears the buffer.
+func (c *clauseChunker) flush() []string {
+	return c.drain(true)
+}
+
+func (c *clauseChunker) drain(final bool) []string {
+	s := c.buf.String()
+	rest := s
+	var out []string
+	for rest != "" {
+		end, ok := c.nextBoundary(rest, final)
+		if !ok {
+			break
+		}
+		if seg := strings.TrimSpace(rest[:end]); seg != "" {
+			out = append(out, seg)
+		}
+		rest = rest[end:]
+	}
+	// Rewriting the builder reallocates and copies the whole buffer; skip it on
+	// the common per-token call where no boundary was confirmed.
+	if len(rest) != len(s) {
+		c.buf.Reset()
+		c.buf.WriteString(rest)
+	}
+	return out
+}
+
+// nextBoundary returns the byte offset just past the first emittable clause in
+// s, or ok=false when more input is needed (final=false) and no boundary is
+// confirmed yet.
+func (c *clauseChunker) nextBoundary(s string, final bool) (int, bool) {
+	if s == "" {
+		return 0, false
+	}
+
+	// 1) UAX #29 sentence boundary. When the first sentence is followed by more
+	// text it is a confirmed complete sentence (handles Latin .!? with
+	// abbreviation/decimal guards, and CJK 。!? with no whitespace).
+	sentence, rest, _ := uniseg.FirstSentenceInString(s, -1)
+	if rest != "" {
+		// Optionally cut finer inside the sentence at a clause boundary.
+		if cut, ok := c.firstClauseCut(sentence); ok {
+			return cut, true
+		}
+		return len(sentence), true
+	}
+
+	// 2) Unterminated tail: look for a sub-sentence clause boundary (CJK
+	// punctuation or a Thai/Lao space) confirmed by following content.
+	if cut, ok := c.firstClauseCut(s); ok {
+		return cut, true
+	}
+
+	// 3) Over-long punctuation-less run: force a typographically legal break so
+	// we don't buffer unbounded (e.g. a long CJK run with no punctuation).
+	if !final && c.maxRunes > 0 && utf8.RuneCountInString(s) > c.maxRunes {
+		if cut, ok := lineBreakCut(s, c.maxRunes); ok {
+			return cut, true
+		}
+	}
+
+	// 4) End of input: emit whatever remains as the final clause.
+	if final {
+		return len(s), true
+	}
+	return 0, false
+}
+
+// firstClauseCut returns the byte offset just past the first sub-sentence clause
+// boundary in s — a CJK clause punctuation mark, or a space following a Thai/Lao
+// letter — provided the prefix is at least minRunes long and non-space content
+// follows. The boundary mark (and any trailing spaces) stay with the left clause.
+func (c *clauseChunker) firstClauseCut(s string) (int, bool) {
+	var prev rune
+	runes := 0
+	for i, r := range s {
+		boundary := isCJKClausePunct(r) || (unicode.IsSpace(r) && isThaiLao(prev))
+		if boundary && runes+1 >= c.minRunes {
+			end := i + utf8.RuneLen(r)
+			for end < len(s) {
+				nr, sz := utf8.DecodeRuneInString(s[end:])
+				if !unicode.IsSpace(nr) {
+					break
+				}
+				end += sz
+			}
+			if end < len(s) { // confirmed: real content follows the boundary
+				return end, true
+			}
+			// Boundary sits at the end of the buffer with nothing after it yet —
+			// wait for the next token to confirm it rather than emit early.
+			return 0, false
+		}
+		prev = r
+		runes++
+	}
+	return 0, false
+}
+
+// lineBreakCut walks UAX #14 line segments and returns the byte offset of the
+// last legal break opportunity at or before maxRunes (the first one past it if
+// even the first segment overshoots). Returns ok=false when the run has no
+// internal break opportunity (e.g. a space-less Thai run), leaving it buffered.
+func lineBreakCut(s string, maxRunes int) (int, bool) {
+	state, consumed, runes := -1, 0, 0
+	for rest := s; rest != ""; {
+		seg, rem, _, st := uniseg.FirstLineSegmentInString(rest, state)
+		n := utf8.RuneCountInString(seg)
+		if consumed > 0 && runes+n > maxRunes {
+			return consumed, true
+		}
+		state, rest = st, rem
+		runes += n
+		consumed += len(seg)
+		if runes >= maxRunes {
+			if consumed < len(s) {
+				return consumed, true
+			}
+			return 0, false
+		}
+	}
+	return 0, false
+}
+
+// isCJKClausePunct reports whether r is a CJK clause-level separator worth a
+// soft TTS break (spelled as \u escapes so width-normalizing tools cannot fold
+// them to ASCII). Sentence terminators are left to UAX #29 segmentation.
+func isCJKClausePunct(r rune) bool {
+	switch r {
+	case '\uFF0C', // U+FF0C fullwidth comma
+		'\u3001', // U+3001 ideographic comma
+		'\uFF1B', // U+FF1B fullwidth semicolon
+		'\uFF1A', // U+FF1A fullwidth colon
+		'\u30FB', // U+30FB katakana middle dot
+		'\uFF65': // U+FF65 halfwidth katakana middle dot
+		return true
+	}
+	return false
+}
+
+// isThaiLao reports whether r is a Thai or Lao letter. Those scripts have no
+// inter-word spaces; an ASCII space inside such a run marks a clause/sentence
+// boundary, which is the only no-dictionary segmentation signal available.
+func isThaiLao(r rune) bool {
+	return unicode.Is(unicode.Thai, r) || unicode.Is(unicode.Lao, r)
+}
diff --git a/core/http/endpoints/openai/realtime_chunker_test.go b/core/http/endpoints/openai/realtime_chunker_test.go
new file mode 100644
index 000000000..4446b2a11
--- /dev/null
+++ b/core/http/endpoints/openai/realtime_chunker_test.go
@@ -0,0 +1,109 @@
+package openai
+
+import (
+	"strings"
+	"unicode/utf8"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+// clauseChunker splits streamed LLM content into speakable clauses in a
+// script-aware way: UAX#29 sentences (Latin .!? and CJK 。!?), CJK clause
+// punctuation, and Thai/Lao spaces — never whitespace-splitting CJK.
+var _ = Describe("clauseChunker", func() {
+	Context("Latin sentences", func() {
+		It("emits a sentence only once following content confirms it is complete", func() {
+			c := newClauseChunker(12, 200)
+			Expect(c.push("Hello world. How are you?")).To(Equal([]string{"Hello world."}))
+			// The trailing sentence is held until flush (the next token might extend it).
+			Expect(c.flush()).To(Equal([]string{"How are you?"}))
+		})
+
+		It("assembles a sentence across many small tokens", func() {
+			c := newClauseChunker(12, 200)
+			var got []string
+			for _, tok := range []string{"Hello", " world.", " How", " are", " you?"} {
+				got = append(got, c.push(tok)...)
+			}
+			got = append(got, c.flush()...)
+			Expect(got).To(Equal([]string{"Hello world.", "How are you?"}))
+		})
+
+		It("does not split decimals or abbreviations (UAX#29 SB6)", func() {
+			c := newClauseChunker(12, 200)
+			got := c.push("Pi is 3.14 and e is 2.72. Done")
+			Expect(got).To(Equal([]string{"Pi is 3.14 and e is 2.72."}))
+			Expect(c.flush()).To(Equal([]string{"Done"}))
+		})
+	})
+
+	Context("CJK (no whitespace)", func() {
+		It("splits Chinese on the ideographic full stop", func() {
+			c := newClauseChunker(12, 200)
+			Expect(c.push("你好世界。今天天气很好。")).To(Equal([]string{"你好世界。"}))
+			Expect(c.flush()).To(Equal([]string{"今天天气很好。"}))
+		})
+
+		It("splits Japanese on the ideographic full stop", func() {
+			c := newClauseChunker(12, 200)
+			Expect(c.push("こんにちは。元気ですか。")).To(Equal([]string{"こんにちは。"}))
+			Expect(c.flush()).To(Equal([]string{"元気ですか。"}))
+		})
+
+		It("splits on CJK clause punctuation for lower latency", func() {
+			c := newClauseChunker(2, 200) // small min so short test clauses cut
+			Expect(c.push("你好\uFF0C世界。再见")).To(Equal([]string{"你好\uFF0C", "世界。"}))
+			Expect(c.flush()).To(Equal([]string{"再见"}))
+		})
+	})
+
+	Context("Thai (spaces mark clauses, not words)", func() {
+		It("splits a Thai run on the inter-clause space", func() {
+			c := newClauseChunker(2, 200)
+			Expect(c.push("สวัสดีครับ กินข้าวไหม")).To(Equal([]string{"สวัสดีครับ"}))
+			Expect(c.flush()).To(Equal([]string{"กินข้าวไหม"}))
+		})
+
+		It("never shatters a space-less Thai run into characters", func() {
+			c := newClauseChunker(2, 200)
+			Expect(c.push("สวัสดีครับ")).To(BeEmpty()) // held, no boundary
+			Expect(c.flush()).To(Equal([]string{"สวัสดีครับ"}))
+		})
+	})
+
+	Context("length cap (UAX#14 fallback)", func() {
+		It("force-breaks an over-long punctuation-less CJK run at legal points", func() {
+			c := newClauseChunker(4, 10) // maxRunes = 10
+			run := strings.Repeat("字", 25)
+			got := c.push(run)
+			got = append(got, c.flush()...)
+			total := 0
+			for _, seg := range got {
+				n := utf8.RuneCountInString(seg)
+				Expect(n).To(BeNumerically("<=", 10)) // never exceeds the cap
+				total += n
+			}
+			Expect(total).To(Equal(25))                 // nothing dropped
+			Expect(len(got)).To(BeNumerically(">=", 3)) // 10 + 10 + 5
+		})
+
+		It("cuts at the last break that fits when a word straddles the cap", func() {
+			c := newClauseChunker(4, 10)
+			Expect(c.push("aaaa bbbbbbb cccc")).To(Equal([]string{"aaaa", "bbbbbbb"}))
+			Expect(c.flush()).To(Equal([]string{"cccc"}))
+		})
+	})
+
+	Context("buffer lifecycle", func() {
+		It("flush clears the buffer so the chunker is reusable", func() {
+			c := newClauseChunker(12, 200)
+			// "First one." is confirmed by the following "Second", so push drains it;
+			// only the unterminated tail remains for flush.
+			Expect(c.push("First one. Second")).To(Equal([]string{"First one."}))
+			Expect(c.flush()).To(Equal([]string{"Second"}))
+			Expect(c.flush()).To(BeEmpty())
+			Expect(c.push("Again. More")).To(Equal([]string{"Again."}))
+		})
+	})
+})
diff --git a/core/http/endpoints/openai/realtime_doubles_test.go b/core/http/endpoints/openai/realtime_doubles_test.go
new file mode 100644
index 000000000..accd6af51
--- /dev/null
+++ b/core/http/endpoints/openai/realtime_doubles_test.go
@@ -0,0 +1,138 @@
+package openai
+
+import (
+	"context"
+	"strings"
+
+	"github.com/mudler/LocalAI/core/backend"
+	"github.com/mudler/LocalAI/core/config"
+	"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
+	"github.com/mudler/LocalAI/core/schema"
+	"github.com/mudler/LocalAI/pkg/grpc/proto"
+)
+
+// fakeTransport records the server events and audio sent to a realtime client
+// so streaming behaviour can be asserted without a real WebSocket/WebRTC peer.
+// It is not a *WebRTCTransport, so handler code takes the WebSocket path.
+type fakeTransport struct {
+	events []types.ServerEvent
+	audio  []fakeAudioChunk
+}
+
+type fakeAudioChunk struct {
+	pcm        []byte
+	sampleRate int
+}
+
+func (f *fakeTransport) SendEvent(e types.ServerEvent) error {
+	f.events = append(f.events, e)
+	return nil
+}
+
+func (f *fakeTransport) ReadEvent() ([]byte, error) { return nil, nil }
+
+func (f *fakeTransport) SendAudio(_ context.Context, pcm []byte, sampleRate int) error {
+	f.audio = append(f.audio, fakeAudioChunk{pcm: pcm, sampleRate: sampleRate})
+	return nil
+}
+
+func (f *fakeTransport) Close() error { return nil }
+
+// countEvents returns how many recorded events have the given type.
+func (f *fakeTransport) countEvents(et types.ServerEventType) int {
+	n := 0
+	for _, e := range f.events {
+		if e.ServerEventType() == et {
+			n++
+		}
+	}
+	return n
+}
+
+// transcriptDeltaText concatenates the Delta of every recorded transcript
+// delta event — i.e. the text streamed to the client as it is generated.
+func (f *fakeTransport) transcriptDeltaText() string { + var b strings.Builder + for _, e := range f.events { + if d, ok := e.(types.ResponseOutputAudioTranscriptDeltaEvent); ok { + b.WriteString(d.Delta) + } + } + return b.String() +} + +// fakeModel is a configurable Model double. TTSStream replays ttsStreamChunks +// and TranscribeStream replays transcribeDeltas, so the handler's streaming +// paths can be driven deterministically. +type fakeModel struct { + cfg *config.ModelConfig + + ttsFile string + ttsStreamChunks [][]byte + ttsStreamRate int + ttsStreamErr error + + transcribeDeltas []string + transcribeFinal *schema.TranscriptionResult + + // Predict streaming: predictTokens are replayed through the token callback + // (simulating streamed LLM output); predictResp/predictErr are returned by + // the deferred predict function. predictChunkDeltas, when set, are delivered + // per-token via TokenUsage.ChatDeltas to exercise the autoparser path. + predictTokens []string + predictChunkDeltas [][]*proto.ChatDelta + predictResp backend.LLMResponse + predictErr error +} + +func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) { + return nil, nil +} + +func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, string) (*schema.TranscriptionResult, error) { + return m.transcribeFinal, nil +} + +func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) { + if m.predictErr != nil { + return nil, m.predictErr + } + return func() (backend.LLMResponse, error) { + for i, tok := range m.predictTokens { + if cb == nil { + continue + } + usage := backend.TokenUsage{} + if i < len(m.predictChunkDeltas) { + usage.ChatDeltas = m.predictChunkDeltas[i] + } + cb(tok, usage) + } + return m.predictResp, nil + }, nil +} + +func (m 
*fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) { + return m.ttsFile, &proto.Result{Success: true}, nil +} + +func (m *fakeModel) TTSStream(_ context.Context, _, _, _ string, onAudio func(pcm []byte, sampleRate int) error) error { + if m.ttsStreamErr != nil { + return m.ttsStreamErr + } + for _, c := range m.ttsStreamChunks { + if err := onAudio(c, m.ttsStreamRate); err != nil { + return err + } + } + return nil +} + +func (m *fakeModel) TranscribeStream(_ context.Context, _, _ string, _, _ bool, _ string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + for _, d := range m.transcribeDeltas { + onDelta(d) + } + return m.transcribeFinal, nil +} + +func (m *fakeModel) PredictConfig() *config.ModelConfig { return m.cfg } diff --git a/core/http/endpoints/openai/realtime_model.go b/core/http/endpoints/openai/realtime_model.go index b9a3adda9..8281197a3 100644 --- a/core/http/endpoints/openai/realtime_model.go +++ b/core/http/endpoints/openai/realtime_model.go @@ -3,6 +3,7 @@ package openai import ( "context" "crypto/rand" + "encoding/binary" "encoding/hex" "encoding/json" "fmt" @@ -87,6 +88,14 @@ func (m *transcriptOnlyModel) TTS(ctx context.Context, text, voice, language str return "", nil, fmt.Errorf("TTS not supported in transcript-only mode") } +func (m *transcriptOnlyModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error { + return fmt.Errorf("TTS not supported in transcript-only mode") +} + +func (m *transcriptOnlyModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta) +} + func (m *transcriptOnlyModel) PredictConfig() *config.ModelConfig { return nil } @@ -321,10 +330,75 @@ func (m 
*wrappedModel) TTS(ctx context.Context, text, voice, language string) (s return backend.ModelTTS(ctx, text, voice, language, "", nil, m.modelLoader, m.appConfig, *m.TTSConfig) } +func (m *wrappedModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error { + return ttsStream(ctx, m.modelLoader, m.appConfig, *m.TTSConfig, text, voice, language, onAudio) +} + +func (m *wrappedModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta) +} + func (m *wrappedModel) PredictConfig() *config.ModelConfig { return m.LLMConfig } +// wavStreamHeaderBytes is the size of the WAV header that backend.ModelTTSStream +// emits as its first audio callback; the sample rate lives at byte offset 24. +const wavStreamHeaderBytes = 44 + +// ttsStream adapts backend.ModelTTSStream (which emits a WAV stream: a 44-byte +// header carrying the sample rate, then raw PCM) to the realtime onAudio +// callback, which wants raw PCM plus the sample rate. The header is buffered +// until complete, the sample rate is read from it, and subsequent bytes are +// forwarded as PCM. +func ttsStream(ctx context.Context, ml *model.ModelLoader, appConfig *config.ApplicationConfig, ttsConfig config.ModelConfig, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error { + var header []byte + headerDone := false + sampleRate := 0 + return backend.ModelTTSStream(ctx, text, voice, language, "", nil, ml, appConfig, ttsConfig, func(b []byte) error { + if headerDone { + if len(b) == 0 { + return nil + } + return onAudio(b, sampleRate) + } + header = append(header, b...) 
+ if len(header) < wavStreamHeaderBytes { + return nil + } + sampleRate = int(binary.LittleEndian.Uint32(header[24:28])) + headerDone = true + if len(header) > wavStreamHeaderBytes { + return onAudio(header[wavStreamHeaderBytes:], sampleRate) + } + return nil + }) +} + +// transcribeStream adapts backend.ModelTranscriptionStream to the realtime +// onDelta callback, returning the final aggregated transcription result. +func transcribeStream(ctx context.Context, ml *model.ModelLoader, transcriptionConfig config.ModelConfig, appConfig *config.ApplicationConfig, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + var final *schema.TranscriptionResult + err := backend.ModelTranscriptionStream(ctx, backend.TranscriptionRequest{ + Audio: audio, + Language: language, + Translate: translate, + Diarize: diarize, + Prompt: prompt, + }, ml, transcriptionConfig, appConfig, func(chunk backend.TranscriptionStreamChunk) { + if chunk.Delta != "" { + onDelta(chunk.Delta) + } + if chunk.Final != nil { + final = chunk.Final + } + }) + if err != nil { + return nil, err + } + return final, nil +} + func newTranscriptionOnlyModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) (Model, *config.ModelConfig, error) { cfgVAD, err := cl.LoadModelConfigFileByName(pipeline.VAD, ml.ModelPath) if err != nil { @@ -454,8 +528,10 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model return nil, fmt.Errorf("failed to validate config: %w", err) } - // Let the pipeline set the LLM's reasoning effort (cfgLLM is a per-session copy). + // Let the pipeline set the LLM's reasoning effort and force thinking off + // (cfgLLM is a per-session copy). disable_thinking applies after the effort. 
applyPipelineReasoning(cfgLLM, *pipeline) + applyPipelineThinking(cfgLLM, *pipeline) cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath) if err != nil { diff --git a/core/http/endpoints/openai/realtime_speech.go b/core/http/endpoints/openai/realtime_speech.go new file mode 100644 index 000000000..ec4bbc4b0 --- /dev/null +++ b/core/http/endpoints/openai/realtime_speech.go @@ -0,0 +1,102 @@ +package openai + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "path/filepath" + + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + laudio "github.com/mudler/LocalAI/pkg/audio" + "github.com/mudler/LocalAI/pkg/sound" +) + +// emitSpeech synthesizes text and sends the audio to the client. When the +// pipeline opts into TTS streaming it forwards each PCM chunk as its own +// response.output_audio.delta as soon as the backend produces it; otherwise it +// synthesizes the whole utterance and sends it as a single delta. +// +// It deliberately does NOT emit transcript or audio-done events: the caller owns +// those so a streamed reply can be split into several spoken segments that share +// one response/item. +// +// It returns the PCM audio (at the session output rate) accumulated across all +// chunks, which the caller base64-encodes onto the conversation item. For WebRTC +// the audio goes over the RTP track instead, so the returned slice is empty. +func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) ([]byte, error) { + if text == "" { + return nil, nil + } + + _, isWebRTC := t.(*WebRTCTransport) + + var wsAudio []byte // PCM at the session output rate, accumulated for the item record + + // sendChunk hands one PCM buffer to the transport: WebRTC consumes the raw + // PCM directly (it resamples internally); WebSocket gets base64 PCM at the + // session output rate via a JSON delta event. 
+ sendChunk := func(pcm []byte, sampleRate int) error { + if len(pcm) == 0 { + return nil + } + if err := t.SendAudio(ctx, pcm, sampleRate); err != nil { + return err + } + if isWebRTC { + return nil + } + wsPCM := pcm + if sampleRate != 0 && sampleRate != session.OutputSampleRate { + samples := sound.BytesToInt16sLE(pcm) + resampled := sound.ResampleInt16(samples, sampleRate, session.OutputSampleRate) + wsPCM = sound.Int16toBytesLE(resampled) + } + wsAudio = append(wsAudio, wsPCM...) + return t.SendEvent(types.ResponseOutputAudioDeltaEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + Delta: base64.StdEncoding.EncodeToString(wsPCM), + }) + } + + language := "" + if session.InputAudioTranscription != nil { + language = session.InputAudioTranscription.Language + } + + if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() { + if err := session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk); err != nil { + return nil, err + } + return wsAudio, nil + } + + // Unary fallback: synthesize the whole utterance to a file, then emit once. + audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language) + if err != nil { + return nil, err + } + if res != nil && !res.Success { + return nil, fmt.Errorf("tts generation failed: %s", res.Message) + } + defer func() { _ = os.Remove(audioFilePath) }() + + // filepath.Clean normalizes the backend-produced temp path before reading + // (also keeps gosec G304 quiet — the path is backend-controlled, not user input). 
+ audioBytes, err := os.ReadFile(filepath.Clean(audioFilePath)) + if err != nil { + return nil, fmt.Errorf("read tts audio: %w", err) + } + pcm, sampleRate := laudio.ParseWAV(audioBytes) + if sampleRate == 0 { + sampleRate = session.OutputSampleRate + } + if err := sendChunk(pcm, sampleRate); err != nil { + return nil, err + } + return wsAudio, nil +} diff --git a/core/http/endpoints/openai/realtime_speech_test.go b/core/http/endpoints/openai/realtime_speech_test.go new file mode 100644 index 000000000..a501f946c --- /dev/null +++ b/core/http/endpoints/openai/realtime_speech_test.go @@ -0,0 +1,70 @@ +package openai + +import ( + "context" + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + laudio "github.com/mudler/LocalAI/pkg/audio" +) + +// emitSpeech synthesizes a piece of text and forwards the audio to the client, +// streaming a delta per TTS chunk when the pipeline opts in, or sending the +// whole utterance as one delta otherwise. +var _ = Describe("emitSpeech", func() { + ttsOn := true + + streamingSession := func(m Model) *Session { + return &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &ttsOn}}, + }, + } + } + + It("streams one output_audio.delta per TTS chunk when streaming is enabled", func() { + m := &fakeModel{ + ttsStreamChunks: [][]byte{{1, 2}, {3, 4}, {5, 6}}, + ttsStreamRate: 24000, + } + t := &fakeTransport{} + + audio, err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.") + + Expect(err).ToNot(HaveOccurred()) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3)) + // The returned audio is all chunks concatenated (session output rate). 
+ Expect(audio).To(Equal([]byte{1, 2, 3, 4, 5, 6})) + }) + + It("sends a single output_audio.delta in unary mode", func() { + // A minimal real WAV file for the unary TTS path to read + parse. + f, err := os.CreateTemp("", "emit-*.wav") + Expect(err).ToNot(HaveOccurred()) + defer func() { _ = os.Remove(f.Name()) }() + pcm := make([]byte, 320) // 160 samples of silence + hdr := laudio.NewWAVHeader(uint32(len(pcm))) + Expect(hdr.Write(f)).To(Succeed()) + _, err = f.Write(pcm) + Expect(err).ToNot(HaveOccurred()) + Expect(f.Close()).To(Succeed()) + + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: &fakeModel{ttsFile: f.Name()}, + ModelConfig: &config.ModelConfig{}, // streaming off + } + t := &fakeTransport{} + + _, err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.") + + Expect(err).ToNot(HaveOccurred()) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) + }) +}) diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go new file mode 100644 index 000000000..909fc50dc --- /dev/null +++ b/core/http/endpoints/openai/realtime_stream.go @@ -0,0 +1,315 @@ +package openai + +import ( + "context" + "encoding/base64" + "fmt" + + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/functions" + "github.com/mudler/LocalAI/pkg/reasoning" +) + +// transcriptStreamer turns streamed LLM tokens into the assistant's spoken +// transcript: it strips reasoning incrementally and sends one +// response.output_audio_transcript.delta per content fragment. 
It does NOT +// synthesize audio — the caller buffers the full message and synthesizes it +// once (streaming the audio chunks when the TTS backend supports TTSStream), +// which works uniformly for streaming and non-streaming TTS and for languages +// without sentence or word boundaries. +type transcriptStreamer struct { + ctx context.Context + t Transport + responseID string + itemID string + extractor *reasoning.ReasoningExtractor + + // announce, if set, is invoked once just before the first transcript delta. + // It lets the caller create the assistant item lazily, so a content-less + // tool-call turn never emits a spurious empty assistant item. + announce func() + announced bool +} + +func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer { + return &transcriptStreamer{ + ctx: ctx, + t: t, + responseID: responseID, + itemID: itemID, + extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)), + } +} + +// onToken handles one streamed unit of model output, sending a transcript delta +// for the new content (reasoning stripped) and returning that content delta so +// the caller can also feed it to the clause chunker. For plain-content models +// the unit is the raw text token; for autoparser tool turns the backend clears +// the text and delivers content via ChatDeltas, so the caller passes that +// content here. Returns "" when the token produced no new spoken content. 
+func (s *transcriptStreamer) onToken(token string) string { + _, content := s.extractor.ProcessToken(token) + if content == "" { + return "" + } + if !s.announced { + s.announced = true + if s.announce != nil { + s.announce() + } + } + _ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: s.responseID, + ItemID: s.itemID, + OutputIndex: 0, + ContentIndex: 0, + Delta: content, + }) + return content +} + +// content returns the full transcript so far with reasoning stripped. +func (s *transcriptStreamer) content() string { + return s.extractor.CleanedContent() +} + +// streamLLMResponse drives a streamed realtime reply. It streams the assistant +// transcript as the LLM generates, then synthesizes the whole buffered message +// once (streaming the audio chunks when the TTS backend supports it, otherwise a +// single unary delta). Tool calls parsed from the autoparser ChatDeltas are +// emitted after the spoken content. The assistant content item is created lazily +// on the first content delta, so a content-less tool-call turn emits only the +// tool calls. It returns true when it has fully handled the response so the +// caller can return; callers must only invoke it for an audio modality, and with +// tools only when the model uses its tokenizer template (see triggerResponseAtTurn). 
// streamLLMResponse drives one streamed realtime reply end to end.
//
// While the LLM generates, every content fragment is sent to the client as a
// transcript delta. Audio is produced in one of two ways:
//   - clause chunking on: each completed clause is synthesized inside the token
//     callback, and the trailing clause(s) are flushed after generation ends;
//   - clause chunking off: the whole reply is synthesized once after generation.
//
// Either way emitSpeech decides between streamed TTS chunks and a single unary
// delta. Tool calls parsed from the response's ChatDeltas are emitted after the
// spoken content. The assistant content item is created lazily on the first
// content delta, so a content-less tool-call turn emits only the tool calls.
//
// Every path returns true, meaning the response has been fully handled and the
// caller can return. Callers must only invoke it for an audio modality, and with
// tools only when the model uses its tokenizer template (see
// triggerResponseAtTurn).
func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, toolTurn int) bool {
	itemID := generateItemID()
	item := types.MessageItemUnion{
		Assistant: &types.MessageItemAssistant{
			ID:      itemID,
			Status:  types.ItemStatusInProgress,
			Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}},
		},
	}

	// announce creates the assistant content item lazily, just before the first
	// transcript delta — a tool-only turn never produces content, so it stays out
	// of the conversation and the client sees only the tool calls.
	announced := false
	announce := func() {
		announced = true
		conv.Lock.Lock()
		conv.Items = append(conv.Items, &item)
		conv.Lock.Unlock()
		sendEvent(t, types.ResponseOutputItemAddedEvent{
			ServerEventBase: types.ServerEventBase{},
			ResponseID:      responseID,
			OutputIndex:     0,
			Item:            item,
		})
		sendEvent(t, types.ResponseContentPartAddedEvent{
			ServerEventBase: types.ServerEventBase{},
			ResponseID:      responseID,
			ItemID:          itemID,
			OutputIndex:     0,
			ContentIndex:    0,
			Part:            item.Assistant.Content[0],
		})
	}

	// cancel rolls the turn back: the announced item (if any) is removed from
	// the conversation, searching from the newest item, and a response.done with
	// status cancelled is sent.
	cancel := func() {
		if announced {
			conv.Lock.Lock()
			for i := len(conv.Items) - 1; i >= 0; i-- {
				if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == itemID {
					conv.Items = append(conv.Items[:i], conv.Items[i+1:]...)
					break
				}
			}
			conv.Lock.Unlock()
		}
		sendEvent(t, types.ResponseDoneEvent{
			ServerEventBase: types.ServerEventBase{},
			Response:        types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled},
		})
	}

	// Pick the template the thinking start token is detected from: the model's
	// own template on the tokenizer-template path, the configured chat template
	// otherwise.
	var template string
	if llmCfg.TemplateConfig.UseTokenizerTemplate {
		template = llmCfg.GetModelTemplate()
	} else {
		template = llmCfg.TemplateConfig.Chat
	}
	thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)

	// The autoparser (tokenizer-template path) already delivers reasoning-free
	// content. Prefilling the thinking start token here would re-tag that clean
	// content as an unclosed reasoning block, leaving CleanedContent() empty —
	// no spoken reply, no TTS. Disable the prefill; closed tag pairs are still
	// stripped (PEG-fallback case, #9985).
	reasoningCfg := llmCfg.ReasoningConfig
	if llmCfg.TemplateConfig.UseTokenizerTemplate {
		disablePrefill := true
		reasoningCfg.DisableReasoningTagPrefill = &disablePrefill
	}

	streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, reasoningCfg)
	streamer.announce = announce

	// Clause chunking (opt-in): synthesize each clause as soon as it completes
	// instead of buffering the whole reply. streamedAudio accumulates the PCM
	// across clauses for the conversation item record; ttsErr captures the first
	// synthesis failure so the token callback can stop the prediction. emitSpeech
	// runs synchronously here — the LLM keeps generating into the gRPC stream
	// while a clause is synthesized, so audio still starts mid-generation.
	var chunker *clauseChunker
	if session.ModelConfig != nil && session.ModelConfig.Pipeline.ChunkClauses() {
		chunker = newClauseChunker(defaultClauseMinRunes, defaultClauseMaxRunes)
	}
	var streamedAudio []byte
	var ttsErr error
	speakClause := func(clause string) error {
		a, err := emitSpeech(ctx, t, session, responseID, itemID, clause)
		if err != nil {
			return err
		}
		streamedAudio = append(streamedAudio, a...)
		return nil
	}

	// fail reports a mid-stream failure. A cancelled context means the client
	// interrupted (barge-in), so roll the turn back instead of erroring.
	// NOTE(review): on the non-cancelled branch only sendError runs here — an
	// already-announced item stays in conv.Items as in_progress and this function
	// sends no response.done. Confirm sendError or the caller finalizes the turn.
	fail := func(code, msg string, err error) bool {
		if ctx.Err() != nil {
			cancel()
		} else {
			sendError(t, code, fmt.Sprintf("%s: %v", msg, err), "", itemID)
		}
		return true
	}

	// cb is the per-token callback handed to Predict; returning false asks the
	// backend to stop the prediction.
	cb := func(token string, usage backend.TokenUsage) bool {
		if ctx.Err() != nil {
			return false
		}
		// Plain-content models stream text via the token; autoparser tool turns
		// clear the text and deliver content via ChatDeltas, so prefer the latter
		// when present. Either way only content reaches the transcript — tool-call
		// deltas are parsed from the final response below.
		text := token
		if len(usage.ChatDeltas) > 0 {
			text = functions.ContentFromChatDeltas(usage.ChatDeltas)
		}
		delta := streamer.onToken(text)
		if chunker != nil && delta != "" {
			for _, clause := range chunker.push(delta) {
				if ttsErr = speakClause(clause); ttsErr != nil {
					return false // stop the prediction; reported after predFunc returns
				}
			}
		}
		return true
	}

	predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, tools, toolChoice, nil, nil, nil)
	if err != nil {
		sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
		return true
	}
	pred, err := predFunc()
	// A clause synthesis failed mid-stream (the callback stopped the prediction);
	// report it as a TTS error rather than a prediction error.
	if ttsErr != nil {
		return fail("tts_error", "TTS generation failed", ttsErr)
	}
	if err != nil {
		return fail("prediction_failed", "backend error", err)
	}
	// Generation finished cleanly but the client interrupted in the meantime.
	if ctx.Err() != nil {
		cancel()
		return true
	}

	content := streamer.content()
	toolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas)

	// Finalize the spoken content item only when the turn produced content. A
	// tool-only turn skips this entirely (no empty assistant item).
	if content != "" {
		// Content can exist without any delta having been sent, in which case
		// the item has not been announced yet.
		if !announced {
			announce()
		}

		// Synthesize the audio. With clause chunking the completed clauses were
		// already spoken inside the token callback; flush the trailing clause(s)
		// the segmenter was still holding. Otherwise buffer the whole message and
		// synthesize it once. emitSpeech streams the audio chunks when the TTS
		// backend supports TTSStream, otherwise it sends a single unary delta.
		var audio []byte
		if chunker != nil {
			for _, clause := range chunker.flush() {
				if ttsErr = speakClause(clause); ttsErr != nil {
					break
				}
			}
			audio = streamedAudio
		} else {
			audio, ttsErr = emitSpeech(ctx, t, session, responseID, itemID, content)
		}
		if ttsErr != nil {
			return fail("tts_error", "TTS generation failed", ttsErr)
		}

		_, isWebRTC := t.(*WebRTCTransport)

		sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{
			ServerEventBase: types.ServerEventBase{},
			ResponseID:      responseID,
			ItemID:          itemID,
			OutputIndex:     0,
			ContentIndex:    0,
			Transcript:      content,
		})
		// The audio-done event and the base64 audio on the item are skipped for
		// WebRTC, where emitSpeech returns no accumulated audio.
		if !isWebRTC {
			sendEvent(t, types.ResponseOutputAudioDoneEvent{
				ServerEventBase: types.ServerEventBase{},
				ResponseID:      responseID,
				ItemID:          itemID,
				OutputIndex:     0,
				ContentIndex:    0,
			})
		}

		conv.Lock.Lock()
		item.Assistant.Status = types.ItemStatusCompleted
		item.Assistant.Content[0].Transcript = content
		if !isWebRTC {
			item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio)
		}
		conv.Lock.Unlock()

		sendEvent(t, types.ResponseContentPartDoneEvent{
			ServerEventBase: types.ServerEventBase{},
			ResponseID:      responseID,
			ItemID:          itemID,
			OutputIndex:     0,
			ContentIndex:    0,
			Part:            item.Assistant.Content[0],
		})
		sendEvent(t, types.ResponseOutputItemDoneEvent{
			ServerEventBase: types.ServerEventBase{},
			ResponseID:      responseID,
			OutputIndex:     0,
			Item:            item,
		})
	}

	// Emit any tool calls, the terminal response.done, and (for server-side
	// assistant tools) the follow-up turn — shared with the buffered path.
	emitToolCallItems(ctx, session, conv, t, responseID, toolCalls, content != "", toolTurn)
	return true
}
Bye")) + }) + + It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() { + // disable_thinking maps to DisableReasoning=true (enable_thinking=false to + // the backend). If the model emits thinking anyway, the transcript must + // still not leak it: stripping always runs for spoken output. + disable := true + t := &fakeTransport{} + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", + reasoning.Config{DisableReasoning: &disable}) + + s.onToken("secret plan") + s.onToken("The answer is 42.") + + Expect(s.content()).To(Equal("The answer is 42.")) + Expect(s.content()).ToNot(ContainSubstring("secret plan")) + Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan")) + }) + + It("does not swallow autoparser content when the template has a thinking start token (tokenizer-template path)", func() { + // Regression: with tag prefill on, the detected token is + // prepended to the autoparser's already-clean content, swallowing the + // whole reply (empty transcript → no TTS). streamLLMResponse disables + // the prefill for the tokenizer-template path. + disablePrefill := true + t := &fakeTransport{} + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", + reasoning.Config{DisableReasoningTagPrefill: &disablePrefill}) + + s.onToken("Hello") + s.onToken(" there.") + + Expect(s.content()).To(Equal("Hello there.")) + Expect(t.transcriptDeltaText()).To(Equal("Hello there.")) + }) + + It("still strips embedded closed reasoning tags with prefill disabled (PEG-fallback safety, #9985)", func() { + // Disabling prefill must not stop stripping closed + // pairs the PEG fallback can leave in autoparser content. 
+ disablePrefill := true + t := &fakeTransport{} + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", + reasoning.Config{DisableReasoningTagPrefill: &disablePrefill}) + + s.onToken("secret") + s.onToken("The answer is 42.") + + Expect(s.content()).To(Equal("The answer is 42.")) + Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret")) + }) +}) + +// streamLLMResponse drives a full streamed realtime turn: live transcript +// deltas while the LLM generates, then the whole message is synthesized once. +var _ = Describe("streamLLMResponse", func() { + It("streams transcript deltas then synthesizes the whole message once", func() { + on := true + m := &fakeModel{ + predictTokens: []string{"Hello", " world.", " How are you?"}, + predictResp: backend.LLMResponse{Response: "Hello world. How are you?"}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // One live transcript delta per streamed token. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3)) + // The whole message is synthesized ONCE (not per sentence): a single + // emitSpeech replays the one TTS stream chunk. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) + Expect(t.transcriptDeltaText()).To(Equal("Hello world. 
How are you?")) + }) + + It("synthesizes each clause as it completes when clause chunking is enabled", func() { + on := true + m := &fakeModel{ + predictTokens: []string{"Hello world.", " How are you?"}, + predictResp: backend.LLMResponse{Response: "Hello world. How are you?"}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on, ClauseChunking: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // Two clauses ("Hello world." mid-stream, "How are you?" on flush) → two + // emitSpeech calls → two audio deltas, vs one for whole-message buffering. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2)) + // The full transcript still streams verbatim. + Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) + // Exactly one terminal response.done. + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) + + It("streams content deltas and emits tool-call items (autoparser tool turn)", func() { + on := true + // Autoparser path: reply.Message is empty; content + tool calls arrive via + // ChatDeltas. Chunk 1 carries content, chunk 2 carries the tool call. 
+ contentDelta := []*proto.ChatDelta{{Content: "Let me check."}} + toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Paris"}`}}}} + m := &fakeModel{ + predictTokens: []string{"", ""}, + predictChunkDeltas: [][]*proto.ChatDelta{contentDelta, toolDelta}, + predictResp: backend.LLMResponse{ChatDeltas: append(append([]*proto.ChatDelta{}, contentDelta...), toolDelta...)}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + llmCfg.TemplateConfig.UseTokenizerTemplate = true + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // The spoken content was streamed live. + Expect(t.transcriptDeltaText()).To(Equal("Let me check.")) + // The tool call is emitted as a function_call item. + Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1)) + // Exactly one terminal response.done. 
+ Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) + + It("emits only tool-call items for a content-less tool turn (no empty assistant item)", func() { + on := true + toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Rome"}`}}}} + m := &fakeModel{ + predictTokens: []string{""}, + predictChunkDeltas: [][]*proto.ChatDelta{toolDelta}, + predictResp: backend.LLMResponse{ChatDeltas: toolDelta}, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + llmCfg.TemplateConfig.UseTokenizerTemplate = true + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // No content → no transcript deltas and no spurious assistant content item. + Expect(t.transcriptDeltaText()).To(Equal("")) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(0)) + // The tool call is still emitted. + Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1)) + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) +}) diff --git a/core/http/endpoints/openai/realtime_thinking.go b/core/http/endpoints/openai/realtime_thinking.go new file mode 100644 index 000000000..8222219af --- /dev/null +++ b/core/http/endpoints/openai/realtime_thinking.go @@ -0,0 +1,33 @@ +package openai + +import ( + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/pkg/reasoning" +) + +// applyPipelineThinking forces the LLM's reasoning/thinking off when the realtime +// pipeline sets disable_thinking, mapping to the enable_thinking=false backend +// metadata via ReasoningConfig.DisableReasoning. 
The LLM config passed in is the +// per-session copy returned by the config loader, so this does not affect other +// users of the same model. When the pipeline does not set disable_thinking the +// LLM config is left untouched. +func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) { + if llm == nil || !pipeline.ThinkingDisabled() { + return + } + disable := true + llm.ReasoningConfig.DisableReasoning = &disable +} + +// spokenReasoningConfig adapts a model's reasoning config for stripping reasoning +// OUT of realtime spoken output. ReasoningConfig.DisableReasoning is overloaded: +// the backend reads it as the "enable_thinking=false" hint (which pipeline +// disable_thinking sets via applyPipelineThinking), but the reasoning extractor +// reads it as "skip stripping, assume there is no reasoning". Honouring the latter +// when extracting for speech would leak raw whenever the model +// ignores the suppression hint. Spoken output must never contain reasoning, so we +// always strip: clear DisableReasoning while keeping custom tokens/tag pairs. +func spokenReasoningConfig(cfg reasoning.Config) reasoning.Config { + cfg.DisableReasoning = nil + return cfg +} diff --git a/core/http/endpoints/openai/realtime_thinking_test.go b/core/http/endpoints/openai/realtime_thinking_test.go new file mode 100644 index 000000000..a056dd0e7 --- /dev/null +++ b/core/http/endpoints/openai/realtime_thinking_test.go @@ -0,0 +1,50 @@ +package openai + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/pkg/reasoning" +) + +// applyPipelineThinking lets a realtime pipeline force the LLM's thinking off +// (enable_thinking=false metadata) without editing the LLM model config. 
+var _ = Describe("applyPipelineThinking", func() { + It("disables reasoning on the LLM config when the pipeline disables thinking", func() { + disable := true + llm := &config.ModelConfig{} + applyPipelineThinking(llm, config.Pipeline{DisableThinking: &disable}) + Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil()) + Expect(*llm.ReasoningConfig.DisableReasoning).To(BeTrue()) + }) + + It("leaves the LLM config untouched when the pipeline does not set disable_thinking", func() { + llm := &config.ModelConfig{} + applyPipelineThinking(llm, config.Pipeline{}) + Expect(llm.ReasoningConfig.DisableReasoning).To(BeNil()) + }) +}) + +// spokenReasoningConfig clears DisableReasoning so realtime spoken output always +// strips reasoning, even though disable_thinking sets DisableReasoning=true on the +// LLM config (which the backend reads as enable_thinking=false). +var _ = Describe("spokenReasoningConfig", func() { + It("clears DisableReasoning so the extractor still strips leaked reasoning", func() { + disable := true + out := spokenReasoningConfig(reasoning.Config{DisableReasoning: &disable}) + Expect(out.DisableReasoning).To(BeNil()) + }) + + It("preserves the other reasoning settings", func() { + disable := true + out := spokenReasoningConfig(reasoning.Config{ + DisableReasoning: &disable, + ThinkingStartTokens: []string{""}, + TagPairs: []reasoning.TagPair{{Start: "", End: ""}}, + }) + Expect(out.ThinkingStartTokens).To(Equal([]string{""})) + Expect(out.TagPairs).To(HaveLen(1)) + Expect(out.TagPairs[0].Start).To(Equal("")) + }) +}) diff --git a/core/http/endpoints/openai/realtime_transcription.go b/core/http/endpoints/openai/realtime_transcription.go new file mode 100644 index 000000000..44456101c --- /dev/null +++ b/core/http/endpoints/openai/realtime_transcription.go @@ -0,0 +1,63 @@ +package openai + +import ( + "context" + "fmt" + + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" +) + +// emitTranscription transcribes a committed utterance and 
emits the transcription +// events for it, returning the final transcript text. With +// pipeline.streaming.transcription enabled it streams each transcript fragment as +// a conversation.item.input_audio_transcription.delta as the backend produces it, +// then a completed event; otherwise it transcribes the whole utterance and emits +// a single completed event. delta and completed events share itemID. +func emitTranscription(ctx context.Context, t Transport, session *Session, itemID, audioPath string) (string, error) { + cfg := session.InputAudioTranscription + + if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTranscription() { + final, err := session.ModelInterface.TranscribeStream(ctx, audioPath, cfg.Language, false, false, cfg.Prompt, func(delta string) { + _ = t.SendEvent(types.ConversationItemInputAudioTranscriptionDeltaEvent{ + ServerEventBase: types.ServerEventBase{EventID: "event_TODO"}, + ItemID: itemID, + ContentIndex: 0, + Delta: delta, + }) + }) + if err != nil { + return "", err + } + transcript := "" + if final != nil { + transcript = final.Text + } + if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{ + ServerEventBase: types.ServerEventBase{EventID: "event_TODO"}, + ItemID: itemID, + ContentIndex: 0, + Transcript: transcript, + }); err != nil { + return "", err + } + return transcript, nil + } + + // Unary fallback: transcribe the whole utterance, emit one completed event. 
+ tr, err := session.ModelInterface.Transcribe(ctx, audioPath, cfg.Language, false, false, cfg.Prompt) + if err != nil { + return "", err + } + if tr == nil { + return "", fmt.Errorf("transcribe result is nil") + } + if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{ + ServerEventBase: types.ServerEventBase{EventID: "event_TODO"}, + ItemID: itemID, + ContentIndex: 0, + Transcript: tr.Text, + }); err != nil { + return "", err + } + return tr.Text, nil +} diff --git a/core/http/endpoints/openai/realtime_transcription_test.go b/core/http/endpoints/openai/realtime_transcription_test.go new file mode 100644 index 000000000..f3f760fd8 --- /dev/null +++ b/core/http/endpoints/openai/realtime_transcription_test.go @@ -0,0 +1,54 @@ +package openai + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" +) + +// emitTranscription transcribes a committed utterance, streaming transcript text +// deltas when the pipeline opts in, and returns the final transcript text. 
+var _ = Describe("emitTranscription", func() { + It("streams transcription deltas then a completed event when streaming is enabled", func() { + on := true + session := &Session{ + InputAudioTranscription: &types.AudioTranscription{}, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{Transcription: &on}}, + }, + ModelInterface: &fakeModel{ + transcribeDeltas: []string{"Hel", "lo", " world"}, + transcribeFinal: &schema.TranscriptionResult{Text: "Hello world"}, + }, + } + t := &fakeTransport{} + + transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav") + + Expect(err).ToNot(HaveOccurred()) + Expect(transcript).To(Equal("Hello world")) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(3)) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1)) + }) + + It("emits a single completed event with no deltas in unary mode", func() { + session := &Session{ + InputAudioTranscription: &types.AudioTranscription{}, + ModelConfig: &config.ModelConfig{}, // streaming off + ModelInterface: &fakeModel{transcribeFinal: &schema.TranscriptionResult{Text: "Hi"}}, + } + t := &fakeTransport{} + + transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav") + + Expect(err).ToNot(HaveOccurred()) + Expect(transcript).To(Equal("Hi")) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(0)) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1)) + }) +}) diff --git a/core/http/react-ui/src/pages/Talk.jsx b/core/http/react-ui/src/pages/Talk.jsx index cf92102ac..04af7eb4e 100644 --- a/core/http/react-ui/src/pages/Talk.jsx +++ b/core/http/react-ui/src/pages/Talk.jsx @@ -17,6 +17,24 @@ const STATUS_STYLES = { error: { icon: 'fa-solid fa-circle', color: 'var(--color-error)', bg: 
'var(--color-error-light)' }, } +// upsertAssistant merges a streamed transcript fragment into the assistant entry +// identified by the server's item_id, or appends a new entry if none exists yet. +// Keying by item_id (not a mutable index tracked across handler/updater +// boundaries) makes streamed deltas idempotent and order-independent, so React's +// batching of non-React data-channel events cannot produce a duplicate bubble. +// mode 'append' adds to the running text; 'replace' sets the final transcript. +function upsertAssistant(prev, itemId, text, mode) { + // Only assistant entries carry an id, and the streaming entry is almost + // always the newest — search from the tail so per-delta cost stays constant. + const i = prev.findLastIndex(e => e.id === itemId) + if (i === -1) { + return [...prev, { role: 'assistant', id: itemId, text }] + } + const next = [...prev] + next[i] = { ...next[i], text: mode === 'append' ? next[i].text + text : text } + return next +} + export default function Talk() { const { addToast } = useOutletContext() const navigate = useNavigate() @@ -34,7 +52,10 @@ export default function Talk() { // Transcript const [transcript, setTranscript] = useState([]) - const streamingRef = useRef(null) // tracks the index of the in-progress assistant message + // item_id of the assistant message currently streaming — used only to remove + // its partial bubble when a response is cancelled (barge-in). The transcript + // itself is keyed by item_id via upsertAssistant, not by this ref. 
+ const inProgressIdRef = useRef(null) // Session settings const [instructions, setInstructions] = useState( @@ -227,39 +248,21 @@ export default function Talk() { break case 'conversation.item.input_audio_transcription.completed': if (event.transcript) { - streamingRef.current = null setTranscript(prev => [...prev, { role: 'user', text: event.transcript }]) } updateStatus('thinking', 'Generating response...') break case 'response.output_audio_transcript.delta': if (event.delta) { - setTranscript(prev => { - if (streamingRef.current !== null) { - const updated = [...prev] - updated[streamingRef.current] = { - ...updated[streamingRef.current], - text: updated[streamingRef.current].text + event.delta, - } - return updated - } - streamingRef.current = prev.length - return [...prev, { role: 'assistant', text: event.delta }] - }) + inProgressIdRef.current = event.item_id + setTranscript(prev => upsertAssistant(prev, event.item_id, event.delta, 'append')) } break case 'response.output_audio_transcript.done': if (event.transcript) { - setTranscript(prev => { - if (streamingRef.current !== null) { - const updated = [...prev] - updated[streamingRef.current] = { ...updated[streamingRef.current], text: event.transcript } - return updated - } - return [...prev, { role: 'assistant', text: event.transcript }] - }) + setTranscript(prev => upsertAssistant(prev, event.item_id, event.transcript, 'replace')) } - streamingRef.current = null + inProgressIdRef.current = null break case 'response.output_audio.delta': updateStatus('speaking', 'Speaking...') @@ -281,7 +284,7 @@ export default function Talk() { // Pretty-print JSON for readability; fall back to raw string. 
try { preview = JSON.stringify(JSON.parse(preview), null, 2) } catch (_) { /* keep raw */ } setTranscript(prev => [...prev, { role: 'tool_result', text: preview }]) - streamingRef.current = null // tool result ends the current assistant text run + inProgressIdRef.current = null // tool result ends the current assistant text run } break } @@ -290,9 +293,20 @@ export default function Talk() { // conversation.item.create + response.create when it's done. handleFunctionCall(event) break - case 'response.done': + case 'response.done': { + // A cancelled response (barge-in / interruption) leaves a partial, + // incrementally-streamed assistant bubble behind. The server discards + // the interrupted item from history; mirror that here (remove the + // in-progress assistant entry by item_id) so the regenerated reply + // doesn't show up as a second assistant message. + if (event.response?.status === 'cancelled' && inProgressIdRef.current) { + const id = inProgressIdRef.current + inProgressIdRef.current = null + setTranscript(prev => prev.filter(e => e.id !== id)) + } updateStatus('listening', 'Listening...') break + } case 'error': hasErrorRef.current = true updateStatus('error', 'Error: ' + (event.error?.message || 'Unknown error')) @@ -789,7 +803,7 @@ export default function Talk() { const iconColor = isToolCall || isToolResult ? 'var(--color-text-secondary)' : isUser ? 'var(--color-primary)' : 'var(--color-accent)' return ( -
+