Files
LocalAI/backend/go/parakeet-cpp/goparakeetcpp.go
Richard Palethorpe 5d0c43ec6e feat(realtime): Semantic VAD EOU token (#10444)
* feat(realtime): EOU-driven semantic_vad turn detection

Add a `semantic_vad` turn-detection mode to the realtime API that feeds
the transcription model live and decides "the user finished speaking"
from the `<EOU>` end-of-utterance token rather than from silence alone.
When EOU fires the turn commits immediately (~0.3s); otherwise it falls
back to an eagerness-scaled silence threshold (low/med/high = 8/4/2s).

Plumbing, bottom to top:

- proto: `AudioTranscriptionLive` bidirectional RPC (config-first oneof,
  mono float PCM @16k, ready-ack / Unimplemented degrade signal) plus
  `TranscriptResult.eou` for the unary retranscribe gate.
- pkg/grpc: client/server/base/embed scaffolding for the bidi stream,
  modeled on AudioTransformStream; release stream conns on terminal Recv.
- parakeet-cpp: live transcription RPC with per-C-call engine locking
  (one live stream per turn, finalize+free at commit); bump parakeet.cpp
  to ABI v5 — incremental StreamingMel (no more quadratic per-feed mel
  recompute that delayed EOU on long turns) and the <EOU>/<EOB> split;
  strip the literal <EOU>/<EOB> from offline text and set Eou.
- core/backend: LiveTranscriptionSession wrapper + pipeline
  `turn_detection:` config block (type/eagerness/retranscribe).
- realtime: semantic_vad integration — live input captions streamed as
  transcription deltas while the user speaks, EOU-immediate commit with
  eagerness fallback, optional retranscribe gate (batch re-decode must
  also end in <EOU> to confirm), clause synthesis off the LLM token
  callback, and per-turn live-transcription / model_load telemetry.
- UI: show the realtime pipeline components as a vertical list.

Docs and tests included; opt-in via the pipeline YAML or per-session
`session.update`. Non-streaming STT backends degrade to silence-only.

Assisted-by: Claude Code:claude-opus-4-8 [Read] [Edit] [Write] [Bash]
Assisted-by: Claude Code:claude-fable-5 [Read] [Edit] [Bash]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(realtime): explicit formally-verified state machines + parakeet streaming driver

The realtime API had several implicit state machines whose state was inferred
from scattered booleans, channels, and five separate mutexes, leaving
illegal/inconsistent states reachable. Make them explicit and keep the
implementation in step with a formal design; rework the parakeet streaming
backend along the same lines.

Realtime state machines (M1-M5). Each is a sealed sum-type State/Event/Effect
with a total, pure Next(state,event)->(state,[]effect) behind a single-writer
Coordinator:

  M1 conncoord    connection lifecycle: VAD toggle + once-only teardown
                  (replaces vadServerStarted + a `done` channel closed from
                  two sites).
  M2 turncoord    turn detection: collapses speechStarted and the live-stream
                  "turn open" flag into one state, so discardTurn can no longer
                  desync them and suppress the next onset.
  M3 respcoord    response coordination: serializes the dual-writer
                  start/cancel so at most one response is live; one
                  response.done per response.create.
  M4 compactcoord conversation compaction: single-flight (replaces the
                  `compacting atomic.Bool` CAS).
  M5 ttscoord     TTS pipeline: open->closing->closed, idempotent wait(),
                  rejects enqueue-after-close (was a silent drop).

The Coordinator/Sink/Next plumbing — only the sealed types and Next differed
per machine — is extracted once into core/http/endpoints/openai/coordinator as
a generic Coordinator[S,E,F]; each machine keeps its public API via type
aliases, so no sink, call-site, or test moved.

Hierarchy. session_lifecycle.fizz models M1 as the parent region with its
children (M2/M3/M4) as one statechart and asserts ChildrenDieWithParent (conn
torn => all children terminal, none start after teardown). respcoord and
compactcoord gain an absorbing Terminated state + Shutdown event; conncoord's
teardown drives the children terminal. This closes a compaction teardown gap: a
fire-and-forget compaction could outlive a torn session — compactionSink now
takes a session-scoped cancellable context + WaitGroup and joins the in-flight
summarize+evict on shutdown.

Formal verification. formal-verification/ holds one authoritative FizzBee spec
per machine plus the composition spec, each with an always-assertion and a
documented one-line edit that makes the checker fail (verified non-vacuous).
scripts/realtime-conformance.sh is fail-closed: all Go conformance suites under
-race AND a model-check of every .fizz spec; a missing FizzBee is a hard error
(only the loud REALTIME_CONFORMANCE_SKIP_FIZZBEE=1 bypasses it, never in CI).
FizzBee is pinned by sha256 and installed via scripts/install-fizzbee.sh into
.tools/ (gitignored). Wired as make test-realtime-conformance, a CI workflow,
and a pre-commit path filter. Go conformance tests are Ginkgo/Gomega (per the
repo's forbidigo lint): transition tables + fixed-seed property walks +
concurrent/-race specs, no rapid dependency. Design map:
docs/design/realtime-state-machines.md.

Parakeet streaming backend. The same treatment applied to the parakeet-cpp
streaming paths:
- AudioTranscriptionStream returns codes.Unimplemented for non-streaming models
  instead of decoding offline and emitting it as one delta + final. A client
  that asked for streaming learns the model cannot stream rather than receiving
  a batch result shaped like a stream. New grpcerrors.StreamTranscriptionUnsupported
  carries that signal; the HTTP /v1/audio/transcriptions stream path surfaces it
  as an SSE error event. Mirrors AudioTranscriptionLive, which already did this.
- utteranceBoundary (boundary.go): a single definition of the end-of-utterance
  latch, replacing three open-coded finalEou toggles. Modelled as a two-valued
  type so illegal states are unrepresentable.
- Shared decode driver (driver.go): streamFeedResult (one per-feed event) +
  feedChunk (hides the ABI v4 JSON vs text-only split) + feedSlices + flushTail.
  The feed loop is written once.
- AudioTranscriptionLive becomes a bidi adapter: it streams the per-feed
  {delta,eou,eob,words} the realtime turn detector consumes and a terminal
  FinalResult carrying only Text. Segments/duration/eou are offline-only and no
  longer produced (nor read) on the live path; liveTraceState drops the terminal
  eou and keeps the per-feed eou_events count.
- AudioTranscriptionStream + streamJSON merge into one driver-based function;
  streamSegmenter is generalized to the unified event with a text-only fallback
  that preserves the legacy (no-words) library's per-utterance segmentation.

Verified: build/vet/gofumpt clean, golangci-lint 0 issues, all coordinator and
parakeet packages under -race, the fail-closed conformance gate green, and
make test-realtime (12 e2e WS+WebRTC).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
2026-06-30 09:01:22 +02:00

862 lines
33 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"unsafe"
"github.com/go-audio/wav"
"github.com/mudler/LocalAI/pkg/grpc/base"
"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/xlog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// purego-bound entry points from libparakeet.so. Names match
// parakeet_capi.h exactly so a `nm libparakeet.so | grep parakeet_capi`
// is enough to spot drift.
//
// Functions that return char* are declared as uintptr so we can call
// parakeet_capi_free_string on the same pointer after copying, the
// C-API contract is "caller owns and must free the returned buffer".
var (
CppAbiVersion func() int32
CppLoad func(ggufPath string) uintptr
CppFree func(ctx uintptr)
CppTranscribePath func(ctx uintptr, wavPath string, decoder int32) uintptr
CppTranscribePathJSON func(ctx uintptr, wavPath string, decoder int32) uintptr
CppFreeString func(s uintptr)
CppLastError func(ctx uintptr) string
// Batched JSON transcription: takes a concatenated float buffer of clips
// plus their per-clip sample counts (sum(nSamples)==len(samplesConcat))
// and returns a malloc'd char* JSON ARRAY of per-clip {"text","words",
// "tokens"} objects (uintptr, freed via CppFreeString). purego passes the
// Go slices as the base pointer of their backing array (kept alive for the
// call), matching the CppStreamFeed pcm []float32 binding pattern; the C
// side reads them as const float*/const int*.
CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr
// CppTranscribePcmBatchJSONLang is the multilingual variant of the batched
// JSON entry point: identical, plus a trailing target_lang. "" (the model
// default, "auto") is passed for non-prompt models, which ignore it; an
// unknown locale on a prompt model returns 0 and sets last_error. Present
// only in newer libparakeet.so; nil falls back to CppTranscribePcmBatchJSON.
CppTranscribePcmBatchJSONLang func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32, targetLang string) uintptr
// Cache-aware streaming (RNN-T) entry points. stream_begin returns 0 for
// non-streaming models. feed/finalize return a malloc'd char* (uintptr,
// freed via CppFreeString); feed writes 1 to *eouOut on an <EOU>/<EOB>.
CppStreamBegin func(ctx uintptr) uintptr
CppStreamFeed func(s uintptr, pcm []float32, nSamples int32, eouOut unsafe.Pointer) uintptr
CppStreamFinalize func(s uintptr) uintptr
CppStreamFree func(s uintptr)
// CppStreamBeginLang is the multilingual variant of stream_begin: identical,
// plus a trailing target_lang ("" means the model default). Present only in
// newer libparakeet.so; nil falls back to CppStreamBegin.
CppStreamBeginLang func(ctx uintptr, targetLang string) uintptr
// Streaming JSON variants (ABI v4): feed/finalize returning a malloc'd char*
// JSON document {text,eou,frame_sec,words} (uintptr, freed via CppFreeString)
// so streaming segments can carry per-word timestamps. Present only in newer
// libparakeet.so; nil falls back to the text-only CppStreamFeed/Finalize path.
CppStreamFeedJSON func(s uintptr, pcm []float32, nSamples int32) uintptr
CppStreamFinalizeJSON func(s uintptr) uintptr
)
// streamChunkSamples is how much 16 kHz mono PCM we hand to stream_feed per
// call (1 s). The session buffers internally and decodes once a full
// cache-aware encoder chunk is available, so this only bounds how often we
// poll for newly-finalized text, not the model's actual chunk size.
const streamChunkSamples = 16000
// transcriptJSON mirrors the document returned by
// parakeet_capi_transcribe_path_json (see parakeet_capi.h):
//
// {"text":"...",
// "words":[{"w":"...","start":0.480,"end":0.640,"conf":0.9100}, ...],
// "tokens":[{"id":123,"t":0.480,"conf":0.9100}, ...]}
//
// "start"/"end"/"t" are seconds; "conf" is confidence in (0,1].
type transcriptJSON struct {
Text string `json:"text"`
FrameSec float64 `json:"frame_sec"`
Words []transcriptWord `json:"words"`
Tokens []transcriptToken `json:"tokens"`
}
// streamFeedJSON mirrors the document returned by
// parakeet_capi_stream_feed_json / parakeet_capi_stream_finalize_json (ABI v5):
//
// {"text":"...","eou":0,"eob":0,"frame_sec":0.080000,
// "words":[{"w":"...","start":0.480,"end":0.640,"conf":0.9100}, ...]}
//
// "text" is the newly-finalized text since the last call. Under ABI v5 "eou"
// is 1 iff an <EOU> fired this feed (the user yielded the turn) and "eob" 1
// iff an <EOB> fired (a backchannel like "uh-huh" ended — NOT a turn
// boundary). A v4 library has no "eob" field and its "eou" conflates both
// tokens: Eob stays 0 and Eou keeps the old any-event meaning. "words" are
// the words finalized this call with absolute (stream-relative) start/end
// seconds.
type streamFeedJSON struct {
Text string `json:"text"`
Eou int `json:"eou"`
Eob int `json:"eob"`
FrameSec float64 `json:"frame_sec"`
Words []transcriptWord `json:"words"`
}
type transcriptWord struct {
W string `json:"w"`
Start float64 `json:"start"`
End float64 `json:"end"`
Conf float64 `json:"conf"`
}
type transcriptToken struct {
ID int32 `json:"id"`
T float64 `json:"t"`
Conf float64 `json:"conf"`
}
// ParakeetCpp owns a single loaded parakeet_ctx. The C engine is a
// thread-unsafe singleton (mirrors whisper.cpp / vibevoice.cpp). Rather than
// serialize every call through base.SingleThread, we route unary
// transcription through an in-process batcher (its sole dispatcher goroutine
// is the only caller of the engine on that path) and guard the shared engine
// with engineMu so a streaming session and a batched-unary dispatch never
// touch it concurrently.
type ParakeetCpp struct {
base.Base
ctxPtr uintptr
engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming)
bat *batcher
batStop chan struct{}
// segmentGapFrames is NeMo's segment_gap_threshold in ENCODER FRAMES (model
// YAML option, default 0=off). When >0 it adds NeMo's silence-gap split on
// top of the punctuation split; converted to seconds via the JSON frame_sec.
segmentGapFrames int
}
// Load is the LocalAI gRPC entry point for LoadModel: it calls
// parakeet_capi_load with the GGUF path and stashes the resulting
// opaque context pointer for AudioTranscription.
func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error {
if opts.ModelFile == "" {
return errors.New("parakeet-cpp: ModelFile is required")
}
ctx := CppLoad(opts.ModelFile)
if ctx == 0 {
// No ctx to ask for last_error (the C-API's last-error buffer
// lives on the ctx that was never returned). Surface the path
// so the operator at least knows which load failed.
return fmt.Errorf("parakeet-cpp: parakeet_capi_load failed for %q", opts.ModelFile)
}
p.ctxPtr = ctx
// Dynamic batching knobs (model YAML options:, key:value form). Batching is
// OFF by default (batch_max_size:1): each request runs on its own. On GPU,
// raising batch_max_size coalesces concurrent requests into one batched
// engine call and improves throughput under load; leave it at 1 on CPU and
// for low-concurrency setups, where batching only adds latency.
maxSize := optInt(opts, "batch_max_size", 1)
maxWaitMs := optInt(opts, "batch_max_wait_ms", 15)
if maxWaitMs < 0 {
maxWaitMs = 0
}
// NeMo's segment_gap_threshold (encoder frames, default 0=off). Off by
// default matches NeMo's default (punctuation-only segments); when set it
// additionally splits segments on inter-word silence (see transcriptResultFromDoc).
p.segmentGapFrames = optInt(opts, "segment_gap_threshold", 0)
if CppTranscribePcmBatchJSON != nil {
p.batStop = make(chan struct{})
p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch)
go p.bat.run(p.batStop) // dispatcher runs until Free closes batStop
if maxSize > 1 {
xlog.Info("parakeet-cpp: dynamic batching enabled",
"batch_max_size", maxSize, "batch_max_wait_ms", maxWaitMs)
} else {
xlog.Info("parakeet-cpp: dynamic batching off (batch_max_size=1); " +
"set batch_max_size>1 to coalesce concurrent requests on GPU")
}
} else {
xlog.Info("parakeet-cpp: batched C-API not present in libparakeet.so; " +
"batching disabled, using per-request transcription")
}
return nil
}
// optInt reads an integer model option (key:value form) from ModelOptions,
// returning def when absent or unparseable. The options array carries the
// model YAML's options: entries (see core/config; siblings such as
// acestep-cpp parse the same key:value form via strings.Cut on ":").
func optInt(opts *pb.ModelOptions, key string, def int) int {
for _, o := range opts.GetOptions() {
k, v, ok := strings.Cut(o, ":")
if ok && strings.TrimSpace(k) == key {
if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
return n
}
}
}
return def
}
// runBatch is the dispatcher's batch handler and the ONLY caller of the C
// engine on the unary path. It concatenates the batch PCM, calls the batched
// JSON C-API under engineMu, splits the JSON array, and replies to each request.
func (p *ParakeetCpp) runBatch(reqs []*batchRequest) {
// Observability: the actual coalesced batch size per engine call. Debug-level
// so it stays silent in normal operation but lets operators confirm/tune batching.
xlog.Debug("parakeet-cpp: dispatching batch", "size", len(reqs))
nSamples := make([]int32, len(reqs))
total := 0
for i, r := range reqs {
nSamples[i] = int32(len(r.pcm))
total += len(r.pcm)
}
concat := make([]float32, 0, total)
for _, r := range reqs {
concat = append(concat, r.pcm...)
}
var dec int32
if len(reqs) > 0 {
dec = reqs[0].decoder
}
// All requests in a batch share one language (the batcher coalesces only
// same-language requests), so any element's language describes the batch.
lang := ""
if len(reqs) > 0 {
lang = reqs[0].language
}
p.engineMu.Lock()
var cstr uintptr
if CppTranscribePcmBatchJSONLang != nil {
cstr = CppTranscribePcmBatchJSONLang(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec, lang)
} else {
cstr = CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec)
}
p.engineMu.Unlock()
if cstr == 0 {
err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr))
for _, r := range reqs {
r.reply <- batchReply{err: err}
}
return
}
raw := goStringFromCPtr(cstr)
CppFreeString(cstr)
var docs []json.RawMessage
if err := json.Unmarshal([]byte(raw), &docs); err != nil || len(docs) != len(reqs) {
e := fmt.Errorf("parakeet-cpp: batch json: got %d results for %d reqs (%v)", len(docs), len(reqs), err)
for _, r := range reqs {
r.reply <- batchReply{err: e}
}
return
}
for i, r := range reqs {
r.reply <- batchReply{json: string(docs[i])}
}
}
// AudioTranscription decodes the wav at opts.Dst to 16 kHz mono PCM and
// submits it to the in-process batcher, which coalesces concurrent requests
// into a single batched engine call (parakeet_capi_transcribe_pcm_batch_json)
// with the default decoder (decoder=0, which selects the right head per
// architecture: transducer for tdt/rnnt/hybrid, CTC for ctc) and shapes the
// per-word timestamps into a LocalAI TranscriptResult.
//
// Parakeet emits word- and token-level timestamps but no native segment
// boundaries, so we synthesise a single whole-clip segment spanning the first
// word start to the last word end. Word-level timings are attached only when
// the caller opts in via timestamp_granularities=["word"] (matching the
// OpenAI API, whose default is segment-level); token ids always populate
// Segment.Tokens.
//
// translate/diarize/prompt/temperature/threads are not applicable to parakeet
// and are ignored; language is honored on the batched + streaming paths (see
// opts.GetLanguage() below); streaming is handled by AudioTranscriptionStream
// (L2).
func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) {
if p.ctxPtr == 0 {
return pb.TranscriptResult{}, grpcerrors.ModelNotLoaded("parakeet-cpp")
}
if opts.Dst == "" {
return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required")
}
// Fallback when the batched C-API is unavailable: transcribe from a file
// path (original behavior, no batching). The C library's audio loader only
// understands 16 kHz mono WAV/PCM, so convert the input first - otherwise
// any non-WAV upload (MP3, etc.) fails with "failed to load audio". This
// mirrors what every other audio backend (whisper, crispasr) does via
// utils.AudioToWav before handing the file to the engine.
if p.bat == nil {
converted, cleanup, err := convertToWavMono16k(opts.Dst)
if err != nil {
return pb.TranscriptResult{}, err
}
defer cleanup()
cstr := CppTranscribePathJSON(p.ctxPtr, converted, 0)
if cstr == 0 {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", CppLastError(p.ctxPtr))
}
raw := goStringFromCPtr(cstr)
CppFreeString(cstr)
var doc transcriptJSON
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts, p.segmentGapFrames), nil
}
// Batched path: decode to PCM, submit to the batcher, wait for this request's
// JSON element. The dispatcher is the sole engine caller on this path; both
// sends honour ctx cancellation.
pcm, _, err := decodeWavMono16k(opts.Dst)
if err != nil {
return pb.TranscriptResult{}, err
}
rep := make(chan batchReply, 1)
select {
case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, language: opts.GetLanguage(), reply: rep}:
case <-ctx.Done():
return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled")
}
var res batchReply
select {
case res = <-rep:
case <-ctx.Done():
return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled")
}
if res.err != nil {
return pb.TranscriptResult{}, res.err
}
var doc transcriptJSON
if err := json.Unmarshal([]byte(res.json), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts, p.segmentGapFrames), nil
}
// segmentSeparators is NeMo's default segment_seperators (sentence-ending
// punctuation). Splitting on these matches NeMo's default segment timestamps.
var segmentSeparators = []rune{'.', '?', '!'}
// transcriptResultFromDoc maps a decoded transcriptJSON to a TranscriptResult,
// grouping words into NeMo-faithful segments (see splitWordsIntoSegments). The
// optional gapFrames (NeMo's segment_gap_threshold, in encoder FRAMES; 0=off)
// additionally splits on inter-word silence; it is converted to a seconds gap
// with the document's frame_sec. Per-segment word timings are attached only when
// the caller requested word granularity; token ids populate each segment's
// Tokens by time-window membership. Shared by the batched and direct paths.
func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest, gapFrames int) pb.TranscriptResult {
text, eou := stripEouMarker(strings.TrimSpace(doc.Text))
// Frame-unit gap threshold -> seconds (NeMo segment_gap_threshold). 0 = off.
gapSeconds := 0.0
if gapFrames > 0 {
if doc.FrameSec > 0 {
gapSeconds = float64(gapFrames) * doc.FrameSec
} else {
xlog.Warn("parakeet-cpp: segment_gap_threshold set but libparakeet.so " +
"did not report frame_sec; falling back to punctuation-only segments")
}
}
groups := splitWordsIntoSegments(doc.Words, segmentSeparators, gapSeconds)
if len(groups) == 0 {
// No words (edge case): single whole-clip text segment.
return pb.TranscriptResult{
Text: text,
Segments: []*pb.TranscriptSegment{{Id: 0, Text: text}},
Eou: eou,
}
}
wantWords := wordsRequested(opts.TimestampGranularities)
segments := make([]*pb.TranscriptSegment, 0, len(groups))
for id, group := range groups {
parts := make([]string, len(group))
for i, gw := range group {
parts[i] = gw.W
}
seg := &pb.TranscriptSegment{
Id: int32(id),
Start: secondsToNanos(group[0].Start),
End: secondsToNanos(group[len(group)-1].End),
Text: strings.TrimSpace(strings.Join(parts, " ")),
Tokens: tokensInWindow(doc.Tokens, group[0].Start, group[len(group)-1].End),
}
if wantWords {
ws := make([]*pb.TranscriptWord, len(group))
for i, gw := range group {
ws[i] = &pb.TranscriptWord{Start: secondsToNanos(gw.Start), End: secondsToNanos(gw.End), Text: gw.W}
}
seg.Words = ws
}
segments = append(segments, seg)
}
return pb.TranscriptResult{Text: text, Segments: segments, Eou: eou}
}
// stripEouMarker removes a trailing literal <EOU>/<EOB> from offline-decode
// text and reports whether the decode ended on an end-of-UTTERANCE token. The
// realtime EOU model's offline decode keeps the special token in the
// detokenized text (the streaming path strips it and surfaces it as flags
// instead); user-visible transcripts must never carry either marker, but only
// <EOU> may confirm the semantic_vad retranscribe cross-check — a decode
// ending on <EOB> means the last thing heard was a backchannel, not the user
// yielding the turn.
func stripEouMarker(text string) (string, bool) {
if strings.HasSuffix(text, "<EOU>") {
return strings.TrimSpace(strings.TrimSuffix(text, "<EOU>")), true
}
if strings.HasSuffix(text, "<EOB>") {
return strings.TrimSpace(strings.TrimSuffix(text, "<EOB>")), false
}
return text, false
}
// splitWordsIntoSegments groups words into segments exactly as NeMo's
// get_segment_offsets does (nemo/collections/asr/parts/utils/timestamp_utils.py).
// Walking the words, it closes a segment when (1) the gap rule is enabled
// (gapSeconds > 0) and the segment already has words and the gap from the
// previous word's end to this word's start is >= gapSeconds - the current word
// then STARTS a new segment - or, checked only when the gap rule did not apply
// (NeMo's elif), (2) the word ends with (or is) a separator, which closes the
// segment INCLUDING that word. Trailing words flush into a final segment.
// gapSeconds <= 0 disables the gap rule, matching NeMo's default
// segment_gap_threshold=None (punctuation-only segments).
func splitWordsIntoSegments(words []transcriptWord, separators []rune, gapSeconds float64) [][]transcriptWord {
var segments [][]transcriptWord
var cur []transcriptWord
for i, word := range words {
gapActive := gapSeconds > 0 && len(cur) > 0
if gapActive && (word.Start-words[i-1].End) >= gapSeconds {
segments = append(segments, cur)
cur = []transcriptWord{word}
continue
}
if !gapActive && endsWithSeparator(word.W, separators) {
cur = append(cur, word)
segments = append(segments, cur)
cur = nil
continue
}
cur = append(cur, word)
}
if len(cur) > 0 {
segments = append(segments, cur)
}
return segments
}
// endsWithSeparator reports whether w's last rune is in separators (matching
// NeMo's `word[-1] in delims or word in delims`).
func endsWithSeparator(w string, separators []rune) bool {
r := []rune(strings.TrimSpace(w))
if len(r) == 0 {
return false
}
last := r[len(r)-1]
for _, s := range separators {
if last == s {
return true
}
}
return false
}
// tokensInWindow returns the ids of tokens whose timestamp t falls in
// [start, end] (inclusive), assigning each token to the segment that spans its
// time. The last segment's end is the last word end, so the final token is
// included.
func tokensInWindow(tokens []transcriptToken, start, end float64) []int32 {
var ids []int32
for _, t := range tokens {
if t.T >= start && t.T <= end {
ids = append(ids, t.ID)
}
}
return ids
}
// streamSegmenter accumulates streaming decode increments into per-utterance
// segments. <EOU>/<EOB> are the model's own utterance boundaries; each closes a
// segment. When the feed carries per-word timings (ABI v4 JSON), a closed
// segment takes its start/end from its first/last word; against an older
// text-only library (no words) it falls back to segmenting the delta text, so
// the same assembler serves both paths.
type streamSegmenter struct {
segs []*pb.TranscriptSegment
cur []transcriptWord // words for the open segment (ABI v4 JSON path)
curText []string // delta text for the open segment (text-only path)
nextID int32
}
func (s *streamSegmenter) add(r streamFeedResult) {
s.cur = append(s.cur, r.Words...)
if len(r.Words) == 0 && r.Delta != "" {
// Older libparakeet.so with no per-word timing: segment from the text.
s.curText = append(s.curText, r.Delta)
}
// Both <EOU> and <EOB> reset the decoder, so both close a segment.
if r.Eou || r.Eob {
s.flush()
}
}
func (s *streamSegmenter) flush() {
switch {
case len(s.cur) > 0:
parts := make([]string, len(s.cur))
for i, w := range s.cur {
parts[i] = w.W
}
s.segs = append(s.segs, &pb.TranscriptSegment{
Id: s.nextID,
Start: secondsToNanos(s.cur[0].Start),
End: secondsToNanos(s.cur[len(s.cur)-1].End),
Text: strings.TrimSpace(strings.Join(parts, " ")),
})
s.nextID++
case len(s.curText) > 0:
// No words this segment: emit a text-only segment (no timestamps),
// skipping a purely-whitespace one as the legacy text path did.
if t := strings.TrimSpace(strings.Join(s.curText, "")); t != "" {
s.segs = append(s.segs, &pb.TranscriptSegment{Id: s.nextID, Text: t})
s.nextID++
}
}
s.cur = nil
s.curText = nil
}
func (s *streamSegmenter) segments() []*pb.TranscriptSegment { return s.segs }
// wordsRequested reports whether the caller asked for word-level timestamps.
// The OpenAI transcription API gates word timings behind
// timestamp_granularities[] containing "word" and defaults to segment-level
// otherwise; we follow that contract.
func wordsRequested(granularities []string) bool {
for _, g := range granularities {
if strings.EqualFold(strings.TrimSpace(g), "word") {
return true
}
}
return false
}
// secondsToNanos converts the C-API's fractional-second timestamps into the
// int64 nanoseconds LocalAI carries on TranscriptSegment/TranscriptWord, the
// same nanosecond convention the whisper backend uses.
func secondsToNanos(sec float64) int64 {
return int64(sec * 1e9)
}
// Per-C-call engine serialization for the streaming paths.
//
// Every individual C call (begin / feed / finalize / free) takes engineMu and
// re-checks ctxPtr under the lock; the lock is NEVER held across a stream's
// lifetime. This is safe because each parakeet.cpp call builds its own ggml
// graph and all streaming caches live in the session object, not the ctx —
// the only ctx-shared mutable state is last_error, which is why it is read
// under the same lock as the failing call. Holding the lock per call (rather
// than per stream, as this file previously did) keeps a long-lived live
// session from starving batched unary transcription and vice versa.
//
// A stream must not outlive its ctx (C-API contract). Free() takes engineMu
// and zeroes ctxPtr, so a racing per-call helper returns ModelNotLoaded
// instead of feeding a freed engine; streamFree of an orphaned session only
// runs the session destructor, which does not touch the ctx.
// streamBegin opens a cache-aware streaming session. A 0 stream with nil
// error means the loaded model is not a streaming model.
func (p *ParakeetCpp) streamBegin(lang string) (uintptr, error) {
p.engineMu.Lock()
defer p.engineMu.Unlock()
if p.ctxPtr == 0 {
return 0, grpcerrors.ModelNotLoaded("parakeet-cpp")
}
if CppStreamBeginLang != nil {
return CppStreamBeginLang(p.ctxPtr, lang), nil
}
return CppStreamBegin(p.ctxPtr), nil
}
func (p *ParakeetCpp) streamFree(stream uintptr) {
if stream == 0 {
return
}
p.engineMu.Lock()
defer p.engineMu.Unlock()
CppStreamFree(stream)
}
// streamFeedText runs one text-mode feed (or the finalize flush when
// finalize is true) under engineMu, returning the newly-finalized delta and
// whether an <EOU>/<EOB> fired during the call.
func (p *ParakeetCpp) streamFeedText(stream uintptr, pcm []float32, finalize bool) (delta string, eou, eob bool, err error) {
p.engineMu.Lock()
defer p.engineMu.Unlock()
if p.ctxPtr == 0 {
return "", false, false, grpcerrors.ModelNotLoaded("parakeet-cpp")
}
var ret uintptr
var events int32
if finalize {
ret = CppStreamFinalize(stream)
} else {
ret = CppStreamFeed(stream, pcm, int32(len(pcm)), unsafe.Pointer(&events))
}
if ret == 0 {
// last_error is ctx-shared: read it under the same lock as the call.
msg := CppLastError(p.ctxPtr)
if msg == "" {
msg = "unknown error"
}
return "", false, false, fmt.Errorf("parakeet-cpp: stream feed/finalize failed: %s", msg)
}
delta = goStringFromCPtr(ret)
CppFreeString(ret)
// ABI v5: eou_out is a bitmask (bit 0 = <EOU>, bit 1 = <EOB>). A v4
// library sets 0/1 for either token, which the bit-0 test reads as the
// old conflated eou — the EOB distinction simply isn't available there.
return delta, events&1 != 0, events&2 != 0, nil
}
// streamFeedDoc runs one ABI v4 JSON feed (or finalize) under engineMu and
// returns the parsed {text,eou,frame_sec,words} document.
func (p *ParakeetCpp) streamFeedDoc(stream uintptr, pcm []float32, finalize bool) (streamFeedJSON, error) {
p.engineMu.Lock()
defer p.engineMu.Unlock()
if p.ctxPtr == 0 {
return streamFeedJSON{}, grpcerrors.ModelNotLoaded("parakeet-cpp")
}
var ret uintptr
if finalize {
ret = CppStreamFinalizeJSON(stream)
} else {
ret = CppStreamFeedJSON(stream, pcm, int32(len(pcm)))
}
if ret == 0 {
msg := CppLastError(p.ctxPtr)
if msg == "" {
msg = "unknown error"
}
return streamFeedJSON{}, fmt.Errorf("parakeet-cpp: stream feed/finalize failed: %s", msg)
}
raw := goStringFromCPtr(ret)
CppFreeString(ret)
var doc streamFeedJSON
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return streamFeedJSON{}, fmt.Errorf("parakeet-cpp: decode stream json: %w", err)
}
return doc, nil
}
// AudioTranscriptionStream drives the cache-aware streaming RNN-T over the
// audio at opts.Dst: it decodes the file to 16 kHz mono PCM, feeds it through
// the shared decode driver (feedSlices/flushTail), and emits each
// newly-finalized text run as a TranscriptStreamResponse delta. <EOU>/<EOB>
// events close the current segment; a closing FinalResult carries the full
// transcript, the per-utterance segments, and whether the file ended on an
// utterance boundary.
//
// stream_begin returns 0 for models that are not cache-aware streaming models
// (only e.g. nvidia/parakeet_realtime_eou_120m-v1 qualifies). For those this
// returns codes.Unimplemented rather than faking a stream from an offline
// decode — see the stream==0 branch and grpcerrors.StreamTranscriptionUnsupported.
func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.TranscriptRequest, results chan *pb.TranscriptStreamResponse) error {
defer close(results)
if p.ctxPtr == 0 {
return grpcerrors.ModelNotLoaded("parakeet-cpp")
}
if opts.Dst == "" {
return errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required")
}
if err := ctx.Err(); err != nil {
return status.Error(codes.Canceled, "transcription cancelled")
}
stream, err := p.streamBegin(opts.GetLanguage())
if err != nil {
return err
}
if stream == 0 {
// Not a cache-aware streaming model. Report the missing capability
// honestly instead of decoding offline and emitting it as one "delta"
// + final: a client that asked for streaming must learn the model
// cannot stream, not receive a batch result dressed as a stream (which
// is indistinguishable except qualitatively, and silently breaks any
// feature that genuinely needs incremental output). Callers wanting a
// plain transcript use the unary AudioTranscription path. This mirrors
// AudioTranscriptionLive, which already returns Unimplemented here.
return grpcerrors.StreamTranscriptionUnsupported("parakeet-cpp",
"loaded model is not a cache-aware streaming model")
}
defer p.streamFree(stream)
data, duration, err := decodeWavMono16k(opts.Dst)
if err != nil {
return err
}
// Fold the shared decode driver's per-feed increments into the streamed
// deltas and the closing batch result: words/text accumulate into
// per-utterance segments (streamSegmenter), and the utterance-boundary
// latch (boundary.go) records whether the file ended on an <EOU>. These
// are the offline path's concern — the live RPC carries none of them.
var (
full strings.Builder
seg streamSegmenter
boundary utteranceBoundary
)
emit := func(r streamFeedResult) error {
if r.Delta != "" {
full.WriteString(r.Delta)
results <- &pb.TranscriptStreamResponse{Delta: r.Delta}
}
seg.add(r)
boundary = boundary.observe(r)
return nil
}
if err := p.feedSlices(ctx, stream, data, emit); err != nil {
return err
}
if err := p.flushTail(stream, emit); err != nil {
return err
}
seg.flush() // close a trailing utterance that never saw an <EOU>
// final.Text is the exact concatenation of the streamed deltas (full is
// their accumulation), so concat(deltas) == FinalResult.Text holds even
// when the model prepends a leading space to the first word (SentencePiece
// detokenization). This matches the whisper backend's streaming contract.
// The single-segment fallback stays trimmed.
fullText := full.String()
segments := seg.segments()
if trimmed := strings.TrimSpace(fullText); len(segments) == 0 && trimmed != "" {
segments = append(segments, &pb.TranscriptSegment{Id: 0, Text: trimmed})
}
results <- &pb.TranscriptStreamResponse{
FinalResult: &pb.TranscriptResult{
Text: fullText,
Segments: segments,
Duration: duration,
Eou: boundary.ended(),
},
}
return nil
}
// decodeWavMono16k converts any input audio to 16 kHz mono PCM and returns the
// float samples plus the clip duration in seconds. Mirrors the whisper
// backend: utils.AudioToWav (ffmpeg) normalises rate/channels, go-audio
// decodes the PCM.
// convertToWavMono16k converts an arbitrary audio file to a 16 kHz mono WAV in
// a fresh temp dir and returns the path together with a cleanup func the caller
// must defer. WAV inputs already at 16 kHz/mono/16-bit are passed through by
// utils.AudioToWav (hardlink/copy), everything else is transcoded via ffmpeg.
// Used by the direct (non-batched) transcription path, which hands a file path
// to the C library's WAV-only audio loader.
func convertToWavMono16k(path string) (string, func(), error) {
dir, err := os.MkdirTemp("", "parakeet")
if err != nil {
return "", func() {}, err
}
cleanup := func() { _ = os.RemoveAll(dir) }
converted := filepath.Join(dir, "converted.wav")
if err := utils.AudioToWav(path, converted); err != nil {
cleanup()
return "", func() {}, err
}
return converted, cleanup, nil
}
func decodeWavMono16k(path string) ([]float32, float32, error) {
converted, cleanup, err := convertToWavMono16k(path)
if err != nil {
return nil, 0, err
}
defer cleanup()
fh, err := os.Open(converted)
if err != nil {
return nil, 0, err
}
defer func() { _ = fh.Close() }()
buf, err := wav.NewDecoder(fh).FullPCMBuffer()
if err != nil {
return nil, 0, err
}
data := buf.AsFloat32Buffer().Data
var duration float32
if buf.Format != nil && buf.Format.SampleRate > 0 {
duration = float32(len(data)) / float32(buf.Format.SampleRate)
}
return data, duration, nil
}
// Free releases the underlying parakeet_ctx. Called by LocalAI when the
// model is unloaded.
func (p *ParakeetCpp) Free() error {
// Stop the dispatcher before releasing the engine so no in-flight runBatch
// can touch a freed ctx (close leak / use-after-free on reload).
if p.batStop != nil {
close(p.batStop)
p.batStop = nil
}
// engineMu so an in-flight streaming call (which locks per C call and
// re-checks ctxPtr under the lock) can never feed into a freed ctx.
p.engineMu.Lock()
defer p.engineMu.Unlock()
if p.ctxPtr != 0 {
CppFree(p.ctxPtr)
p.ctxPtr = 0
}
return nil
}
// goStringFromCPtr copies a NUL-terminated C string into Go memory.
// cptr is the raw pointer returned by purego from the C-API (a malloc'd
// buffer the caller owns); callers must free it via CppFreeString after
// the copy lands.
//
// The uintptr->unsafe.Pointer conversion below trips go vet's unsafeptr
// check, which can't distinguish a C-owned heap pointer from Go-managed
// memory. It is safe here: the pointer addresses a malloc'd C buffer the
// Go GC neither tracks nor moves, and we dereference it immediately to
// copy the bytes out, the same pattern (and the same tolerated warning)
// as the whisper backend's unsafe.Slice over segsPtr.
func goStringFromCPtr(cptr uintptr) string {
if cptr == 0 {
return ""
}
p := unsafe.Pointer(cptr) //nolint:govet // C-owned malloc'd buffer, not Go-GC memory (see doc above)
n := 0
for *(*byte)(unsafe.Add(p, n)) != 0 {
n++
}
return string(unsafe.Slice((*byte)(p), n))
}