feat(realtime): wire streamLLMResponse for token-streamed replies

triggerResponseAtTurn takes a streamed path when pipeline.streaming.llm is
set, the turn has no tools, and audio is requested: streamLLMResponse
announces the assistant item, drives the LLM token callback through a
speechStreamer (reasoning-stripped transcript deltas + sentence-piped TTS),
and emits the terminal events. Tool turns and non-streaming pipelines keep
the existing buffered path unchanged, so this is strictly opt-in.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-04 16:40:32 +00:00
parent ca23d05c66
commit 16a5bab71f
2 changed files with 161 additions and 0 deletions

View File

@@ -1496,6 +1496,23 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
},
})
// Streamed LLM path: when the pipeline opts into LLM streaming and the turn
// cannot produce a tool call (no tools), stream tokens straight to the client
// as transcript deltas and sentence-pipe them into TTS. Tool turns fall
// through to the buffered path below, since partial tool-call output can't be
// safely spoken mid-stream.
if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() && len(tools) == 0 {
var respMods []types.Modality
if overrides != nil {
respMods = overrides.OutputModalities
}
if modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) {
if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config) {
return
}
}
}
predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil)
if err != nil {
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here

View File

@@ -2,8 +2,13 @@ package openai
import (
"context"
"encoding/base64"
"fmt"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/reasoning"
)
@@ -84,3 +89,142 @@ func (s *speechStreamer) finish() (content string, audio []byte, err error) {
}
return s.extractor.CleanedContent(), s.audio, s.err
}
// streamLLMResponse drives a streamed, plain-content (no tools) realtime reply.
// It announces the assistant item before tokens arrive, feeds the LLM token
// callback through a speechStreamer (transcript deltas + sentence-piped TTS),
// then emits the terminal events. It returns true when it has fully handled the
// response so the caller can return; callers must only invoke it for turns with
// no tools and an audio modality (see triggerResponseAtTurn).
func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool {
// Announce the assistant item up front so streamed deltas target a known item.
item := types.MessageItemUnion{
Assistant: &types.MessageItemAssistant{
ID: generateItemID(),
Status: types.ItemStatusInProgress,
Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}},
},
}
conv.Lock.Lock()
conv.Items = append(conv.Items, &item)
conv.Lock.Unlock()
sendEvent(t, types.ResponseOutputItemAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: 0,
Item: item,
})
sendEvent(t, types.ResponseContentPartAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Part: item.Assistant.Content[0],
})
cancel := func() {
conv.Lock.Lock()
for i := len(conv.Items) - 1; i >= 0; i-- {
if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == item.Assistant.ID {
conv.Items = append(conv.Items[:i], conv.Items[i+1:]...)
break
}
}
conv.Lock.Unlock()
sendEvent(t, types.ResponseDoneEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled},
})
}
var template string
if llmCfg.TemplateConfig.UseTokenizerTemplate {
template = llmCfg.GetModelTemplate()
} else {
template = llmCfg.TemplateConfig.Chat
}
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)
streamer := newSpeechStreamer(ctx, t, session, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig)
cb := func(token string, _ backend.TokenUsage) bool {
if ctx.Err() != nil {
return false
}
streamer.onToken(token)
return true
}
predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, nil, nil, nil, nil, nil)
if err != nil {
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID)
return true
}
if _, err := predFunc(); err != nil {
if ctx.Err() != nil {
cancel()
return true
}
sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID)
return true
}
if ctx.Err() != nil {
cancel()
return true
}
content, audio, err := streamer.finish()
if err != nil {
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return true
}
_, isWebRTC := t.(*WebRTCTransport)
sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Transcript: content,
})
if !isWebRTC {
sendEvent(t, types.ResponseOutputAudioDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
})
}
conv.Lock.Lock()
item.Assistant.Status = types.ItemStatusCompleted
item.Assistant.Content[0].Transcript = content
if !isWebRTC {
item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio)
}
conv.Lock.Unlock()
sendEvent(t, types.ResponseContentPartDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Part: item.Assistant.Content[0],
})
sendEvent(t, types.ResponseOutputItemDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: 0,
Item: item,
})
sendEvent(t, types.ResponseDoneEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCompleted},
})
return true
}