From 98ed541b228b73bcab17a332ffc4b36ec80dac6a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:23:04 +0000 Subject: [PATCH] feat(realtime): streaming transcription text deltas Add emitTranscription and route commitUtterance through it. With pipeline.streaming.transcription set it streams each transcript fragment as a conversation.item.input_audio_transcription.delta via TranscribeStream then a completed event; otherwise it preserves the single completed-event unary behaviour. Returns the final transcript for response generation. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 22 ++----- .../openai/realtime_transcription.go | 63 +++++++++++++++++++ .../openai/realtime_transcription_test.go | 54 ++++++++++++++++ 3 files changed, 122 insertions(+), 17 deletions(-) create mode 100644 core/http/endpoints/openai/realtime_transcription.go create mode 100644 core/http/endpoints/openai/realtime_transcription_test.go diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index bbb11a681..b14f64b4d 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1260,27 +1260,15 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co // TODO: If we have a real any-to-any model then transcription is optional var transcript string if session.InputAudioTranscription != nil { - tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt) + // emitTranscription streams transcript deltas when + // pipeline.streaming.transcription is set, otherwise emits a single + // completed event; either way it returns the final transcript text. 
+ var err error + transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name()) if err != nil { sendError(t, "transcription_failed", err.Error(), "", "event_TODO") return - } else if tr == nil { - sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO") - return } - - transcript = tr.Text - sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{ - ServerEventBase: types.ServerEventBase{ - EventID: "event_TODO", - }, - - ItemID: generateItemID(), - // ResponseID: "resp_TODO", // Not needed for transcription completed event - // OutputIndex: 0, - ContentIndex: 0, - Transcript: transcript, - }) } else { sendNotImplemented(t, "any-to-any models") return diff --git a/core/http/endpoints/openai/realtime_transcription.go b/core/http/endpoints/openai/realtime_transcription.go new file mode 100644 index 000000000..44456101c --- /dev/null +++ b/core/http/endpoints/openai/realtime_transcription.go @@ -0,0 +1,63 @@ +package openai + +import ( + "context" + "fmt" + + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" +) + +// emitTranscription transcribes a committed utterance and emits the transcription +// events for it, returning the final transcript text. With +// pipeline.streaming.transcription enabled it streams each transcript fragment as +// a conversation.item.input_audio_transcription.delta as the backend produces it, +// then a completed event; otherwise it transcribes the whole utterance and emits +// a single completed event. delta and completed events share itemID. 
+func emitTranscription(ctx context.Context, t Transport, session *Session, itemID, audioPath string) (string, error) {
+	cfg := session.InputAudioTranscription
+	if cfg == nil { // fail cleanly instead of panicking on cfg.Language / cfg.Prompt below
+		return "", fmt.Errorf("input audio transcription is not configured")
+	}
+	var transcript string
+	if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTranscription() {
+		var streamed []byte // concatenation of every delta forwarded so far
+		final, err := session.ModelInterface.TranscribeStream(ctx, audioPath, cfg.Language, false, false, cfg.Prompt, func(delta string) {
+			streamed = append(streamed, delta...)
+			// Best-effort: a broken transport is reported by the completed-event send below.
+			_ = t.SendEvent(types.ConversationItemInputAudioTranscriptionDeltaEvent{
+				ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
+				ItemID:          itemID,
+				ContentIndex:    0,
+				Delta:           delta,
+			})
+		})
+		if err != nil {
+			return "", err
+		}
+		// No final result: fall back to the streamed text so the completed event matches the deltas.
+		transcript = string(streamed)
+		if final != nil {
+			transcript = final.Text
+		}
+	} else { // Unary fallback: transcribe the whole utterance in one call.
+		tr, err := session.ModelInterface.Transcribe(ctx, audioPath, cfg.Language, false, false, cfg.Prompt)
+		if err != nil {
+			return "", err
+		}
+		if tr == nil {
+			return "", fmt.Errorf("transcribe result is nil")
+		}
+		transcript = tr.Text
+	}
+
+	// Both modes finish with a single completed event carrying the full transcript.
+	if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
+		ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
+		ItemID:          itemID,
+		ContentIndex:    0,
+		Transcript:      transcript,
+	}); err != nil {
+		return "", err
+	}
+	return transcript, nil
+}
diff --git a/core/http/endpoints/openai/realtime_transcription_test.go b/core/http/endpoints/openai/realtime_transcription_test.go
new file mode 100644
index 000000000..f3f760fd8
--- /dev/null
+++ b/core/http/endpoints/openai/realtime_transcription_test.go
@@ -0,0 +1,54 @@
+package openai
+
+import (
+	"context"
+
+	. "github.com/onsi/ginkgo/v2"
+	.
"github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" +) + +// emitTranscription transcribes a committed utterance, streaming transcript text +// deltas when the pipeline opts in, and returns the final transcript text. +var _ = Describe("emitTranscription", func() { + It("streams transcription deltas then a completed event when streaming is enabled", func() { + on := true + session := &Session{ + InputAudioTranscription: &types.AudioTranscription{}, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{Transcription: &on}}, + }, + ModelInterface: &fakeModel{ + transcribeDeltas: []string{"Hel", "lo", " world"}, + transcribeFinal: &schema.TranscriptionResult{Text: "Hello world"}, + }, + } + t := &fakeTransport{} + + transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav") + + Expect(err).ToNot(HaveOccurred()) + Expect(transcript).To(Equal("Hello world")) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(3)) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1)) + }) + + It("emits a single completed event with no deltas in unary mode", func() { + session := &Session{ + InputAudioTranscription: &types.AudioTranscription{}, + ModelConfig: &config.ModelConfig{}, // streaming off + ModelInterface: &fakeModel{transcribeFinal: &schema.TranscriptionResult{Text: "Hi"}}, + } + t := &fakeTransport{} + + transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav") + + Expect(err).ToNot(HaveOccurred()) + Expect(transcript).To(Equal("Hi")) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(0)) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1)) 
+ }) +})