feat(whisper-cpp): implement streaming transcription (#9751)

* test(whisper): wire e2e streaming transcription target

Adds test-extra-backend-whisper-transcription, mirroring the existing
llama-cpp / sherpa-onnx / vibevoice-cpp targets. The generic
AudioTranscriptionStream spec at tests/e2e-backends/backend_test.go:644
fails today because backend/go/whisper has no streaming impl - this
target is the failing TDD gate that the next phase makes pass.

Confirmed RED locally: 3 Passed (health, load, offline transcription),
1 Failed (streaming spec hits its 300s context deadline because the
base implementation returns 'unimplemented' but doesn't close the
result channel, leaving the gRPC stream open until the client times
out).

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(whisper-cpp): expose new_segment_callback to the Go side

Adds set_new_segment_callback() and a C-side trampoline that whisper.cpp
invokes once per new text segment during whisper_full(). The trampoline
dispatches (idx_first, n_new, user_data) to a Go function pointer
registered via purego.NewCallback - text and timings are pulled by Go
through the existing get_segment_text/get_segment_t0/get_segment_t1
getters.

Wires the hook only when streaming is actually requested, to avoid a
per-segment function-pointer dispatch on the offline path.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(whisper-cpp): implement AudioTranscriptionStream

Wires whisper.cpp's new_segment_callback through purego back to Go so
the streaming transcription RPC produces real, time-correlated deltas
while whisper_full() is still decoding. Each segment becomes one
TranscriptStreamResponse{Delta}; whisper_full's return is the
TranscriptStreamResponse{FinalResult} carrying the full segment list,
language, and duration.

Per-call state is tracked in a sync.Map keyed by an atomic counter; the
Go callback registered via purego.NewCallback is a singleton, dispatched
through user_data. SingleThread today means only one entry is ever live,
but the map shape matches the sherpa-onnx TTS callback pattern.

The streaming path's final.Text is the literal concat of every emitted
delta (a strings.Builder accumulated by onNewSegment) so the e2e
invariant `final.Text == concat(deltas)` holds exactly. The first delta
has no leading space; subsequent deltas are space-prefixed. The offline
AudioTranscription path is unchanged.

Closes the gap with sherpa-onnx, vibevoice-cpp, llama-cpp, and tinygrad,
which already implement AudioTranscriptionStream.

Verified GREEN locally: make test-extra-backend-whisper-transcription
passes 4/4 specs (3 Passed initially under RED, +1 streaming spec now).

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(whisper-cpp): assert progressive multi-segment streaming

Drives AudioTranscriptionStream against a real long-audio fixture and
asserts len(deltas) >= 2. The generic e2e spec at
tests/e2e-backends/backend_test.go:644 only checks len(deltas) >= 1
which is satisfied by both real and faked streaming - this spec is the
guardrail that a future "fake" impl can't sneak past.

Skipped by default (env-gated, like the cancellation spec); set
WHISPER_LIBRARY, WHISPER_MODEL_PATH, and WHISPER_AUDIO_PATH to a 30+
second clip to run.

Verified locally with a 55s 5x-JFK concat against ggml-base.en.bin:
1 Passed in 7.3s, deltas >= 2, finalSegmentCount >= 2,
concat(deltas) == final.Text.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* ci(whisper-cpp): add transcription gRPC e2e job

Mirrors tests-sherpa-onnx-grpc-transcription /
tests-llama-cpp-grpc-transcription. Runs make
test-extra-backend-whisper-transcription whenever the whisper backend
or the run-all switch fires, so a pin-bump or refactor that breaks
streaming transcription gets caught before merge.

The whisper output on detect-changes is already emitted by
scripts/changed-backends.js (it iterates allBackendPaths); this PR
just exposes it as a workflow output and consumes it.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(whisper-cpp): silence errcheck on AudioTranscriptionStream defers

golangci-lint runs with new-from-merge-base=origin/master, so the
identical defer patterns in the existing offline AudioTranscription
path are grandfathered while the new ones in AudioTranscriptionStream
trip errcheck. Wrap both defers in `func() { _ = ... }()` to match what
errcheck wants without altering behavior. The errors from os.RemoveAll
and *os.File.Close are not actionable inside a defer here (we're
already returning), matching the offline path's contract.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-05-10 23:11:46 +02:00
committed by GitHub
parent 4715a68660
commit 19d59102d5
7 changed files with 338 additions and 0 deletions

View File

@@ -43,6 +43,7 @@ jobs:
insightface: ${{ steps.detect.outputs.insightface }}
speaker-recognition: ${{ steps.detect.outputs.speaker-recognition }}
sherpa-onnx: ${{ steps.detect.outputs.sherpa-onnx }}
whisper: ${{ steps.detect.outputs.whisper }}
steps:
- name: Checkout repository
uses: actions/checkout@v6
@@ -583,6 +584,27 @@ jobs:
- name: Build sherpa-onnx backend image and run streaming ASR gRPC e2e tests
run: |
make test-extra-backend-sherpa-onnx-transcription
# End-to-end transcription via the e2e-backends gRPC harness against
# the whisper.cpp backend. Drives AudioTranscription (offline) and
# AudioTranscriptionStream (real, segment-callback-driven deltas) on
# ggml-base.en + the JFK 11s clip.
tests-whisper-grpc-transcription:
needs: detect-changes
if: needs.detect-changes.outputs.whisper == 'true' || needs.detect-changes.outputs.run-all == 'true'
runs-on: ubuntu-latest
timeout-minutes: 90
steps:
- name: Clone
uses: actions/checkout@v6
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.25.4'
- name: Build whisper backend image and run transcription gRPC e2e tests
run: |
make test-extra-backend-whisper-transcription
# VITS TTS via the sherpa-onnx backend. Drives both TTS (file write) and
# TTSStream (PCM chunks) on the e2e-backends harness.
tests-sherpa-onnx-grpc-tts:

View File

@@ -897,6 +897,18 @@ test-extra-backend-vibevoice-cpp-transcription: docker-build-vibevoice-cpp
BACKEND_TEST_CAPS=health,load,transcription \
$(MAKE) test-extra-backend
## Audio transcription wrapper for the whisper.cpp backend.
## Drives the AudioTranscription / AudioTranscriptionStream RPCs against
## ggml-base.en (~145 MB) using the JFK 11s clip. The streaming spec
## asserts len(deltas) >= 1 and concat(deltas) == final.Text - whisper-
## specific multi-segment assertions live in backend/go/whisper/gowhisper_test.go.
test-extra-backend-whisper-transcription: docker-build-whisper
BACKEND_IMAGE=local-ai-backend:whisper \
BACKEND_TEST_MODEL_URL=https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-base.en.bin \
BACKEND_TEST_AUDIO_URL=https://github.com/ggml-org/whisper.cpp/raw/master/samples/jfk.wav \
BACKEND_TEST_CAPS=health,load,transcription \
$(MAKE) test-extra-backend
## LocalVQE audio transform (joint AEC + noise suppression + dereverb).
## Exercises the audio_transform capability end-to-end: batch transform
## of a real WAV fixture and bidi streaming of synthetic silent frames.

View File

@@ -10,14 +10,38 @@ static std::vector<float> flat_segs;
static std::atomic<int> g_abort{0};
static std::atomic<uintptr_t> g_go_new_segment_cb{0};
static std::atomic<uintptr_t> g_go_new_segment_user_data{0};
static bool abort_cb(void * /*user_data*/) {
return g_abort.load(std::memory_order_relaxed) != 0;
}
static void new_segment_cb(struct whisper_context *cb_ctx,
struct whisper_state * /*state*/, int n_new,
void * /*user_data*/) {
uintptr_t go_cb = g_go_new_segment_cb.load(std::memory_order_relaxed);
if (go_cb == 0) {
return;
}
int total = whisper_full_n_segments(cb_ctx);
int idx_first = total - n_new;
if (idx_first < 0) {
idx_first = 0;
}
uintptr_t ud = g_go_new_segment_user_data.load(std::memory_order_relaxed);
reinterpret_cast<go_new_segment_cb>(go_cb)(idx_first, n_new, ud);
}
extern "C" void set_abort(int v) {
g_abort.store(v, std::memory_order_relaxed);
}
extern "C" void set_new_segment_callback(uintptr_t cb_ptr, uintptr_t user_data) {
g_go_new_segment_cb.store(cb_ptr, std::memory_order_relaxed);
g_go_new_segment_user_data.store(user_data, std::memory_order_relaxed);
}
static void ggml_log_cb(enum ggml_log_level level, const char *log,
void *data) {
const char *level_str;
@@ -139,6 +163,14 @@ int transcribe(uint32_t threads, char *lang, bool translate, bool tdrz,
// ggml abort hook so a subsequent set_abort(1) from Go aborts the next
// compute graph step.
g_abort.store(0, std::memory_order_relaxed);
// Only install the new-segment callback when streaming is requested
// (Go side calls set_new_segment_callback before transcribe()). Leaving
// it always-on is harmless but adds a function-pointer dispatch per
// segment for the offline path.
if (g_go_new_segment_cb.load(std::memory_order_relaxed) != 0) {
wparams.new_segment_callback = new_segment_cb;
wparams.new_segment_callback_user_data = nullptr;
}
wparams.abort_callback = abort_cb;
wparams.abort_callback_user_data = nullptr;

View File

@@ -16,4 +16,15 @@ int n_tokens(int i);
int32_t get_token_id(int i, int j);
bool get_segment_speaker_turn_next(int i);
void set_abort(int v);
// Function pointer from Go (returned by purego.NewCallback). Invoked once
// per new-segment event during whisper_full(). The callback runs on the
// decode thread - if Go blocks (slow gRPC consumer), the decode blocks
// too. That is the intended backpressure path.
typedef void (*go_new_segment_cb)(int idx_first, int n_new, uintptr_t user_data);
// Install the callback used by the next transcribe() call. Pass cb=0 to
// clear. user_data is opaque to C; the Go side uses it to look up
// per-call state.
void set_new_segment_callback(uintptr_t cb_ptr, uintptr_t user_data);
}

View File

@@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/go-audio/wav"
@@ -29,8 +30,83 @@ var (
CppGetTokenID func(i int, j int) int
CppGetSegmentSpeakerTurnNext func(i int) bool
CppSetAbort func(v int)
// Set by main.go via purego.RegisterLibFunc. Installs (or clears with cb=0)
// the C-side trampoline that whisper.cpp invokes per new segment.
CppSetNewSegmentCallback func(cbPtr uintptr, userData uintptr)
)
// streamCallStates maps per-AudioTranscriptionStream call IDs to the
// state the Go callback needs to emit deltas. Only one entry is ever
// live today (base.SingleThread), but the map shape mirrors
// sherpa-onnx's TTS callback registry and survives a future SingleThread
// removal without a contract change.
var (
streamCallStates sync.Map // uint64 -> *streamCallState
streamCallSeq atomic.Uint64
goNewSegmentCb uintptr // purego.NewCallback(onNewSegment) result; set in main.go at boot
)
type streamCallState struct {
results chan *pb.TranscriptStreamResponse
diarize bool
// nextIdx tracks how many segments we've already emitted. The C
// trampoline passes idx_first = total - n_new, but we walk from
// nextIdx to (idx_first + n_new) defensively in case whisper.cpp ever
// coalesces multiple commits into a single callback invocation.
nextIdx int
// assembled mirrors the literal concat of every Delta sent on results.
// We reuse it as the final TranscriptResult.Text so the e2e
// invariant `final.Text == concat(deltas)` holds exactly. Written from
// the cgo decode thread inside onNewSegment and read by the streaming
// method after CppTranscribe returns; the cgo boundary provides the
// happens-before edge.
assembled strings.Builder
}
// onNewSegment is the Go side of the C trampoline declared in
// gowhisper.cpp:new_segment_cb. Whisper.cpp invokes it once per
// new-segment event during whisper_full(). Reads segment text via the
// existing CppGetSegment* getters (safe to call against the singleton
// ctx; whisper.cpp is the only writer and it has already published the
// segments by the time this fires).
//
// Sends deltas synchronously: if the channel is full, this blocks the
// whisper decode thread. That's the intended backpressure path -
// dropping deltas would break the concat(deltas) == final.Text invariant
// the e2e suite asserts.
func onNewSegment(idxFirst int32, nNew int32, userData uintptr) {
v, ok := streamCallStates.Load(uint64(userData))
if !ok {
return // call already torn down (race with cancel + cb fire)
}
state := v.(*streamCallState)
end := int(idxFirst) + int(nNew)
for i := state.nextIdx; i < end; i++ {
txt := strings.ToValidUTF8(strings.Clone(CppGetSegmentText(i)), "<22>")
txt = strings.TrimSpace(txt)
if state.diarize && CppGetSegmentSpeakerTurnNext(i) {
txt += " [SPEAKER_TURN]"
}
if txt == "" {
state.nextIdx = i + 1
continue
}
// Prefix subsequent deltas with a single space so the assembled
// stream reads as one space-joined transcript. The first delta has
// no leading space, otherwise concat(deltas) would not match
// final.Text and the e2e invariant would break.
var delta string
if state.assembled.Len() == 0 {
delta = txt
} else {
delta = " " + txt
}
state.results <- &pb.TranscriptStreamResponse{Delta: delta}
state.assembled.WriteString(delta)
state.nextIdx = i + 1
}
}
type Whisper struct {
base.SingleThread
}
@@ -200,3 +276,120 @@ func (w *Whisper) AudioTranscription(ctx context.Context, opts *pb.TranscriptReq
Duration: duration,
}, nil
}
// AudioTranscriptionStream runs whisper_full() and emits deltas via
// whisper.cpp's new_segment_callback as segments are decoded, then a
// final TranscriptResult. The offline AudioTranscription is unchanged;
// both paths share whisper's single-instance ctx and the SingleThread
// concurrency model.
func (w *Whisper) AudioTranscriptionStream(ctx context.Context, opts *pb.TranscriptRequest, results chan *pb.TranscriptStreamResponse) error {
defer close(results)
if err := ctx.Err(); err != nil {
return status.Error(codes.Canceled, "transcription cancelled")
}
dir, err := os.MkdirTemp("", "whisper")
if err != nil {
return err
}
defer func() { _ = os.RemoveAll(dir) }()
convertedPath := filepath.Join(dir, "converted.wav")
if err := utils.AudioToWav(opts.Dst, convertedPath); err != nil {
return err
}
fh, err := os.Open(convertedPath)
if err != nil {
return err
}
defer func() { _ = fh.Close() }()
d := wav.NewDecoder(fh)
buf, err := d.FullPCMBuffer()
if err != nil {
return err
}
data := buf.AsFloat32Buffer().Data
var duration float32
if buf.Format != nil && buf.Format.SampleRate > 0 {
duration = float32(len(data)) / float32(buf.Format.SampleRate)
}
// Register per-call state and install the C-side callback. defer
// teardown so even a panic clears the C pointer (otherwise a stale
// callback fires on the next AudioTranscription call).
callID := streamCallSeq.Add(1)
state := &streamCallState{
results: results,
diarize: opts.Diarize,
}
streamCallStates.Store(callID, state)
CppSetNewSegmentCallback(goNewSegmentCb, uintptr(callID))
defer func() {
CppSetNewSegmentCallback(0, 0)
streamCallStates.Delete(callID)
}()
// Same abort-watcher pattern as AudioTranscription. Joined synchronously
// so a late CppSetAbort(1) cannot fire after this function returns.
done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
CppSetAbort(1)
case <-done:
}
}()
defer func() {
close(done)
wg.Wait()
}()
segsLen := uintptr(0xdeadbeef)
segsLenPtr := unsafe.Pointer(&segsLen)
ret := CppTranscribe(opts.Threads, opts.Language, opts.Translate, opts.Diarize, data, uintptr(len(data)), segsLenPtr, opts.Prompt)
if ret == 2 {
return status.Error(codes.Canceled, "transcription cancelled")
}
if ret != 0 {
return fmt.Errorf("Failed Transcribe")
}
// Build the final TranscriptResult. Segments[] mirrors the offline
// path so the SSE done event carries the same per-segment shape.
// final.Text reuses the assembled stream so concat(deltas) == final.Text
// holds exactly, matching the e2e contract.
segments := []*pb.TranscriptSegment{}
for i := range int(segsLen) {
s := CppGetSegmentStart(i) * 10000000
t := CppGetSegmentEnd(i) * 10000000
txt := strings.ToValidUTF8(strings.Clone(CppGetSegmentText(i)), "<22>")
tokens := make([]int32, CppNTokens(i))
if opts.Diarize && CppGetSegmentSpeakerTurnNext(i) {
txt += " [SPEAKER_TURN]"
}
for j := range tokens {
tokens[j] = int32(CppGetTokenID(i, j))
}
segments = append(segments, &pb.TranscriptSegment{
Id: int32(i),
Text: txt,
Start: s, End: t,
Tokens: tokens,
})
}
final := &pb.TranscriptResult{
Segments: segments,
Text: state.assembled.String(),
Language: opts.Language,
Duration: duration,
}
results <- &pb.TranscriptStreamResponse{FinalResult: final}
return nil
}

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"os"
"strings"
"sync"
"testing"
"time"
@@ -52,6 +53,7 @@ func ensureLibLoaded() {
purego.RegisterLibFunc(&CppGetTokenID, gosd, "get_token_id")
purego.RegisterLibFunc(&CppGetSegmentSpeakerTurnNext, gosd, "get_segment_speaker_turn_next")
purego.RegisterLibFunc(&CppSetAbort, gosd, "set_abort")
purego.RegisterLibFunc(&CppSetNewSegmentCallback, gosd, "set_new_segment_callback")
})
if libLoadErr != nil {
Skip("whisper library not loadable: " + libLoadErr.Error())
@@ -109,4 +111,64 @@ var _ = Describe("Whisper", func() {
Expect(res.Text).ToNot(BeEmpty(), "post-cancel transcription returned empty text")
})
})
Context("AudioTranscriptionStream", func() {
It("emits multiple deltas progressively for a multi-segment clip", func() {
modelPath, audioPath := fixturesOrSkip()
ensureLibLoaded()
// The streaming method dispatches through the package-level
// goNewSegmentCb. main.go normally builds it; in this test
// process main() is never called, so build it here lazily.
// purego.NewCallback returns a stable pointer; calling it once
// per process is correct.
if goNewSegmentCb == 0 {
goNewSegmentCb = purego.NewCallback(onNewSegment)
}
w := &Whisper{}
Expect(w.Load(&pb.ModelOptions{ModelFile: modelPath})).To(Succeed())
results := make(chan *pb.TranscriptStreamResponse, 64)
done := make(chan error, 1)
go func() {
done <- w.AudioTranscriptionStream(context.Background(), &pb.TranscriptRequest{
Dst: audioPath,
Threads: 4,
Language: "en",
Stream: true,
}, results)
}()
var deltas []string
var assembled strings.Builder
var finalText string
var finalSegmentCount int
for chunk := range results {
if d := chunk.GetDelta(); d != "" {
deltas = append(deltas, d)
assembled.WriteString(d)
}
if final := chunk.GetFinalResult(); final != nil {
finalText = final.GetText()
finalSegmentCount = len(final.GetSegments())
}
}
Expect(<-done).ToNot(HaveOccurred())
// The whisper-specific bar: real streaming via new_segment_callback
// fires once per decoded segment, so a multi-segment clip MUST
// produce >=2 delta events. A faked-streaming impl (run
// whisper_full to completion, then walk the segment list) would
// also pass len(deltas) >= 1, which is why the generic e2e spec
// is not strict enough.
Expect(len(deltas)).To(BeNumerically(">=", 2),
"expected multiple deltas from a multi-segment clip, got %d (assembled=%q)",
len(deltas), assembled.String())
Expect(finalSegmentCount).To(BeNumerically(">=", 2),
"expected final to carry multiple segments")
Expect(assembled.String()).To(Equal(finalText),
"concat(deltas) must equal final.Text")
})
})
})

View File

@@ -42,12 +42,18 @@ func main() {
{&CppGetTokenID, "get_token_id"},
{&CppGetSegmentSpeakerTurnNext, "get_segment_speaker_turn_next"},
{&CppSetAbort, "set_abort"},
{&CppSetNewSegmentCallback, "set_new_segment_callback"},
}
for _, lf := range libFuncs {
purego.RegisterLibFunc(lf.FuncPtr, gosd, lf.Name)
}
// Build a stable C-callable function pointer from the Go callback. The
// pointer lives for the lifetime of the process; per-call dispatch is
// keyed by user_data through streamCallStates.
goNewSegmentCb = purego.NewCallback(onNewSegment)
flag.Parse()
if err := grpc.StartServer(*addr, &Whisper{}); err != nil {