diff --git a/backend/go/parakeet-cpp/Makefile b/backend/go/parakeet-cpp/Makefile index 5fe780ac4..d8f148d6f 100644 --- a/backend/go/parakeet-cpp/Makefile +++ b/backend/go/parakeet-cpp/Makefile @@ -1,6 +1,6 @@ # parakeet-cpp backend Makefile. # -# Upstream pin lives below as PARAKEET_VERSION?=843600590f96a31467a5199f827c253f34c110f7 +# Upstream pin lives below as PARAKEET_VERSION so the dependency-bump script # (.github/bump_deps.sh) can find and update it - matches the # whisper.cpp / ds4 / vibevoice-cpp convention. # @@ -15,7 +15,7 @@ # That's what the L0 smoke test uses. The default target below does the # proper clone-at-pin + cmake build so CI doesn't need a side-checkout. -PARAKEET_VERSION?=843600590f96a31467a5199f827c253f34c110f7 +PARAKEET_VERSION?=50dfc24b4faa4ee23a1f59401f1d0c87fc4042b0 PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp GOCMD?=go diff --git a/backend/go/parakeet-cpp/batcher.go b/backend/go/parakeet-cpp/batcher.go index 4a7c169e7..d66d2f0d5 100644 --- a/backend/go/parakeet-cpp/batcher.go +++ b/backend/go/parakeet-cpp/batcher.go @@ -7,8 +7,12 @@ import "time" type batchRequest struct { pcm []float32 decoder int32 - tag string - reply chan batchReply + // language is the per-request target locale ("" means the model default). + // parakeet.cpp's batched C-API takes ONE target_lang for the whole batch, + // so the dispatcher only coalesces requests that share a language. + language string + tag string + reply chan batchReply } // batchReply carries one per-item JSON object string (an element of the C-API's @@ -43,13 +47,25 @@ func newBatcher(maxSize int, maxWait time.Duration, runBatch func([]*batchReques // run is the dispatcher loop: accumulate submitted requests until either maxSize // is reached or maxWait elapses since the first queued request, then dispatch. // Exits when stop is closed (draining any partially-filled batch first).
+// +// A batch carries ONE language (parakeet.cpp's batched C-API takes a single +// target_lang), so a request whose language differs from the batch leader is +// not coalesced: it is held in carry and becomes the leader of the next batch. +// carry is therefore never dropped and its caller never deadlocks: every batch +// (including a lone carry on stop) is dispatched, and runBatch replies to all. func (b *batcher) run(stop <-chan struct{}) { + var carry *batchRequest for { var first *batchRequest - select { - case first = <-b.submit: - case <-stop: - return + if carry != nil { + // A mismatched request from the previous fill leads this batch. + first, carry = carry, nil + } else { + select { + case first = <-b.submit: + case <-stop: + return + } } batch := []*batchRequest{first} @@ -64,12 +80,22 @@ func (b *batcher) run(stop <-chan struct{}) { for len(batch) < b.maxSize { select { case r := <-b.submit: + if r.language != first.language { + // Different language: carry it to the next batch so this + // batch stays single-language, then dispatch what we have. + carry = r + break fill + } batch = append(batch, r) case <-timer.C: break fill case <-stop: timer.Stop() b.runBatch(batch) + // Don't strand a carried request's caller on shutdown. + if carry != nil { + b.runBatch([]*batchRequest{carry}) + } return } } diff --git a/backend/go/parakeet-cpp/batcher_test.go b/backend/go/parakeet-cpp/batcher_test.go index e51122ee5..b0d10f8dc 100644 --- a/backend/go/parakeet-cpp/batcher_test.go +++ b/backend/go/parakeet-cpp/batcher_test.go @@ -105,4 +105,60 @@ var _ = Describe("batcher", func() { go func() { <-rep }() Eventually(dispatched, "2s").Should(Receive(Equal(1))) }) + + It("never coalesces requests with different languages into one batch", func() { + // parakeet.cpp's batched C-API takes ONE target_lang per batch, so the + // dispatcher must keep every dispatched batch single-language. 
Submit a + // mix of languages and assert (a) no batch ever carries more than one + // distinct language and (b) every submitted request still gets a reply + // (the mismatched carry-over is never dropped). + var mu sync.Mutex + var langsPerBatch [][]string + run := func(reqs []*batchRequest) { + seen := map[string]struct{}{} + var distinct []string + for _, r := range reqs { + if _, ok := seen[r.language]; !ok { + seen[r.language] = struct{}{} + distinct = append(distinct, r.language) + } + } + mu.Lock() + langsPerBatch = append(langsPerBatch, distinct) + mu.Unlock() + echoReply(reqs) + } + // Large window + size so the fill loop stays open across submits and the + // language constraint (not the timer) is what splits the batches. + b := newBatcher(16, 200*time.Millisecond, run) + stop := make(chan struct{}) + go b.run(stop) + defer close(stop) + + langs := []string{"en", "en", "de", "de", "en", "fr", "fr"} + const N = 7 + var wg sync.WaitGroup + got := make([]string, N) + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + rep := make(chan batchReply, 1) + b.submit <- &batchRequest{tag: string(rune('a' + i)), language: langs[i], reply: rep} + got[i] = (<-rep).json + }(i) + } + wg.Wait() + + mu.Lock() + defer mu.Unlock() + // Invariant: every dispatched batch is single-language. + for _, distinct := range langsPerBatch { + Expect(len(distinct)).To(Equal(1), "a batch coalesced more than one language: %v", distinct) + } + // Liveness: every request got a reply (carry-over never stranded). + for i := 0; i < N; i++ { + Expect(got[i]).To(Equal(string(rune('a' + i)))) + } + }) }) diff --git a/backend/go/parakeet-cpp/goparakeetcpp.go b/backend/go/parakeet-cpp/goparakeetcpp.go index 79ed2e7b9..4821b9c40 100644 --- a/backend/go/parakeet-cpp/goparakeetcpp.go +++ b/backend/go/parakeet-cpp/goparakeetcpp.go @@ -48,6 +48,13 @@ var ( // side reads them as const float*/const int*. 
CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr + // CppTranscribePcmBatchJSONLang is the multilingual variant of the batched + // JSON entry point: identical, plus a trailing target_lang. "" (the model + // default, "auto") is passed for non-prompt models, which ignore it; an + // unknown locale on a prompt model returns 0 and sets last_error. Present + // only in newer libparakeet.so; nil falls back to CppTranscribePcmBatchJSON. + CppTranscribePcmBatchJSONLang func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32, targetLang string) uintptr + // Cache-aware streaming (RNN-T) entry points. stream_begin returns 0 for // non-streaming models. feed/finalize return a malloc'd char* (uintptr, // freed via CppFreeString); feed writes 1 to *eouOut on an end-of-utterance (EOU) marker. @@ -55,6 +62,11 @@ var ( CppStreamFeed func(s uintptr, pcm []float32, nSamples int32, eouOut unsafe.Pointer) uintptr CppStreamFinalize func(s uintptr) uintptr CppStreamFree func(s uintptr) + + // CppStreamBeginLang is the multilingual variant of stream_begin: identical, + // plus a trailing target_lang ("" means the model default). Present only in + // newer libparakeet.so; nil falls back to CppStreamBegin. + CppStreamBeginLang func(ctx uintptr, targetLang string) uintptr ) // streamChunkSamples is how much 16 kHz mono PCM we hand to stream_feed per @@ -187,8 +199,19 @@ func (p *ParakeetCpp) runBatch(reqs []*batchRequest) { if len(reqs) > 0 { dec = reqs[0].decoder } + // All requests in a batch share one language (the batcher coalesces only + // same-language requests), so any element's language describes the batch.
+ lang := "" + if len(reqs) > 0 { + lang = reqs[0].language + } p.engineMu.Lock() - cstr := CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec) + var cstr uintptr + if CppTranscribePcmBatchJSONLang != nil { + cstr = CppTranscribePcmBatchJSONLang(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec, lang) + } else { + cstr = CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec) + } p.engineMu.Unlock() if cstr == 0 { err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr)) @@ -226,8 +249,9 @@ func (p *ParakeetCpp) runBatch(reqs []*batchRequest) { // OpenAI API, whose default is segment-level); token ids always populate // Segment.Tokens. // -// translate/diarize/prompt/temperature/language/threads are not applicable to -// parakeet and are ignored; streaming is handled by AudioTranscriptionStream +// translate/diarize/prompt/temperature/threads are not applicable to parakeet +// and are ignored; language is honored on the batched + streaming paths (see +// opts.GetLanguage() below); streaming is handled by AudioTranscriptionStream // (L2). 
func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) { if p.ctxPtr == 0 { @@ -271,7 +295,7 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip } rep := make(chan batchReply, 1) select { - case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}: + case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, language: opts.GetLanguage(), reply: rep}: case <-ctx.Done(): return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled") } @@ -361,7 +385,12 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra return status.Error(codes.Canceled, "transcription cancelled") } - stream := CppStreamBegin(p.ctxPtr) + var stream uintptr + if CppStreamBeginLang != nil { + stream = CppStreamBeginLang(p.ctxPtr, opts.GetLanguage()) + } else { + stream = CppStreamBegin(p.ctxPtr) + } if stream == 0 { // Not a cache-aware streaming model: run a normal offline // transcription and emit it as one delta + a closing final result. diff --git a/backend/go/parakeet-cpp/main.go b/backend/go/parakeet-cpp/main.go index 32d94b7b1..23b4ec8a1 100644 --- a/backend/go/parakeet-cpp/main.go +++ b/backend/go/parakeet-cpp/main.go @@ -65,6 +65,17 @@ func main() { purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json") } + // Per-request language variants (multilingual nemotron). Same probe pattern: + // present only in libparakeet.so built with multilingual support, so the + // backend still loads against an older library and falls back to the + // non-lang batched + streaming entry points (model default / "auto"). 
+ if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json_lang"); err == nil && sym != 0 { + purego.RegisterLibFunc(&CppTranscribePcmBatchJSONLang, lib, "parakeet_capi_transcribe_pcm_batch_json_lang") + } + if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_begin_lang"); err == nil && sym != 0 { + purego.RegisterLibFunc(&CppStreamBeginLang, lib, "parakeet_capi_stream_begin_lang") + } + fmt.Fprintf(os.Stderr, "[parakeet-cpp] ABI=%d\n", CppAbiVersion()) flag.Parse() diff --git a/gallery/index.yaml b/gallery/index.yaml index f6089b9b4..570a6fcea 100644 --- a/gallery/index.yaml +++ b/gallery/index.yaml @@ -31940,6 +31940,41 @@ - filename: parakeet-cpp/tdt_ctc-1.1b-f16.gguf uri: huggingface://mudler/parakeet-cpp-gguf/tdt_ctc-1.1b-f16.gguf sha256: cd53f64eefac2623a12f2f118ef50b56622dc3012f42c815c6adf0d08292f387 +- name: parakeet-cpp-nemotron-3.5-asr-streaming-0.6b + url: github:mudler/LocalAI/gallery/virtual.yaml@master + urls: + - https://huggingface.co/mudler/parakeet-cpp-gguf + - https://huggingface.co/nvidia/nemotron-3.5-asr-streaming-0.6b + - https://github.com/mudler/parakeet.cpp + description: | + Multilingual (40+ locales), prompt-conditioned, cache-aware streaming FastConformer RNN-T, 0.6B. + Q8_0 GGUF for the parakeet-cpp backend (C++/ggml port of NVIDIA NeMo). Byte-identical to NeMo at + WER 0 offline and streaming, about 2.5x faster than NeMo on CPU with no GPU. Select a language with + the request "language" field (for example en, de, es, ja-JP), or leave it empty for automatic + detection. License OpenMDW-1.1. 
+ license: other + tags: + - parakeet + - parakeet-cpp + - nemotron + - asr + - speech-recognition + - stt + - multilingual + - streaming + - gguf + - ggml + overrides: + backend: parakeet-cpp + known_usecases: + - transcript + name: parakeet-cpp-nemotron-3.5-asr-streaming-0.6b + parameters: + model: parakeet-cpp/nemotron-3.5-asr-streaming-0.6b-q8_0.gguf + files: + - filename: parakeet-cpp/nemotron-3.5-asr-streaming-0.6b-q8_0.gguf + uri: huggingface://mudler/parakeet-cpp-gguf/nemotron-3.5-asr-streaming-0.6b-q8_0.gguf + sha256: ba2f13eccd4a5245be728f77e6149bd6a4fdcdd133ff2e08ac6005bcef7a99f1 - name: parakeet-crispasr url: github:mudler/LocalAI/gallery/virtual.yaml@master urls: