mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-06 07:46:15 -04:00
Compare commits
15 Commits
dependabot
...
feat/realt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d05d83ff36 | ||
|
|
076dcdbed8 | ||
|
|
9ec1456ec6 | ||
|
|
cb3609530a | ||
|
|
f48344f2ff | ||
|
|
658a3efb20 | ||
|
|
16a5bab71f | ||
|
|
ca23d05c66 | ||
|
|
685e4632d7 | ||
|
|
98ed541b22 | ||
|
|
378d6c25cf | ||
|
|
2c6fdd0570 | ||
|
|
2ba2216ce2 | ||
|
|
e0820a11c9 | ||
|
|
16d7704a69 |
@@ -308,6 +308,34 @@ func DefaultRegistry() map[string]FieldMetaOverride {
|
||||
},
|
||||
Order: 64,
|
||||
},
|
||||
"pipeline.disable_thinking": {
|
||||
Section: "pipeline",
|
||||
Label: "Disable Thinking",
|
||||
Description: "Suppress reasoning/thinking output from the pipeline LLM (sets enable_thinking=false on the underlying model). Use for models that emit <think> blocks you don't want spoken or streamed back to the realtime client.",
|
||||
Component: "toggle",
|
||||
Order: 65,
|
||||
},
|
||||
"pipeline.streaming.llm": {
|
||||
Section: "pipeline",
|
||||
Label: "Stream LLM",
|
||||
Description: "Stream LLM tokens to the realtime client as they are generated instead of waiting for the full response. Emits incremental response.output_audio_transcript.delta / text deltas.",
|
||||
Component: "toggle",
|
||||
Order: 66,
|
||||
},
|
||||
"pipeline.streaming.tts": {
|
||||
Section: "pipeline",
|
||||
Label: "Stream TTS",
|
||||
Description: "Stream synthesized audio chunks to the realtime client as they are produced (requires a TTS backend that implements TTSStream). Falls back to unary synthesis otherwise.",
|
||||
Component: "toggle",
|
||||
Order: 67,
|
||||
},
|
||||
"pipeline.streaming.transcription": {
|
||||
Section: "pipeline",
|
||||
Label: "Stream Transcription",
|
||||
Description: "Stream partial transcription text to the realtime client as the STT backend produces it (requires a transcription backend that implements AudioTranscriptionStream). Falls back to unary transcription otherwise.",
|
||||
Component: "toggle",
|
||||
Order: 68,
|
||||
},
|
||||
|
||||
// --- Functions ---
|
||||
"function.grammar.parallel_calls": {
|
||||
|
||||
@@ -499,6 +499,16 @@ type Pipeline struct {
|
||||
// the pipeline's LLM without editing the LLM model config. Overrides the LLM's
|
||||
// own reasoning_effort. Unset leaves the LLM model config in charge.
|
||||
ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"`
|
||||
|
||||
// Streaming opts each pipeline stage into incremental delivery (LLM tokens,
|
||||
// TTS audio chunks, transcription text). Unset stages keep the blocking
|
||||
// unary path, so existing configs are unaffected.
|
||||
Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"`
|
||||
|
||||
// DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps
|
||||
// to enable_thinking=false backend metadata) without editing the underlying
|
||||
// LLM model config. Unset leaves the LLM model config in charge.
|
||||
DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"`
|
||||
}
|
||||
|
||||
// ApplyReasoningEffort resolves the effective reasoning effort — a per-request
|
||||
@@ -530,6 +540,29 @@ func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) {
|
||||
}
|
||||
}
|
||||
|
||||
// @Description PipelineStreaming toggles incremental delivery per realtime stage.
|
||||
type PipelineStreaming struct {
|
||||
LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"`
|
||||
TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"`
|
||||
Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"`
|
||||
}
|
||||
|
||||
// StreamLLM reports whether LLM tokens should be streamed for this pipeline.
|
||||
func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM }
|
||||
|
||||
// StreamTTS reports whether TTS audio should be streamed for this pipeline.
|
||||
func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS }
|
||||
|
||||
// StreamTranscription reports whether transcription text should be streamed.
|
||||
func (p Pipeline) StreamTranscription() bool {
|
||||
return p.Streaming.Transcription != nil && *p.Streaming.Transcription
|
||||
}
|
||||
|
||||
// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off.
|
||||
func (p Pipeline) ThinkingDisabled() bool {
|
||||
return p.DisableThinking != nil && *p.DisableThinking
|
||||
}
|
||||
|
||||
// @Description File configuration for model downloads
|
||||
type File struct {
|
||||
Filename string `yaml:"filename,omitempty" json:"filename,omitempty"`
|
||||
|
||||
54
core/config/pipeline_streaming_test.go
Normal file
54
core/config/pipeline_streaming_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// The realtime pipeline can stream each stage (LLM tokens, TTS audio,
|
||||
// transcription text) and can disable model "thinking" for the LLM. These are
|
||||
// opt-in per pipeline; everything defaults to off so existing configs keep the
|
||||
// unary behaviour.
|
||||
var _ = Describe("Pipeline streaming config", func() {
|
||||
It("defaults every streaming + thinking helper to false when unset", func() {
|
||||
var p Pipeline
|
||||
Expect(p.StreamLLM()).To(BeFalse())
|
||||
Expect(p.StreamTTS()).To(BeFalse())
|
||||
Expect(p.StreamTranscription()).To(BeFalse())
|
||||
Expect(p.ThinkingDisabled()).To(BeFalse())
|
||||
})
|
||||
|
||||
It("parses the nested streaming block and disable_thinking from YAML", func() {
|
||||
var c ModelConfig
|
||||
err := yaml.Unmarshal([]byte(`
|
||||
name: gpt-realtime
|
||||
pipeline:
|
||||
llm: my-llm
|
||||
tts: my-tts
|
||||
transcription: my-stt
|
||||
streaming:
|
||||
llm: true
|
||||
tts: true
|
||||
transcription: true
|
||||
disable_thinking: true
|
||||
`), &c)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(c.Pipeline.StreamLLM()).To(BeTrue())
|
||||
Expect(c.Pipeline.StreamTTS()).To(BeTrue())
|
||||
Expect(c.Pipeline.StreamTranscription()).To(BeTrue())
|
||||
Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue())
|
||||
})
|
||||
|
||||
It("treats an explicit false in the streaming block as disabled", func() {
|
||||
var c ModelConfig
|
||||
err := yaml.Unmarshal([]byte(`
|
||||
name: gpt-realtime
|
||||
pipeline:
|
||||
streaming:
|
||||
tts: false
|
||||
`), &c)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(c.Pipeline.StreamTTS()).To(BeFalse())
|
||||
})
|
||||
})
|
||||
@@ -235,6 +235,12 @@ type Model interface {
|
||||
Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error)
|
||||
Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error)
|
||||
TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error)
|
||||
// TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM
|
||||
// chunks (and the backend sample rate) as they are produced.
|
||||
TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error
|
||||
// TranscribeStream transcribes audio incrementally, invoking onDelta for each
|
||||
// transcript text fragment and returning the final aggregated result.
|
||||
TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error)
|
||||
PredictConfig() *config.ModelConfig
|
||||
}
|
||||
|
||||
@@ -1254,27 +1260,15 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co
|
||||
// TODO: If we have a real any-to-any model then transcription is optional
|
||||
var transcript string
|
||||
if session.InputAudioTranscription != nil {
|
||||
tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt)
|
||||
// emitTranscription streams transcript deltas when
|
||||
// pipeline.streaming.transcription is set, otherwise emits a single
|
||||
// completed event; either way it returns the final transcript text.
|
||||
var err error
|
||||
transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name())
|
||||
if err != nil {
|
||||
sendError(t, "transcription_failed", err.Error(), "", "event_TODO")
|
||||
return
|
||||
} else if tr == nil {
|
||||
sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO")
|
||||
return
|
||||
}
|
||||
|
||||
transcript = tr.Text
|
||||
sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{
|
||||
ServerEventBase: types.ServerEventBase{
|
||||
EventID: "event_TODO",
|
||||
},
|
||||
|
||||
ItemID: generateItemID(),
|
||||
// ResponseID: "resp_TODO", // Not needed for transcription completed event
|
||||
// OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Transcript: transcript,
|
||||
})
|
||||
} else {
|
||||
sendNotImplemented(t, "any-to-any models")
|
||||
return
|
||||
@@ -1502,6 +1496,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
},
|
||||
})
|
||||
|
||||
// Streamed LLM path: when the pipeline opts into LLM streaming, stream the
|
||||
// transcript to the client as it is generated and synthesize the buffered
|
||||
// message once. Tool turns are supported only when the model uses its
|
||||
// tokenizer template: the C++ autoparser then delivers content and tool
|
||||
// calls via ChatDeltas (clearing the text stream), so the spoken transcript
|
||||
// never leaks tool-call tokens. Grammar-based function calling emits the
|
||||
// call as JSON in the token stream, so those turns keep the buffered path.
|
||||
if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() {
|
||||
canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate
|
||||
var respMods []types.Modality
|
||||
if overrides != nil {
|
||||
respMods = overrides.OutputModalities
|
||||
}
|
||||
if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) {
|
||||
if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil)
|
||||
if err != nil {
|
||||
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here
|
||||
@@ -1579,7 +1593,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
// ExtractReasoningWithConfig is a no-op when no tag pair matches,
|
||||
// so it's safe to apply unconditionally in the no-reasoning branch.
|
||||
if deltaReasoning == "" && deltaContent != "" {
|
||||
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, config.ReasoningConfig)
|
||||
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
|
||||
}
|
||||
reasoningText = deltaReasoning
|
||||
responseWithoutReasoning = deltaContent
|
||||
@@ -1587,7 +1601,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
cleanedResponse = deltaContent
|
||||
toolCalls = deltaToolCalls
|
||||
} else {
|
||||
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, config.ReasoningConfig)
|
||||
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
|
||||
textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig)
|
||||
cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig)
|
||||
toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig)
|
||||
@@ -1713,64 +1727,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
return
|
||||
}
|
||||
|
||||
audioFilePath, res, err := session.ModelInterface.TTS(ctx, finalSpeech, session.Voice, session.InputAudioTranscription.Language)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
xlog.Debug("TTS cancelled (barge-in)")
|
||||
sendCancelledResponse()
|
||||
return
|
||||
}
|
||||
xlog.Error("TTS failed", "error", err)
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
if !res.Success {
|
||||
xlog.Error("TTS failed", "message", res.Message)
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %s", res.Message), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
defer func() { _ = os.Remove(audioFilePath) }()
|
||||
|
||||
audioBytes, err := os.ReadFile(audioFilePath)
|
||||
if err != nil {
|
||||
xlog.Error("failed to read TTS file", "error", err)
|
||||
sendError(t, "tts_error", fmt.Sprintf("Failed to read TTS audio: %v", err), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse WAV header to get raw PCM and the actual sample rate from the TTS backend.
|
||||
pcmData, ttsSampleRate := laudio.ParseWAV(audioBytes)
|
||||
if ttsSampleRate == 0 {
|
||||
ttsSampleRate = localSampleRate
|
||||
}
|
||||
xlog.Debug("TTS audio parsed", "raw_bytes", len(audioBytes), "pcm_bytes", len(pcmData), "sample_rate", ttsSampleRate)
|
||||
|
||||
// SendAudio (WebRTC) passes PCM at the TTS sample rate directly to the
|
||||
// Opus encoder, which resamples to 48kHz internally. This avoids a
|
||||
// lossy intermediate resample through 16kHz.
|
||||
// XXX: This is a noop in websocket mode; it's included in the JSON instead
|
||||
if err := t.SendAudio(ctx, pcmData, ttsSampleRate); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
xlog.Debug("Audio playback cancelled (barge-in)")
|
||||
sendCancelledResponse()
|
||||
return
|
||||
}
|
||||
xlog.Error("failed to send audio via transport", "error", err)
|
||||
}
|
||||
|
||||
// For WebSocket clients, resample to the session's output rate and
|
||||
// deliver audio as base64 in JSON events. WebRTC clients already
|
||||
// received audio over the RTP track, so skip the base64 payload.
|
||||
if !isWebRTC {
|
||||
wsPCM := pcmData
|
||||
if ttsSampleRate != session.OutputSampleRate {
|
||||
samples := sound.BytesToInt16sLE(pcmData)
|
||||
resampled := sound.ResampleInt16(samples, ttsSampleRate, session.OutputSampleRate)
|
||||
wsPCM = sound.Int16toBytesLE(resampled)
|
||||
}
|
||||
audioString = base64.StdEncoding.EncodeToString(wsPCM)
|
||||
}
|
||||
|
||||
// Transcript of the spoken reply (the audio's text).
|
||||
sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
@@ -1788,15 +1745,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
Transcript: finalSpeech,
|
||||
})
|
||||
|
||||
// Synthesize and send the audio. With pipeline.streaming.tts enabled
|
||||
// emitSpeech forwards a response.output_audio.delta per backend PCM
|
||||
// chunk as it's produced; otherwise it sends the whole utterance as a
|
||||
// single delta. The returned PCM is stored (base64) on the item below.
|
||||
pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
xlog.Debug("TTS cancelled (barge-in)")
|
||||
sendCancelledResponse()
|
||||
return
|
||||
}
|
||||
xlog.Error("TTS failed", "error", err)
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
if !isWebRTC {
|
||||
audioString = base64.StdEncoding.EncodeToString(pcmAudio)
|
||||
}
|
||||
|
||||
if !isWebRTC {
|
||||
sendEvent(t, types.ResponseOutputAudioDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: item.Assistant.ID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Delta: audioString,
|
||||
})
|
||||
sendEvent(t, types.ResponseOutputAudioDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
@@ -1849,17 +1817,27 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
})
|
||||
}
|
||||
|
||||
// Handle Tool Calls. Two paths:
|
||||
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run
|
||||
// server-side; we append both the call and its output to conv.Items
|
||||
// and re-trigger a follow-up response so the model can speak the
|
||||
// result. The client only sees observability events.
|
||||
// - All other tools follow the standard OpenAI flow: emit
|
||||
// function_call_arguments.done and wait for the client to send
|
||||
// conversation.item.create back.
|
||||
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls))
|
||||
// Emit the parsed tool calls, the terminal response.done, and (for
|
||||
// server-side assistant tools) the follow-up response. Shared with the
|
||||
// streamed path so both finalize tool calls identically.
|
||||
emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn)
|
||||
}
|
||||
|
||||
// emitToolCallItems emits the realtime function_call items for the parsed tool
|
||||
// calls, the terminal response.done, and — for server-side LocalAI Assistant
|
||||
// tools — re-triggers a follow-up response so the model can speak the result.
|
||||
// hasContent shifts the tool-call output index past the assistant content item
|
||||
// when the same turn also produced spoken/text content. Two tool paths:
|
||||
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side;
|
||||
// we append both the call and its output to conv.Items and re-trigger. The
|
||||
// client only sees observability events.
|
||||
// - All other tools follow the standard OpenAI flow: emit
|
||||
// function_call_arguments.done and wait for the client to send
|
||||
// conversation.item.create back.
|
||||
func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) {
|
||||
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls))
|
||||
executedAssistantTool := false
|
||||
for i, tc := range finalToolCalls {
|
||||
for i, tc := range toolCalls {
|
||||
toolCallID := generateItemID()
|
||||
callID := "call_" + generateUniqueID() // OpenAI uses call_xyz
|
||||
|
||||
@@ -1879,7 +1857,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
conv.Lock.Unlock()
|
||||
|
||||
outputIndex := i
|
||||
if finalSpeech != "" {
|
||||
if hasContent {
|
||||
outputIndex++
|
||||
}
|
||||
|
||||
|
||||
138
core/http/endpoints/openai/realtime_doubles_test.go
Normal file
138
core/http/endpoints/openai/realtime_doubles_test.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/pkg/grpc/proto"
|
||||
)
|
||||
|
||||
// fakeTransport records the server events and audio sent to a realtime client
|
||||
// so streaming behaviour can be asserted without a real WebSocket/WebRTC peer.
|
||||
// It is not a *WebRTCTransport, so handler code takes the WebSocket path.
|
||||
type fakeTransport struct {
|
||||
events []types.ServerEvent
|
||||
audio []fakeAudioChunk
|
||||
}
|
||||
|
||||
type fakeAudioChunk struct {
|
||||
pcm []byte
|
||||
sampleRate int
|
||||
}
|
||||
|
||||
func (f *fakeTransport) SendEvent(e types.ServerEvent) error {
|
||||
f.events = append(f.events, e)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeTransport) ReadEvent() ([]byte, error) { return nil, nil }
|
||||
|
||||
func (f *fakeTransport) SendAudio(_ context.Context, pcm []byte, sampleRate int) error {
|
||||
f.audio = append(f.audio, fakeAudioChunk{pcm: pcm, sampleRate: sampleRate})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeTransport) Close() error { return nil }
|
||||
|
||||
// countEvents returns how many recorded events have the given type.
|
||||
func (f *fakeTransport) countEvents(et types.ServerEventType) int {
|
||||
n := 0
|
||||
for _, e := range f.events {
|
||||
if e.ServerEventType() == et {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// transcriptDeltaText concatenates the Delta of every recorded transcript
|
||||
// delta event — i.e. the text streamed to the client as it is generated.
|
||||
func (f *fakeTransport) transcriptDeltaText() string {
|
||||
var b strings.Builder
|
||||
for _, e := range f.events {
|
||||
if d, ok := e.(types.ResponseOutputAudioTranscriptDeltaEvent); ok {
|
||||
b.WriteString(d.Delta)
|
||||
}
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// fakeModel is a configurable Model double. TTSStream replays ttsStreamChunks
|
||||
// and TranscribeStream replays transcribeDeltas, so the handler's streaming
|
||||
// paths can be driven deterministically.
|
||||
type fakeModel struct {
|
||||
cfg *config.ModelConfig
|
||||
|
||||
ttsFile string
|
||||
ttsStreamChunks [][]byte
|
||||
ttsStreamRate int
|
||||
ttsStreamErr error
|
||||
|
||||
transcribeDeltas []string
|
||||
transcribeFinal *schema.TranscriptionResult
|
||||
|
||||
// Predict streaming: predictTokens are replayed through the token callback
|
||||
// (simulating streamed LLM output); predictResp/predictErr are returned by
|
||||
// the deferred predict function. predictChunkDeltas, when set, are delivered
|
||||
// per-token via TokenUsage.ChatDeltas to exercise the autoparser path.
|
||||
predictTokens []string
|
||||
predictChunkDeltas [][]*proto.ChatDelta
|
||||
predictResp backend.LLMResponse
|
||||
predictErr error
|
||||
}
|
||||
|
||||
func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, string) (*schema.TranscriptionResult, error) {
|
||||
return m.transcribeFinal, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) {
|
||||
if m.predictErr != nil {
|
||||
return nil, m.predictErr
|
||||
}
|
||||
return func() (backend.LLMResponse, error) {
|
||||
for i, tok := range m.predictTokens {
|
||||
if cb == nil {
|
||||
continue
|
||||
}
|
||||
usage := backend.TokenUsage{}
|
||||
if i < len(m.predictChunkDeltas) {
|
||||
usage.ChatDeltas = m.predictChunkDeltas[i]
|
||||
}
|
||||
cb(tok, usage)
|
||||
}
|
||||
return m.predictResp, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) {
|
||||
return m.ttsFile, &proto.Result{Success: true}, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) TTSStream(_ context.Context, _, _, _ string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
if m.ttsStreamErr != nil {
|
||||
return m.ttsStreamErr
|
||||
}
|
||||
for _, c := range m.ttsStreamChunks {
|
||||
if err := onAudio(c, m.ttsStreamRate); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) TranscribeStream(_ context.Context, _, _ string, _, _ bool, _ string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
for _, d := range m.transcribeDeltas {
|
||||
onDelta(d)
|
||||
}
|
||||
return m.transcribeFinal, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) PredictConfig() *config.ModelConfig { return m.cfg }
|
||||
@@ -3,6 +3,7 @@ package openai
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -87,6 +88,14 @@ func (m *transcriptOnlyModel) TTS(ctx context.Context, text, voice, language str
|
||||
return "", nil, fmt.Errorf("TTS not supported in transcript-only mode")
|
||||
}
|
||||
|
||||
func (m *transcriptOnlyModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
return fmt.Errorf("TTS not supported in transcript-only mode")
|
||||
}
|
||||
|
||||
func (m *transcriptOnlyModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta)
|
||||
}
|
||||
|
||||
func (m *transcriptOnlyModel) PredictConfig() *config.ModelConfig {
|
||||
return nil
|
||||
}
|
||||
@@ -321,10 +330,75 @@ func (m *wrappedModel) TTS(ctx context.Context, text, voice, language string) (s
|
||||
return backend.ModelTTS(ctx, text, voice, language, "", nil, m.modelLoader, m.appConfig, *m.TTSConfig)
|
||||
}
|
||||
|
||||
func (m *wrappedModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
return ttsStream(ctx, m.modelLoader, m.appConfig, *m.TTSConfig, text, voice, language, onAudio)
|
||||
}
|
||||
|
||||
func (m *wrappedModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta)
|
||||
}
|
||||
|
||||
func (m *wrappedModel) PredictConfig() *config.ModelConfig {
|
||||
return m.LLMConfig
|
||||
}
|
||||
|
||||
// wavStreamHeaderBytes is the size of the WAV header that backend.ModelTTSStream
|
||||
// emits as its first audio callback; the sample rate lives at byte offset 24.
|
||||
const wavStreamHeaderBytes = 44
|
||||
|
||||
// ttsStream adapts backend.ModelTTSStream (which emits a WAV stream: a 44-byte
|
||||
// header carrying the sample rate, then raw PCM) to the realtime onAudio
|
||||
// callback, which wants raw PCM plus the sample rate. The header is buffered
|
||||
// until complete, the sample rate is read from it, and subsequent bytes are
|
||||
// forwarded as PCM.
|
||||
func ttsStream(ctx context.Context, ml *model.ModelLoader, appConfig *config.ApplicationConfig, ttsConfig config.ModelConfig, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
var header []byte
|
||||
headerDone := false
|
||||
sampleRate := 0
|
||||
return backend.ModelTTSStream(ctx, text, voice, language, "", nil, ml, appConfig, ttsConfig, func(b []byte) error {
|
||||
if headerDone {
|
||||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
return onAudio(b, sampleRate)
|
||||
}
|
||||
header = append(header, b...)
|
||||
if len(header) < wavStreamHeaderBytes {
|
||||
return nil
|
||||
}
|
||||
sampleRate = int(binary.LittleEndian.Uint32(header[24:28]))
|
||||
headerDone = true
|
||||
if len(header) > wavStreamHeaderBytes {
|
||||
return onAudio(header[wavStreamHeaderBytes:], sampleRate)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// transcribeStream adapts backend.ModelTranscriptionStream to the realtime
|
||||
// onDelta callback, returning the final aggregated transcription result.
|
||||
func transcribeStream(ctx context.Context, ml *model.ModelLoader, transcriptionConfig config.ModelConfig, appConfig *config.ApplicationConfig, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
var final *schema.TranscriptionResult
|
||||
err := backend.ModelTranscriptionStream(ctx, backend.TranscriptionRequest{
|
||||
Audio: audio,
|
||||
Language: language,
|
||||
Translate: translate,
|
||||
Diarize: diarize,
|
||||
Prompt: prompt,
|
||||
}, ml, transcriptionConfig, appConfig, func(chunk backend.TranscriptionStreamChunk) {
|
||||
if chunk.Delta != "" {
|
||||
onDelta(chunk.Delta)
|
||||
}
|
||||
if chunk.Final != nil {
|
||||
final = chunk.Final
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return final, nil
|
||||
}
|
||||
|
||||
func newTranscriptionOnlyModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) (Model, *config.ModelConfig, error) {
|
||||
cfgVAD, err := cl.LoadModelConfigFileByName(pipeline.VAD, ml.ModelPath)
|
||||
if err != nil {
|
||||
@@ -454,8 +528,10 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model
|
||||
return nil, fmt.Errorf("failed to validate config: %w", err)
|
||||
}
|
||||
|
||||
// Let the pipeline set the LLM's reasoning effort (cfgLLM is a per-session copy).
|
||||
// Let the pipeline set the LLM's reasoning effort and force thinking off
|
||||
// (cfgLLM is a per-session copy). disable_thinking applies after the effort.
|
||||
applyPipelineReasoning(cfgLLM, *pipeline)
|
||||
applyPipelineThinking(cfgLLM, *pipeline)
|
||||
|
||||
cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath)
|
||||
if err != nil {
|
||||
|
||||
102
core/http/endpoints/openai/realtime_speech.go
Normal file
102
core/http/endpoints/openai/realtime_speech.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
laudio "github.com/mudler/LocalAI/pkg/audio"
|
||||
"github.com/mudler/LocalAI/pkg/sound"
|
||||
)
|
||||
|
||||
// emitSpeech synthesizes text and sends the audio to the client. When the
|
||||
// pipeline opts into TTS streaming it forwards each PCM chunk as its own
|
||||
// response.output_audio.delta as soon as the backend produces it; otherwise it
|
||||
// synthesizes the whole utterance and sends it as a single delta.
|
||||
//
|
||||
// It deliberately does NOT emit transcript or audio-done events: the caller owns
|
||||
// those so a streamed reply can be split into several spoken segments that share
|
||||
// one response/item.
|
||||
//
|
||||
// It returns the PCM audio (at the session output rate) accumulated across all
|
||||
// chunks, which the caller base64-encodes onto the conversation item. For WebRTC
|
||||
// the audio goes over the RTP track instead, so the returned slice is empty.
|
||||
func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) ([]byte, error) {
|
||||
if text == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
_, isWebRTC := t.(*WebRTCTransport)
|
||||
|
||||
var wsAudio []byte // PCM at the session output rate, accumulated for the item record
|
||||
|
||||
// sendChunk hands one PCM buffer to the transport: WebRTC consumes the raw
|
||||
// PCM directly (it resamples internally); WebSocket gets base64 PCM at the
|
||||
// session output rate via a JSON delta event.
|
||||
sendChunk := func(pcm []byte, sampleRate int) error {
|
||||
if len(pcm) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := t.SendAudio(ctx, pcm, sampleRate); err != nil {
|
||||
return err
|
||||
}
|
||||
if isWebRTC {
|
||||
return nil
|
||||
}
|
||||
wsPCM := pcm
|
||||
if sampleRate != 0 && sampleRate != session.OutputSampleRate {
|
||||
samples := sound.BytesToInt16sLE(pcm)
|
||||
resampled := sound.ResampleInt16(samples, sampleRate, session.OutputSampleRate)
|
||||
wsPCM = sound.Int16toBytesLE(resampled)
|
||||
}
|
||||
wsAudio = append(wsAudio, wsPCM...)
|
||||
return t.SendEvent(types.ResponseOutputAudioDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Delta: base64.StdEncoding.EncodeToString(wsPCM),
|
||||
})
|
||||
}
|
||||
|
||||
language := ""
|
||||
if session.InputAudioTranscription != nil {
|
||||
language = session.InputAudioTranscription.Language
|
||||
}
|
||||
|
||||
if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() {
|
||||
if err := session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wsAudio, nil
|
||||
}
|
||||
|
||||
// Unary fallback: synthesize the whole utterance to a file, then emit once.
|
||||
audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res != nil && !res.Success {
|
||||
return nil, fmt.Errorf("tts generation failed: %s", res.Message)
|
||||
}
|
||||
defer func() { _ = os.Remove(audioFilePath) }()
|
||||
|
||||
// filepath.Clean normalizes the backend-produced temp path before reading
|
||||
// (also keeps gosec G304 quiet — the path is backend-controlled, not user input).
|
||||
audioBytes, err := os.ReadFile(filepath.Clean(audioFilePath))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read tts audio: %w", err)
|
||||
}
|
||||
pcm, sampleRate := laudio.ParseWAV(audioBytes)
|
||||
if sampleRate == 0 {
|
||||
sampleRate = session.OutputSampleRate
|
||||
}
|
||||
if err := sendChunk(pcm, sampleRate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wsAudio, nil
|
||||
}
|
||||
70
core/http/endpoints/openai/realtime_speech_test.go
Normal file
70
core/http/endpoints/openai/realtime_speech_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
laudio "github.com/mudler/LocalAI/pkg/audio"
|
||||
)
|
||||
|
||||
// emitSpeech synthesizes a piece of text and forwards the audio to the client,
|
||||
// streaming a delta per TTS chunk when the pipeline opts in, or sending the
|
||||
// whole utterance as one delta otherwise.
|
||||
var _ = Describe("emitSpeech", func() {
|
||||
ttsOn := true
|
||||
|
||||
streamingSession := func(m Model) *Session {
|
||||
return &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &ttsOn}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
It("streams one output_audio.delta per TTS chunk when streaming is enabled", func() {
|
||||
m := &fakeModel{
|
||||
ttsStreamChunks: [][]byte{{1, 2}, {3, 4}, {5, 6}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
audio, err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3))
|
||||
// The returned audio is all chunks concatenated (session output rate).
|
||||
Expect(audio).To(Equal([]byte{1, 2, 3, 4, 5, 6}))
|
||||
})
|
||||
|
||||
It("sends a single output_audio.delta in unary mode", func() {
|
||||
// A minimal real WAV file for the unary TTS path to read + parse.
|
||||
f, err := os.CreateTemp("", "emit-*.wav")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
defer func() { _ = os.Remove(f.Name()) }()
|
||||
pcm := make([]byte, 320) // 160 samples of silence
|
||||
hdr := laudio.NewWAVHeader(uint32(len(pcm)))
|
||||
Expect(hdr.Write(f)).To(Succeed())
|
||||
_, err = f.Write(pcm)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(f.Close()).To(Succeed())
|
||||
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: &fakeModel{ttsFile: f.Name()},
|
||||
ModelConfig: &config.ModelConfig{}, // streaming off
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
_, err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
|
||||
})
|
||||
})
|
||||
253
core/http/endpoints/openai/realtime_stream.go
Normal file
253
core/http/endpoints/openai/realtime_stream.go
Normal file
@@ -0,0 +1,253 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/pkg/functions"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// transcriptStreamer turns streamed LLM tokens into the assistant's spoken
|
||||
// transcript: it strips reasoning incrementally and sends one
|
||||
// response.output_audio_transcript.delta per content fragment. It does NOT
|
||||
// synthesize audio — the caller buffers the full message and synthesizes it
|
||||
// once (streaming the audio chunks when the TTS backend supports TTSStream),
|
||||
// which works uniformly for streaming and non-streaming TTS and for languages
|
||||
// without sentence or word boundaries.
|
||||
type transcriptStreamer struct {
|
||||
ctx context.Context
|
||||
t Transport
|
||||
responseID string
|
||||
itemID string
|
||||
extractor *reasoning.ReasoningExtractor
|
||||
|
||||
// announce, if set, is invoked once just before the first transcript delta.
|
||||
// It lets the caller create the assistant item lazily, so a content-less
|
||||
// tool-call turn never emits a spurious empty assistant item.
|
||||
announce func()
|
||||
announced bool
|
||||
}
|
||||
|
||||
func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer {
|
||||
return &transcriptStreamer{
|
||||
ctx: ctx,
|
||||
t: t,
|
||||
responseID: responseID,
|
||||
itemID: itemID,
|
||||
extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)),
|
||||
}
|
||||
}
|
||||
|
||||
// onToken handles one streamed unit of model output, sending a transcript delta
|
||||
// for the new content (reasoning stripped). For plain-content models the unit is
|
||||
// the raw text token; for autoparser tool turns the backend clears the text and
|
||||
// delivers content via ChatDeltas, so the caller passes that content here.
|
||||
func (s *transcriptStreamer) onToken(token string) {
|
||||
_, content := s.extractor.ProcessToken(token)
|
||||
if content == "" {
|
||||
return
|
||||
}
|
||||
if !s.announced {
|
||||
s.announced = true
|
||||
if s.announce != nil {
|
||||
s.announce()
|
||||
}
|
||||
}
|
||||
_ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: s.responseID,
|
||||
ItemID: s.itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Delta: content,
|
||||
})
|
||||
}
|
||||
|
||||
// content returns the full transcript so far with reasoning stripped.
|
||||
func (s *transcriptStreamer) content() string {
|
||||
return s.extractor.CleanedContent()
|
||||
}
|
||||
|
||||
// streamLLMResponse drives a streamed realtime reply. It streams the assistant
|
||||
// transcript as the LLM generates, then synthesizes the whole buffered message
|
||||
// once (streaming the audio chunks when the TTS backend supports it, otherwise a
|
||||
// single unary delta). Tool calls parsed from the autoparser ChatDeltas are
|
||||
// emitted after the spoken content. The assistant content item is created lazily
|
||||
// on the first content delta, so a content-less tool-call turn emits only the
|
||||
// tool calls. It returns true when it has fully handled the response so the
|
||||
// caller can return; callers must only invoke it for an audio modality, and with
|
||||
// tools only when the model uses its tokenizer template (see triggerResponseAtTurn).
|
||||
func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, toolTurn int) bool {
|
||||
itemID := generateItemID()
|
||||
item := types.MessageItemUnion{
|
||||
Assistant: &types.MessageItemAssistant{
|
||||
ID: itemID,
|
||||
Status: types.ItemStatusInProgress,
|
||||
Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}},
|
||||
},
|
||||
}
|
||||
|
||||
// announce creates the assistant content item lazily, just before the first
|
||||
// transcript delta — a tool-only turn never produces content, so it stays out
|
||||
// of the conversation and the client sees only the tool calls.
|
||||
announced := false
|
||||
announce := func() {
|
||||
announced = true
|
||||
conv.Lock.Lock()
|
||||
conv.Items = append(conv.Items, &item)
|
||||
conv.Lock.Unlock()
|
||||
sendEvent(t, types.ResponseOutputItemAddedEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
OutputIndex: 0,
|
||||
Item: item,
|
||||
})
|
||||
sendEvent(t, types.ResponseContentPartAddedEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Part: item.Assistant.Content[0],
|
||||
})
|
||||
}
|
||||
|
||||
cancel := func() {
|
||||
if announced {
|
||||
conv.Lock.Lock()
|
||||
for i := len(conv.Items) - 1; i >= 0; i-- {
|
||||
if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == itemID {
|
||||
conv.Items = append(conv.Items[:i], conv.Items[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
conv.Lock.Unlock()
|
||||
}
|
||||
sendEvent(t, types.ResponseDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled},
|
||||
})
|
||||
}
|
||||
|
||||
var template string
|
||||
if llmCfg.TemplateConfig.UseTokenizerTemplate {
|
||||
template = llmCfg.GetModelTemplate()
|
||||
} else {
|
||||
template = llmCfg.TemplateConfig.Chat
|
||||
}
|
||||
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)
|
||||
|
||||
streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, llmCfg.ReasoningConfig)
|
||||
streamer.announce = announce
|
||||
cb := func(token string, usage backend.TokenUsage) bool {
|
||||
if ctx.Err() != nil {
|
||||
return false
|
||||
}
|
||||
// Plain-content models stream text via the token; autoparser tool turns
|
||||
// clear the text and deliver content via ChatDeltas, so prefer the latter
|
||||
// when present. Either way only content reaches the transcript — tool-call
|
||||
// deltas are parsed from the final response below.
|
||||
text := token
|
||||
if len(usage.ChatDeltas) > 0 {
|
||||
text = functions.ContentFromChatDeltas(usage.ChatDeltas)
|
||||
}
|
||||
streamer.onToken(text)
|
||||
return true
|
||||
}
|
||||
|
||||
predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, tools, toolChoice, nil, nil, nil)
|
||||
if err != nil {
|
||||
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
|
||||
return true
|
||||
}
|
||||
pred, err := predFunc()
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
return true
|
||||
}
|
||||
sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
|
||||
return true
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
return true
|
||||
}
|
||||
|
||||
content := streamer.content()
|
||||
toolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas)
|
||||
|
||||
// Finalize the spoken content item only when the turn produced content. A
|
||||
// tool-only turn skips this entirely (no empty assistant item).
|
||||
if content != "" {
|
||||
if !announced {
|
||||
announce()
|
||||
}
|
||||
// Buffer the whole message, then synthesize it once. emitSpeech streams
|
||||
// the audio chunks when the TTS backend supports TTSStream, otherwise it
|
||||
// sends a single unary delta — no per-sentence segmentation either way.
|
||||
audio, err := emitSpeech(ctx, t, session, responseID, itemID, content)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
return true
|
||||
}
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", itemID)
|
||||
return true
|
||||
}
|
||||
|
||||
_, isWebRTC := t.(*WebRTCTransport)
|
||||
|
||||
sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Transcript: content,
|
||||
})
|
||||
if !isWebRTC {
|
||||
sendEvent(t, types.ResponseOutputAudioDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
})
|
||||
}
|
||||
|
||||
conv.Lock.Lock()
|
||||
item.Assistant.Status = types.ItemStatusCompleted
|
||||
item.Assistant.Content[0].Transcript = content
|
||||
if !isWebRTC {
|
||||
item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio)
|
||||
}
|
||||
conv.Lock.Unlock()
|
||||
|
||||
sendEvent(t, types.ResponseContentPartDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Part: item.Assistant.Content[0],
|
||||
})
|
||||
sendEvent(t, types.ResponseOutputItemDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
OutputIndex: 0,
|
||||
Item: item,
|
||||
})
|
||||
}
|
||||
|
||||
// Emit any tool calls, the terminal response.done, and (for server-side
|
||||
// assistant tools) the follow-up turn — shared with the buffered path.
|
||||
emitToolCallItems(ctx, session, conv, t, responseID, toolCalls, content != "", toolTurn)
|
||||
return true
|
||||
}
|
||||
150
core/http/endpoints/openai/realtime_stream_test.go
Normal file
150
core/http/endpoints/openai/realtime_stream_test.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/pkg/grpc/proto"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// transcriptStreamer turns streamed LLM tokens into incremental transcript
|
||||
// deltas, stripping reasoning. Audio is synthesized once from the full message
|
||||
// by the caller, so there is no per-sentence segmentation.
|
||||
var _ = Describe("transcriptStreamer", func() {
|
||||
It("emits one transcript delta per content token", func() {
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{})
|
||||
|
||||
for _, tok := range []string{"Hello", " world.", " Bye"} {
|
||||
s.onToken(tok)
|
||||
}
|
||||
|
||||
Expect(s.content()).To(Equal("Hello world. Bye"))
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. Bye"))
|
||||
})
|
||||
|
||||
It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() {
|
||||
// disable_thinking maps to DisableReasoning=true (enable_thinking=false to
|
||||
// the backend). If the model emits thinking anyway, the transcript must
|
||||
// still not leak it: stripping always runs for spoken output.
|
||||
disable := true
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "",
|
||||
reasoning.Config{DisableReasoning: &disable})
|
||||
|
||||
s.onToken("<think>secret plan</think>")
|
||||
s.onToken("The answer is 42.")
|
||||
|
||||
Expect(s.content()).To(Equal("The answer is 42."))
|
||||
Expect(s.content()).ToNot(ContainSubstring("secret plan"))
|
||||
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan"))
|
||||
})
|
||||
})
|
||||
|
||||
// streamLLMResponse drives a full streamed realtime turn: live transcript
|
||||
// deltas while the LLM generates, then the whole message is synthesized once.
|
||||
var _ = Describe("streamLLMResponse", func() {
|
||||
It("streams transcript deltas then synthesizes the whole message once", func() {
|
||||
on := true
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{"Hello", " world.", " How are you?"},
|
||||
predictResp: backend.LLMResponse{Response: "Hello world. How are you?"},
|
||||
ttsStreamChunks: [][]byte{{9}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// One live transcript delta per streamed token.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
|
||||
// The whole message is synthesized ONCE (not per sentence): a single
|
||||
// emitSpeech replays the one TTS stream chunk.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
|
||||
})
|
||||
|
||||
It("streams content deltas and emits tool-call items (autoparser tool turn)", func() {
|
||||
on := true
|
||||
// Autoparser path: reply.Message is empty; content + tool calls arrive via
|
||||
// ChatDeltas. Chunk 1 carries content, chunk 2 carries the tool call.
|
||||
contentDelta := []*proto.ChatDelta{{Content: "Let me check."}}
|
||||
toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Paris"}`}}}}
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{"", ""},
|
||||
predictChunkDeltas: [][]*proto.ChatDelta{contentDelta, toolDelta},
|
||||
predictResp: backend.LLMResponse{ChatDeltas: append(append([]*proto.ChatDelta{}, contentDelta...), toolDelta...)},
|
||||
ttsStreamChunks: [][]byte{{9}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
llmCfg.TemplateConfig.UseTokenizerTemplate = true
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// The spoken content was streamed live.
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Let me check."))
|
||||
// The tool call is emitted as a function_call item.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1))
|
||||
// Exactly one terminal response.done.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("emits only tool-call items for a content-less tool turn (no empty assistant item)", func() {
|
||||
on := true
|
||||
toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Rome"}`}}}}
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{""},
|
||||
predictChunkDeltas: [][]*proto.ChatDelta{toolDelta},
|
||||
predictResp: backend.LLMResponse{ChatDeltas: toolDelta},
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
llmCfg.TemplateConfig.UseTokenizerTemplate = true
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// No content → no transcript deltas and no spurious assistant content item.
|
||||
Expect(t.transcriptDeltaText()).To(Equal(""))
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(0))
|
||||
// The tool call is still emitted.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1))
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
|
||||
})
|
||||
})
|
||||
33
core/http/endpoints/openai/realtime_thinking.go
Normal file
33
core/http/endpoints/openai/realtime_thinking.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// applyPipelineThinking forces the LLM's reasoning/thinking off when the realtime
|
||||
// pipeline sets disable_thinking, mapping to the enable_thinking=false backend
|
||||
// metadata via ReasoningConfig.DisableReasoning. The LLM config passed in is the
|
||||
// per-session copy returned by the config loader, so this does not affect other
|
||||
// users of the same model. When the pipeline does not set disable_thinking the
|
||||
// LLM config is left untouched.
|
||||
func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) {
|
||||
if llm == nil || !pipeline.ThinkingDisabled() {
|
||||
return
|
||||
}
|
||||
disable := true
|
||||
llm.ReasoningConfig.DisableReasoning = &disable
|
||||
}
|
||||
|
||||
// spokenReasoningConfig adapts a model's reasoning config for stripping reasoning
|
||||
// OUT of realtime spoken output. ReasoningConfig.DisableReasoning is overloaded:
|
||||
// the backend reads it as the "enable_thinking=false" hint (which pipeline
|
||||
// disable_thinking sets via applyPipelineThinking), but the reasoning extractor
|
||||
// reads it as "skip stripping, assume there is no reasoning". Honouring the latter
|
||||
// when extracting for speech would leak raw <think>…</think> whenever the model
|
||||
// ignores the suppression hint. Spoken output must never contain reasoning, so we
|
||||
// always strip: clear DisableReasoning while keeping custom tokens/tag pairs.
|
||||
func spokenReasoningConfig(cfg reasoning.Config) reasoning.Config {
|
||||
cfg.DisableReasoning = nil
|
||||
return cfg
|
||||
}
|
||||
50
core/http/endpoints/openai/realtime_thinking_test.go
Normal file
50
core/http/endpoints/openai/realtime_thinking_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// applyPipelineThinking lets a realtime pipeline force the LLM's thinking off
|
||||
// (enable_thinking=false metadata) without editing the LLM model config.
|
||||
var _ = Describe("applyPipelineThinking", func() {
|
||||
It("disables reasoning on the LLM config when the pipeline disables thinking", func() {
|
||||
disable := true
|
||||
llm := &config.ModelConfig{}
|
||||
applyPipelineThinking(llm, config.Pipeline{DisableThinking: &disable})
|
||||
Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil())
|
||||
Expect(*llm.ReasoningConfig.DisableReasoning).To(BeTrue())
|
||||
})
|
||||
|
||||
It("leaves the LLM config untouched when the pipeline does not set disable_thinking", func() {
|
||||
llm := &config.ModelConfig{}
|
||||
applyPipelineThinking(llm, config.Pipeline{})
|
||||
Expect(llm.ReasoningConfig.DisableReasoning).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
// spokenReasoningConfig clears DisableReasoning so realtime spoken output always
|
||||
// strips reasoning, even though disable_thinking sets DisableReasoning=true on the
|
||||
// LLM config (which the backend reads as enable_thinking=false).
|
||||
var _ = Describe("spokenReasoningConfig", func() {
|
||||
It("clears DisableReasoning so the extractor still strips leaked reasoning", func() {
|
||||
disable := true
|
||||
out := spokenReasoningConfig(reasoning.Config{DisableReasoning: &disable})
|
||||
Expect(out.DisableReasoning).To(BeNil())
|
||||
})
|
||||
|
||||
It("preserves the other reasoning settings", func() {
|
||||
disable := true
|
||||
out := spokenReasoningConfig(reasoning.Config{
|
||||
DisableReasoning: &disable,
|
||||
ThinkingStartTokens: []string{"<reason>"},
|
||||
TagPairs: []reasoning.TagPair{{Start: "<reason>", End: "</reason>"}},
|
||||
})
|
||||
Expect(out.ThinkingStartTokens).To(Equal([]string{"<reason>"}))
|
||||
Expect(out.TagPairs).To(HaveLen(1))
|
||||
Expect(out.TagPairs[0].Start).To(Equal("<reason>"))
|
||||
})
|
||||
})
|
||||
63
core/http/endpoints/openai/realtime_transcription.go
Normal file
63
core/http/endpoints/openai/realtime_transcription.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
)
|
||||
|
||||
// emitTranscription transcribes a committed utterance and emits the transcription
|
||||
// events for it, returning the final transcript text. With
|
||||
// pipeline.streaming.transcription enabled it streams each transcript fragment as
|
||||
// a conversation.item.input_audio_transcription.delta as the backend produces it,
|
||||
// then a completed event; otherwise it transcribes the whole utterance and emits
|
||||
// a single completed event. delta and completed events share itemID.
|
||||
func emitTranscription(ctx context.Context, t Transport, session *Session, itemID, audioPath string) (string, error) {
|
||||
cfg := session.InputAudioTranscription
|
||||
|
||||
if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTranscription() {
|
||||
final, err := session.ModelInterface.TranscribeStream(ctx, audioPath, cfg.Language, false, false, cfg.Prompt, func(delta string) {
|
||||
_ = t.SendEvent(types.ConversationItemInputAudioTranscriptionDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
|
||||
ItemID: itemID,
|
||||
ContentIndex: 0,
|
||||
Delta: delta,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
transcript := ""
|
||||
if final != nil {
|
||||
transcript = final.Text
|
||||
}
|
||||
if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
|
||||
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
|
||||
ItemID: itemID,
|
||||
ContentIndex: 0,
|
||||
Transcript: transcript,
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return transcript, nil
|
||||
}
|
||||
|
||||
// Unary fallback: transcribe the whole utterance, emit one completed event.
|
||||
tr, err := session.ModelInterface.Transcribe(ctx, audioPath, cfg.Language, false, false, cfg.Prompt)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if tr == nil {
|
||||
return "", fmt.Errorf("transcribe result is nil")
|
||||
}
|
||||
if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
|
||||
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
|
||||
ItemID: itemID,
|
||||
ContentIndex: 0,
|
||||
Transcript: tr.Text,
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return tr.Text, nil
|
||||
}
|
||||
54
core/http/endpoints/openai/realtime_transcription_test.go
Normal file
54
core/http/endpoints/openai/realtime_transcription_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
)
|
||||
|
||||
// emitTranscription transcribes a committed utterance, streaming transcript text
|
||||
// deltas when the pipeline opts in, and returns the final transcript text.
|
||||
var _ = Describe("emitTranscription", func() {
|
||||
It("streams transcription deltas then a completed event when streaming is enabled", func() {
|
||||
on := true
|
||||
session := &Session{
|
||||
InputAudioTranscription: &types.AudioTranscription{},
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{Transcription: &on}},
|
||||
},
|
||||
ModelInterface: &fakeModel{
|
||||
transcribeDeltas: []string{"Hel", "lo", " world"},
|
||||
transcribeFinal: &schema.TranscriptionResult{Text: "Hello world"},
|
||||
},
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(transcript).To(Equal("Hello world"))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(3))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("emits a single completed event with no deltas in unary mode", func() {
|
||||
session := &Session{
|
||||
InputAudioTranscription: &types.AudioTranscription{},
|
||||
ModelConfig: &config.ModelConfig{}, // streaming off
|
||||
ModelInterface: &fakeModel{transcribeFinal: &schema.TranscriptionResult{Text: "Hi"}},
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(transcript).To(Equal("Hi"))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(0))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1))
|
||||
})
|
||||
})
|
||||
@@ -31,6 +31,41 @@ This configuration links the following components:
|
||||
|
||||
Make sure all referenced models (`silero-vad-ggml`, `whisper-large-turbo`, `qwen3-4b`, `tts-1`) are also installed or defined in your LocalAI instance.
|
||||
|
||||
### Streaming the pipeline
|
||||
|
||||
By default each stage runs to completion before the next begins: the whole utterance is transcribed, the full LLM reply is generated, then it is synthesized. Each stage can instead be streamed incrementally, which lowers the time-to-first-audio of a turn:
|
||||
|
||||
```yaml
|
||||
name: gpt-realtime
|
||||
pipeline:
|
||||
vad: silero-vad-ggml
|
||||
transcription: whisper-large-turbo
|
||||
llm: qwen3-4b
|
||||
tts: tts-1
|
||||
streaming:
|
||||
llm: true # stream LLM tokens as transcript deltas
|
||||
tts: true # emit audio deltas per synthesized chunk
|
||||
transcription: true # stream transcript text deltas of the user's speech
|
||||
```
|
||||
|
||||
- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces (requires a backend that supports streaming synthesis), instead of one delta for the whole utterance. Falls back to a single unary delta otherwise.
|
||||
- **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming).
|
||||
- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events. The full reply is buffered and synthesized once it is complete — streamed as audio chunks when `streaming.tts` is enabled (and the TTS backend supports it), otherwise as a single unary delta. Reasoning/thinking is always stripped from the spoken transcript. Tool calls are supported while streaming when the LLM uses its tokenizer template (`use_tokenizer_template: true`): the backend's autoparser then delivers content and tool calls separately, so the spoken transcript never leaks tool-call tokens. Grammar-based function calling keeps the buffered path.
|
||||
|
||||
All streaming flags are off by default, so existing pipelines are unaffected.
|
||||
|
||||
### Disabling thinking
|
||||
|
||||
For reasoning models, you can force the pipeline LLM's thinking off without editing the LLM model config:
|
||||
|
||||
```yaml
|
||||
pipeline:
|
||||
llm: qwen3-4b
|
||||
disable_thinking: true # maps to enable_thinking=false for the realtime LLM
|
||||
```
|
||||
|
||||
This is applied only to the realtime session's copy of the LLM config, so it does not affect other users of the same model. Leave it unset to use the LLM model config's own reasoning settings.
|
||||
|
||||
## Transports
|
||||
|
||||
The Realtime API supports two transports: **WebSocket** and **WebRTC**.
|
||||
|
||||
Reference in New Issue
Block a user