From 16a5bab71fa9c6e9d2dade8887694c5ca72a37dc Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:40:32 +0000 Subject: [PATCH] feat(realtime): wire streamLLMResponse for token-streamed replies triggerResponseAtTurn takes a streamed path when pipeline.streaming.llm is set, the turn has no tools, and audio is requested: streamLLMResponse announces the assistant item, drives the LLM token callback through a speechStreamer (reasoning-stripped transcript deltas + sentence-piped TTS), and emits the terminal events. Tool turns and non-streaming pipelines keep the existing buffered path unchanged, so this is strictly opt-in. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 17 +++ core/http/endpoints/openai/realtime_stream.go | 144 ++++++++++++++++++ 2 files changed, 161 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 14971ff1f..078cf4a5b 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1496,6 +1496,23 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }, }) + // Streamed LLM path: when the pipeline opts into LLM streaming and the turn + // cannot produce a tool call (no tools), stream tokens straight to the client + // as transcript deltas and sentence-pipe them into TTS. Tool turns fall + // through to the buffered path below, since partial tool-call output can't be + // safely spoken mid-stream. + if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() && len(tools) == 0 { + var respMods []types.Modality + if overrides != nil { + respMods = overrides.OutputModalities + } + if modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { + if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config) { + return + } + } + } + predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil) if err != nil { sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index aa3f31d7d..015f6850e 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -2,8 +2,13 @@ package openai import ( "context" + "encoding/base64" + "fmt" + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/reasoning" ) @@ -84,3 +89,142 @@ func (s *speechStreamer) finish() (content string, audio []byte, err error) { } return s.extractor.CleanedContent(), s.audio, s.err } + +// streamLLMResponse drives a streamed, plain-content (no tools) realtime reply. +// It announces the assistant item before tokens arrive, feeds the LLM token +// callback through a speechStreamer (transcript deltas + sentence-piped TTS), +// then emits the terminal events. It returns true when it has fully handled the +// response so the caller can return; callers must only invoke it for turns with +// no tools and an audio modality (see triggerResponseAtTurn). +func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool { + // Announce the assistant item up front so streamed deltas target a known item. + item := types.MessageItemUnion{ + Assistant: &types.MessageItemAssistant{ + ID: generateItemID(), + Status: types.ItemStatusInProgress, + Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}}, + }, + } + conv.Lock.Lock() + conv.Items = append(conv.Items, &item) + conv.Lock.Unlock() + + sendEvent(t, types.ResponseOutputItemAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) + sendEvent(t, types.ResponseContentPartAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + + cancel := func() { + conv.Lock.Lock() + for i := len(conv.Items) - 1; i >= 0; i-- { + if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == item.Assistant.ID { + conv.Items = append(conv.Items[:i], conv.Items[i+1:]...) + break + } + } + conv.Lock.Unlock() + sendEvent(t, types.ResponseDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled}, + }) + } + + var template string + if llmCfg.TemplateConfig.UseTokenizerTemplate { + template = llmCfg.GetModelTemplate() + } else { + template = llmCfg.TemplateConfig.Chat + } + thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) + + streamer := newSpeechStreamer(ctx, t, session, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) + cb := func(token string, _ backend.TokenUsage) bool { + if ctx.Err() != nil { + return false + } + streamer.onToken(token) + return true + } + + predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, nil, nil, nil, nil, nil) + if err != nil { + sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + return true + } + if _, err := predFunc(); err != nil { + if ctx.Err() != nil { + cancel() + return true + } + sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + return true + } + if ctx.Err() != nil { + cancel() + return true + } + + content, audio, err := streamer.finish() + if err != nil { + sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) + return true + } + + _, isWebRTC := t.(*WebRTCTransport) + + sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + Transcript: content, + }) + if !isWebRTC { + sendEvent(t, types.ResponseOutputAudioDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + }) + } + + conv.Lock.Lock() + item.Assistant.Status = types.ItemStatusCompleted + item.Assistant.Content[0].Transcript = content + if !isWebRTC { + item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio) + } + conv.Lock.Unlock() + + sendEvent(t, types.ResponseContentPartDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + sendEvent(t, types.ResponseOutputItemDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) + sendEvent(t, types.ResponseDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCompleted}, + }) + return true +}