feat(parakeet-cpp): dynamic batching for concurrent transcription requests (#10112)

* feat(parakeet-cpp): dynamic-batching scheduler (queue + dispatcher)

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(parakeet-cpp): dynamic batching for AudioTranscription via batched JSON C-API

Drop SingleThread; route unary transcription through the in-process batcher
which coalesces concurrent requests into one batched engine call. Streaming
stays mutually exclusive via engineMu. Adds batch_max_size / batch_max_wait_ms
options (size=1 disables; recommended on CPU).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(parakeet-cpp): tear down dispatcher in Free; log batch config; preallocate; clarify stream lock

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(parakeet-cpp): Ginkgo batcher tests; optional batch C-API binding with per-request fallback

The batched JSON C-API symbol exists only in newer libparakeet.so (ABI >= 2);
probe it with Dlsym and register optionally so the backend still loads against
an older library, falling back to per-request transcription. Rewrites the
batcher unit tests as Ginkgo/Gomega specs (forbidigo bans t.Fatal in tests).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(parakeet-cpp): debug-log coalesced batch size in runBatch

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(parakeet-cpp): default batch_max_size to 1 (batching opt-in)

Dynamic batching now defaults off (batch_max_size:1, one request at a
time). Raise batch_max_size to opt in: it is a large throughput win on
GPU under concurrent load, but on CPU and low-concurrency setups it only
adds latency, so off is the safer default. The startup log now states
whether batching is on or off, and the audio-to-text docs are updated to
match.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* chore(parakeet-cpp): bump parakeet.cpp to 8a7c482 (batched decode + B=1 fast-path)

parakeet.cpp PR #1 merged the batched encoder/decode and the B=1 encoder
fast-path to master. Point PARAKEET_VERSION at that commit so the backend
builds the batched C-API (parakeet_capi_transcribe_pcm_batch_json) that the
dynamic batcher calls; the prior pin (30a3075) predated it, so only the
per-request fallback path was exercised. Verified the shared lib builds with
the backend's CMake flags and exports the batch symbol.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-06-02 14:49:02 +02:00
committed by GitHub
parent a5a0b3dc4e
commit 860f9d63ad
6 changed files with 391 additions and 41 deletions

View File

@@ -0,0 +1,79 @@
package main
import "time"
// batchRequest is one in-flight unary transcription waiting to be batched.
// In production pcm/decoder are set; tag is an opaque marker used by tests.
type batchRequest struct {
pcm []float32
decoder int32
tag string
reply chan batchReply
}
// batchReply carries one per-item JSON object string (an element of the C-API's
// JSON array) or an error back to the waiting handler goroutine.
type batchReply struct {
json string
err error
}
// batcher coalesces concurrent batchRequests into batched runBatch calls. A
// single run() goroutine is the sole caller of runBatch, so runBatch (which in
// production calls the thread-unsafe C engine) is never entered concurrently.
type batcher struct {
submit chan *batchRequest
maxSize int
maxWait time.Duration
runBatch func(reqs []*batchRequest) // must deliver a reply to every req
}
func newBatcher(maxSize int, maxWait time.Duration, runBatch func([]*batchRequest)) *batcher {
if maxSize < 1 {
maxSize = 1
}
return &batcher{
submit: make(chan *batchRequest),
maxSize: maxSize,
maxWait: maxWait,
runBatch: runBatch,
}
}
// run is the dispatcher loop: accumulate submitted requests until either maxSize
// is reached or maxWait elapses since the first queued request, then dispatch.
// Exits when stop is closed (draining any partially-filled batch first).
func (b *batcher) run(stop <-chan struct{}) {
for {
var first *batchRequest
select {
case first = <-b.submit:
case <-stop:
return
}
batch := []*batchRequest{first}
// maxSize==1 disables batching: dispatch immediately (passthrough).
if b.maxSize == 1 {
b.runBatch(batch)
continue
}
timer := time.NewTimer(b.maxWait)
fill:
for len(batch) < b.maxSize {
select {
case r := <-b.submit:
batch = append(batch, r)
case <-timer.C:
break fill
case <-stop:
timer.Stop()
b.runBatch(batch)
return
}
}
timer.Stop()
b.runBatch(batch)
}
}

View File

@@ -0,0 +1,108 @@
package main
import (
"sync"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("batcher", func() {
echoReply := func(reqs []*batchRequest) {
for _, r := range reqs {
r.reply <- batchReply{json: r.tag}
}
}
It("coalesces concurrent submits into batches", func() {
var mu sync.Mutex
var sizes []int
run := func(reqs []*batchRequest) {
mu.Lock()
sizes = append(sizes, len(reqs))
mu.Unlock()
echoReply(reqs)
}
b := newBatcher(4, 50*time.Millisecond, run)
stop := make(chan struct{})
go b.run(stop)
defer close(stop)
const N = 4
var wg sync.WaitGroup
got := make([]string, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
rep := make(chan batchReply, 1)
b.submit <- &batchRequest{tag: string(rune('a' + i)), reply: rep}
got[i] = (<-rep).json
}(i)
}
wg.Wait()
mu.Lock()
defer mu.Unlock()
total, maxBatch := 0, 0
for _, s := range sizes {
total += s
if s > maxBatch {
maxBatch = s
}
}
Expect(total).To(Equal(N))
Expect(maxBatch).To(BeNumerically(">=", 2), "expected at least one batch to coalesce >1 request")
})
It("dispatches when max size is reached", func() {
dispatched := make(chan int, 8)
run := func(reqs []*batchRequest) {
dispatched <- len(reqs)
echoReply(reqs)
}
b := newBatcher(2, time.Hour, run) // huge window: only size can trigger
stop := make(chan struct{})
go b.run(stop)
defer close(stop)
for i := 0; i < 2; i++ {
rep := make(chan batchReply, 1)
b.submit <- &batchRequest{tag: "x", reply: rep}
go func(rep chan batchReply) { <-rep }(rep)
}
Eventually(dispatched, "2s").Should(Receive(Equal(2)))
})
It("dispatches when the wait window elapses", func() {
dispatched := make(chan int, 8)
run := func(reqs []*batchRequest) {
dispatched <- len(reqs)
echoReply(reqs)
}
b := newBatcher(8, 20*time.Millisecond, run) // size unreachable; window fires
stop := make(chan struct{})
go b.run(stop)
defer close(stop)
rep := make(chan batchReply, 1)
b.submit <- &batchRequest{tag: "x", reply: rep}
go func() { <-rep }()
Eventually(dispatched, "2s").Should(Receive(Equal(1)))
})
It("bypasses batching when max size is 1", func() {
dispatched := make(chan int, 8)
run := func(reqs []*batchRequest) {
dispatched <- len(reqs)
echoReply(reqs)
}
b := newBatcher(1, time.Hour, run) // size 1 => immediate dispatch
stop := make(chan struct{})
go b.run(stop)
defer close(stop)
rep := make(chan batchReply, 1)
b.submit <- &batchRequest{tag: "x", reply: rep}
go func() { <-rep }()
Eventually(dispatched, "2s").Should(Receive(Equal(1)))
})
})

View File

@@ -7,13 +7,17 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"unsafe"
"github.com/go-audio/wav"
"github.com/mudler/LocalAI/pkg/grpc/base"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/xlog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -34,6 +38,15 @@ var (
CppFreeString func(s uintptr)
CppLastError func(ctx uintptr) string
// Batched JSON transcription: takes a concatenated float buffer of clips
// plus their per-clip sample counts (sum(nSamples)==len(samplesConcat))
// and returns a malloc'd char* JSON ARRAY of per-clip {"text","words",
// "tokens"} objects (uintptr, freed via CppFreeString). purego passes the
// Go slices as the base pointer of their backing array (kept alive for the
// call), matching the CppStreamFeed pcm []float32 binding pattern; the C
// side reads them as const float*/const int*.
CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr
// Cache-aware streaming (RNN-T) entry points. stream_begin returns 0 for
// non-streaming models. feed/finalize return a malloc'd char* (uintptr,
// freed via CppFreeString); feed writes 1 to *eouOut on an <EOU>/<EOB>.
@@ -77,11 +90,18 @@ type transcriptToken struct {
}
// ParakeetCpp owns a single loaded parakeet_ctx. The C engine is a
// thread-unsafe singleton (mirrors whisper.cpp / vibevoice.cpp), so we
// serialize calls through base.SingleThread.
// thread-unsafe singleton (mirrors whisper.cpp / vibevoice.cpp). Rather than
// serialize every call through base.SingleThread, we route unary
// transcription through an in-process batcher (its sole dispatcher goroutine
// is the only caller of the engine on that path) and guard the shared engine
// with engineMu so a streaming session and a batched-unary dispatch never
// touch it concurrently.
type ParakeetCpp struct {
base.SingleThread
ctxPtr uintptr
base.Base
ctxPtr uintptr
engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming)
bat *batcher
batStop chan struct{}
}
// Load is the LocalAI gRPC entry point for LoadModel: it calls
@@ -100,13 +120,103 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error {
return fmt.Errorf("parakeet-cpp: parakeet_capi_load failed for %q", opts.ModelFile)
}
p.ctxPtr = ctx
// Dynamic batching knobs (model YAML options:, key:value form). Batching is
// OFF by default (batch_max_size:1): each request runs on its own. On GPU,
// raising batch_max_size coalesces concurrent requests into one batched
// engine call and improves throughput under load; leave it at 1 on CPU and
// for low-concurrency setups, where batching only adds latency.
maxSize := optInt(opts, "batch_max_size", 1)
maxWaitMs := optInt(opts, "batch_max_wait_ms", 15)
if maxWaitMs < 0 {
maxWaitMs = 0
}
if CppTranscribePcmBatchJSON != nil {
p.batStop = make(chan struct{})
p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch)
go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop
if maxSize > 1 {
xlog.Info("parakeet-cpp: dynamic batching enabled",
"batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs)
} else {
xlog.Info("parakeet-cpp: dynamic batching off (batch_max_size=1); " +
"set batch_max_size>1 to coalesce concurrent requests on GPU")
}
} else {
xlog.Info("parakeet-cpp: batched C-API not present in libparakeet.so; " +
"batching disabled, using per-request transcription")
}
return nil
}
// AudioTranscription runs parakeet_capi_transcribe_path_json on the wav at
// opts.Dst with the default decoder (decoder=0, which selects the right head
// per architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes
// the per-word timestamps into a LocalAI TranscriptResult.
// optInt reads an integer model option (key:value form) from ModelOptions,
// returning def when absent or unparseable. The options array carries the
// model YAML's options: entries (see core/config; siblings such as
// acestep-cpp parse the same key:value form via strings.Cut on ":").
func optInt(opts *pb.ModelOptions, key string, def int) int {
for _, o := range opts.GetOptions() {
k, v, ok := strings.Cut(o, ":")
if ok && strings.TrimSpace(k) == key {
if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
return n
}
}
}
return def
}
// runBatch is the dispatcher's batch handler and the ONLY caller of the C
// engine on the unary path. It concatenates the batch PCM, calls the batched
// JSON C-API under engineMu, splits the JSON array, and replies to each request.
func (p *ParakeetCpp) runBatch(reqs []*batchRequest) {
// Observability: the actual coalesced batch size per engine call. Debug-level
// so it stays silent in normal operation but lets operators confirm/tune batching.
xlog.Debug("parakeet-cpp: dispatching batch", "size", len(reqs))
nSamples := make([]int32, len(reqs))
total := 0
for i, r := range reqs {
nSamples[i] = int32(len(r.pcm))
total += len(r.pcm)
}
concat := make([]float32, 0, total)
for _, r := range reqs {
concat = append(concat, r.pcm...)
}
var dec int32
if len(reqs) > 0 {
dec = reqs[0].decoder
}
p.engineMu.Lock()
cstr := CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec)
p.engineMu.Unlock()
if cstr == 0 {
err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr))
for _, r := range reqs {
r.reply <- batchReply{err: err}
}
return
}
raw := goStringFromCPtr(cstr)
CppFreeString(cstr)
var docs []json.RawMessage
if err := json.Unmarshal([]byte(raw), &docs); err != nil || len(docs) != len(reqs) {
e := fmt.Errorf("parakeet-cpp: batch json: got %d results for %d reqs (%v)", len(docs), len(reqs), err)
for _, r := range reqs {
r.reply <- batchReply{err: e}
}
return
}
for i, r := range reqs {
r.reply <- batchReply{json: string(docs[i])}
}
}
// AudioTranscription decodes the wav at opts.Dst to 16 kHz mono PCM and
// submits it to the in-process batcher, which coalesces concurrent requests
// into a single batched engine call (parakeet_capi_transcribe_pcm_batch_json)
// with the default decoder (decoder=0, which selects the right head per
// architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes the
// per-word timestamps into a LocalAI TranscriptResult.
//
// Parakeet emits word- and token-level timestamps but no native segment
// boundaries, so we synthesise a single whole-clip segment spanning the first
@@ -118,7 +228,7 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error {
// translate/diarize/prompt/temperature/language/threads are not applicable to
// parakeet and are ignored; streaming is handled by AudioTranscriptionStream
// (L2).
func (p *ParakeetCpp) AudioTranscription(_ context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) {
func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) {
if p.ctxPtr == 0 {
return pb.TranscriptResult{}, errors.New("parakeet-cpp: model not loaded")
}
@@ -126,61 +236,74 @@ func (p *ParakeetCpp) AudioTranscription(_ context.Context, opts *pb.TranscriptR
return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required")
}
cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0)
if cstr == 0 {
msg := CppLastError(p.ctxPtr)
if msg == "" {
msg = "unknown error"
// Fallback when the batched C-API is unavailable: transcribe directly from
// the file path (original behavior, no batching).
if p.bat == nil {
cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0)
if cstr == 0 {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", CppLastError(p.ctxPtr))
}
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", msg)
raw := goStringFromCPtr(cstr)
CppFreeString(cstr)
var doc transcriptJSON
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts), nil
}
raw := goStringFromCPtr(cstr)
CppFreeString(cstr)
// Batched path: decode to PCM, submit to the batcher, wait for this request's
// JSON element. The dispatcher is the sole engine caller on this path; both
// sends honour ctx cancellation.
pcm, _, err := decodeWavMono16k(opts.Dst)
if err != nil {
return pb.TranscriptResult{}, err
}
rep := make(chan batchReply, 1)
select {
case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}:
case <-ctx.Done():
return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled")
}
var res batchReply
select {
case res = <-rep:
case <-ctx.Done():
return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled")
}
if res.err != nil {
return pb.TranscriptResult{}, res.err
}
var doc transcriptJSON
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
if err := json.Unmarshal([]byte(res.json), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts), nil
}
// transcriptResultFromDoc maps a decoded transcriptJSON to a TranscriptResult,
// synthesising a single whole-clip segment and attaching word timings only when
// the caller requested word granularity. Shared by the batched and direct paths.
func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest) pb.TranscriptResult {
text := strings.TrimSpace(doc.Text)
words := make([]*pb.TranscriptWord, 0, len(doc.Words))
for _, w := range doc.Words {
words = append(words, &pb.TranscriptWord{
Start: secondsToNanos(w.Start),
End: secondsToNanos(w.End),
Text: w.W,
})
words = append(words, &pb.TranscriptWord{Start: secondsToNanos(w.Start), End: secondsToNanos(w.End), Text: w.W})
}
tokens := make([]int32, 0, len(doc.Tokens))
for _, t := range doc.Tokens {
tokens = append(tokens, t.ID)
}
// Single whole-clip segment, spanning the first word start to the last
// word end (0/0 when the clip produced no words).
var segStart, segEnd int64
if len(words) > 0 {
segStart = words[0].Start
segEnd = words[len(words)-1].End
}
seg := &pb.TranscriptSegment{
Id: 0,
Start: segStart,
End: segEnd,
Text: text,
Tokens: tokens,
}
seg := &pb.TranscriptSegment{Id: 0, Start: segStart, End: segEnd, Text: text, Tokens: tokens}
if wordsRequested(opts.TimestampGranularities) {
seg.Words = words
}
return pb.TranscriptResult{
Text: text,
Segments: []*pb.TranscriptSegment{seg},
}, nil
return pb.TranscriptResult{Text: text, Segments: []*pb.TranscriptSegment{seg}}
}
// wordsRequested reports whether the caller asked for word-level timestamps.
@@ -243,6 +366,14 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return nil
}
defer CppStreamFree(stream)
// The C engine is a single shared context: a streaming session and a batched
// unary dispatch must never touch it at once, so hold engineMu for the whole
// stream. This lock is intentionally taken AFTER the non-streaming fallback
// above returns: that fallback goes through AudioTranscription -> the batcher
// -> runBatch, which itself acquires engineMu, so locking here first would
// deadlock. Do not hoist this lock above the fallback.
p.engineMu.Lock()
defer p.engineMu.Unlock()
data, duration, err := decodeWavMono16k(opts.Dst)
if err != nil {
@@ -362,6 +493,12 @@ func decodeWavMono16k(path string) ([]float32, float32, error) {
// Free releases the underlying parakeet_ctx. Called by LocalAI when the
// model is unloaded.
func (p *ParakeetCpp) Free() error {
// Stop the dispatcher before releasing the engine so no in-flight runBatch
// can touch a freed ctx (close leak / use-after-free on reload).
if p.batStop != nil {
close(p.batStop)
p.batStop = nil
}
if p.ctxPtr != 0 {
CppFree(p.ctxPtr)
p.ctxPtr = 0

View File

@@ -43,6 +43,9 @@ func ensureLibLoaded() {
purego.RegisterLibFunc(&CppFree, lib, "parakeet_capi_free")
purego.RegisterLibFunc(&CppTranscribePath, lib, "parakeet_capi_transcribe_path")
purego.RegisterLibFunc(&CppTranscribePathJSON, lib, "parakeet_capi_transcribe_path_json")
if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json")
}
purego.RegisterLibFunc(&CppStreamBegin, lib, "parakeet_capi_stream_begin")
purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed")
purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize")

View File

@@ -58,6 +58,13 @@ func main() {
purego.RegisterLibFunc(lf.FuncPtr, lib, lf.Name)
}
// The batched-JSON entry point exists only in newer libparakeet.so (ABI >= 2).
// Probe with Dlsym and register only if present, so the backend still loads
// against an older library (it falls back to per-request transcription).
if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json")
}
fmt.Fprintf(os.Stderr, "[parakeet-cpp] ABI=%d\n", CppAbiVersion())
flag.Parse()

View File

@@ -187,6 +187,22 @@ curl http://localhost:8080/v1/audio/transcriptions \
For real-time use, load a cache-aware streaming model (e.g. `realtime_eou_120m-v1-*.gguf`) and pass `-F stream=true`. Deltas are emitted as the audio is decoded, with end-of-utterance events closing each segment.
### Dynamic batching
The backend can coalesce concurrent transcription requests into a single batched engine call, which improves throughput on GPU when many requests arrive at once. Batching is **off by default** (`batch_max_size:1`, one request at a time); raise it to opt in. Two `options:` knobs control it:
```yaml
name: parakeet-110m
backend: parakeet-cpp
parameters:
model: tdt_ctc-110m-f16.gguf
options:
- batch_max_size:8 # max requests coalesced into one batch (default 1 = off)
- batch_max_wait_ms:15 # how long to wait to fill a batch, in ms (default 15)
```
By default each request runs on its own. Raise `batch_max_size` (for example 4 to 16) to enable batching; it pays off on GPU under concurrent load, where coalescing the per-step decode GEMMs across requests is a large throughput win. Leave it at 1 on CPU and for low-concurrency setups, where batching only adds latency. Batching only affects concurrent unary requests; streaming sessions always run on their own.
## See also
- [Audio Transform]({{< relref "audio-transform.md" >}}) — clean up the audio (echo cancellation, noise suppression, dereverberation) before passing it to a transcription model.