mirror of https://github.com/mudler/LocalAI.git, synced 2026-07-01 11:56:57 -04:00
* feat(realtime): EOU-driven semantic_vad turn detection

Add a `semantic_vad` turn-detection mode to the realtime API that feeds the transcription model live and decides "the user finished speaking" from the `<EOU>` end-of-utterance token rather than from silence alone. When EOU fires the turn commits immediately (~0.3s); otherwise it falls back to an eagerness-scaled silence threshold (low/med/high = 8/4/2s).

Plumbing, bottom to top:

- proto: `AudioTranscriptionLive` bidirectional RPC (config-first oneof, mono float PCM @16k, ready-ack / Unimplemented degrade signal) plus `TranscriptResult.eou` for the unary retranscribe gate.
- pkg/grpc: client/server/base/embed scaffolding for the bidi stream, modeled on AudioTransformStream; release stream conns on terminal Recv.
- parakeet-cpp: live transcription RPC with per-C-call engine locking (one live stream per turn, finalize+free at commit); bump parakeet.cpp to ABI v5: incremental StreamingMel (no more quadratic per-feed mel recompute that delayed EOU on long turns) and the <EOU>/<EOB> split; strip the literal <EOU>/<EOB> from offline text and set Eou.
- core/backend: LiveTranscriptionSession wrapper + pipeline `turn_detection:` config block (type/eagerness/retranscribe).
- realtime: semantic_vad integration: live input captions streamed as transcription deltas while the user speaks, EOU-immediate commit with eagerness fallback, optional retranscribe gate (batch re-decode must also end in <EOU> to confirm), clause synthesis off the LLM token callback, and per-turn live-transcription / model_load telemetry.
- UI: show the realtime pipeline components as a vertical list.

Docs and tests included; opt-in via the pipeline YAML or per-session `session.update`. Non-streaming STT backends degrade to silence-only.
Assisted-by: Claude Code:claude-opus-4-8 [Read] [Edit] [Write] [Bash]
Assisted-by: Claude Code:claude-fable-5 [Read] [Edit] [Bash]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(realtime): explicit formally-verified state machines + parakeet streaming driver

The realtime API had several implicit state machines whose state was inferred from scattered booleans, channels, and five separate mutexes, leaving illegal/inconsistent states reachable. Make them explicit and keep the implementation in step with a formal design; rework the parakeet streaming backend along the same lines.

Realtime state machines (M1-M5). Each is a sealed sum-type State/Event/Effect with a total, pure Next(state,event)->(state,[]effect) behind a single-writer Coordinator:

- M1 conncoord connection lifecycle: VAD toggle + once-only teardown (replaces vadServerStarted + a `done` channel closed from two sites).
- M2 turncoord turn detection: collapses speechStarted and the live-stream "turn open" flag into one state, so discardTurn can no longer desync them and suppress the next onset.
- M3 respcoord response coordination: serializes the dual-writer start/cancel so at most one response is live; one response.done per response.create.
- M4 compactcoord conversation compaction: single-flight (replaces the `compacting atomic.Bool` CAS).
- M5 ttscoord TTS pipeline: open->closing->closed, idempotent wait(), rejects enqueue-after-close (was a silent drop).

The Coordinator/Sink/Next plumbing (only the sealed types and Next differed per machine) is extracted once into core/http/endpoints/openai/coordinator as a generic Coordinator[S,E,F]; each machine keeps its public API via type aliases, so no sink, call-site, or test moved.

Hierarchy. session_lifecycle.fizz models M1 as the parent region with its children (M2/M3/M4) as one statechart and asserts ChildrenDieWithParent (conn torn => all children terminal, none start after teardown).
respcoord and compactcoord gain an absorbing Terminated state + Shutdown event; conncoord's teardown drives the children terminal. This closes a compaction teardown gap: a fire-and-forget compaction could outlive a torn session, so compactionSink now takes a session-scoped cancellable context + WaitGroup and joins the in-flight summarize+evict on shutdown.

Formal verification. formal-verification/ holds one authoritative FizzBee spec per machine plus the composition spec, each with an always-assertion and a documented one-line edit that makes the checker fail (verified non-vacuous). scripts/realtime-conformance.sh is fail-closed: all Go conformance suites under -race AND a model-check of every .fizz spec; a missing FizzBee is a hard error (only the loud REALTIME_CONFORMANCE_SKIP_FIZZBEE=1 bypasses it, never in CI). FizzBee is pinned by sha256 and installed via scripts/install-fizzbee.sh into .tools/ (gitignored). Wired as make test-realtime-conformance, a CI workflow, and a pre-commit path filter. Go conformance tests are Ginkgo/Gomega (per the repo's forbidigo lint): transition tables + fixed-seed property walks + concurrent/-race specs, no rapid dependency. Design map: docs/design/realtime-state-machines.md.

Parakeet streaming backend. The same treatment is applied to the parakeet-cpp streaming paths:

- AudioTranscriptionStream returns codes.Unimplemented for non-streaming models instead of decoding offline and emitting it as one delta + final. A client that asked for streaming learns the model cannot stream rather than receiving a batch result shaped like a stream. New grpcerrors.StreamTranscriptionUnsupported carries that signal; the HTTP /v1/audio/transcriptions stream path surfaces it as an SSE error event. Mirrors AudioTranscriptionLive, which already did this.
- utteranceBoundary (boundary.go): a single definition of the end-of-utterance latch, replacing three open-coded finalEou toggles. Modelled as a two-valued type so illegal states are unrepresentable.
- Shared decode driver (driver.go): streamFeedResult (one per-feed event) + feedChunk (hides the ABI v4 JSON vs text-only split) + feedSlices + flushTail. The feed loop is written once.
- AudioTranscriptionLive becomes a bidi adapter: it streams the per-feed {delta,eou,eob,words} the realtime turn detector consumes and a terminal FinalResult carrying only Text. Segments/duration/eou are offline-only and no longer produced (nor read) on the live path; liveTraceState drops the terminal eou and keeps the per-feed eou_events count.
- AudioTranscriptionStream + streamJSON merge into one driver-based function; streamSegmenter is generalized to the unified event with a text-only fallback that preserves the legacy (no-words) library's per-utterance segmentation.

Verified: build/vet/gofumpt clean, golangci-lint 0 issues, all coordinator and parakeet packages under -race, the fail-closed conformance gate green, and make test-realtime (12 e2e WS+WebRTC).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
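The sealed State/Event/Effect pattern from the state-machine commit above can be sketched in Go. It uses a generic single-writer coordinator and a total, pure `Next`, illustrated with a toy version of the M5 open->closing->closed TTS lifecycle. This is an assumption-laden illustration, not the code in core/http/endpoints/openai/coordinator. `Coordinator`, `Send`, `ttsNext` and the tts* types are hypothetical. The real package also routes effects through a Sink; here they are simply returned to the caller.

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator serializes events through a pure transition function.
// next must be total: every (state, event) pair yields a state + effects.
type Coordinator[S, E, F any] struct {
	mu    sync.Mutex
	state S
	next  func(S, E) (S, []F)
}

func NewCoordinator[S, E, F any](initial S, next func(S, E) (S, []F)) *Coordinator[S, E, F] {
	return &Coordinator[S, E, F]{state: initial, next: next}
}

// Send applies one event under the lock (single writer) and returns the
// effects so the caller can perform I/O outside the transition.
func (c *Coordinator[S, E, F]) Send(ev E) []F {
	c.mu.Lock()
	defer c.mu.Unlock()
	var effs []F
	c.state, effs = c.next(c.state, ev)
	return effs
}

func (c *Coordinator[S, E, F]) State() S {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.state
}

// Sealed state: the unexported marker method keeps other packages out.
type ttsState interface{ isTTSState() }
type ttsOpen struct{}
type ttsClosing struct{}
type ttsClosed struct{}

func (ttsOpen) isTTSState()    {}
func (ttsClosing) isTTSState() {}
func (ttsClosed) isTTSState()  {}

type ttsEvent interface{ isTTSEvent() }
type evEnqueue struct{ Text string }
type evClose struct{}
type evDrained struct{}

func (evEnqueue) isTTSEvent() {}
func (evClose) isTTSEvent()   {}
func (evDrained) isTTSEvent() {}

type ttsEffect string

// ttsNext is pure: it never performs I/O, it only names the effects.
func ttsNext(s ttsState, e ttsEvent) (ttsState, []ttsEffect) {
	switch s.(type) {
	case ttsOpen:
		switch ev := e.(type) {
		case evEnqueue:
			return s, []ttsEffect{ttsEffect("synthesize:" + ev.Text)}
		case evClose:
			return ttsClosing{}, []ttsEffect{"drain"}
		}
	case ttsClosing:
		if _, ok := e.(evDrained); ok {
			return ttsClosed{}, []ttsEffect{"signal-wait"}
		}
	}
	// Enqueue after close is rejected explicitly instead of dropped silently;
	// every other pair is a no-op, which keeps close/drain idempotent.
	if _, ok := e.(evEnqueue); ok {
		return s, []ttsEffect{"reject-enqueue"}
	}
	return s, nil
}

func main() {
	c := NewCoordinator[ttsState, ttsEvent, ttsEffect](ttsOpen{}, ttsNext)
	fmt.Println(c.Send(evEnqueue{Text: "hi"}))   // [synthesize:hi]
	fmt.Println(c.Send(evClose{}))               // [drain]
	fmt.Println(c.Send(evEnqueue{Text: "late"})) // [reject-enqueue]
	fmt.Println(c.Send(evDrained{}))             // [signal-wait]
	fmt.Printf("%T\n", c.State())                // main.ttsClosed
}
```

Because `ttsNext` is pure and total, it can be driven exhaustively from a transition table or a fixed-seed random walk. Only the one-line `Send` needs the lock.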
273 lines
10 KiB
Go
package main

import (
	"context"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"

	"github.com/ebitengine/purego"
	"github.com/go-audio/audio"
	"github.com/go-audio/wav"
	pb "github.com/mudler/LocalAI/pkg/grpc/proto"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func TestParakeetCpp(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "parakeet-cpp Backend Suite")
}

var (
	libLoadOnce sync.Once
	libLoadErr  error
)

// ensureLibLoaded mirrors main.go's bootstrap so a Go test can drive
// the C-API bridge without spinning up the gRPC server. Skips the
// current spec when libparakeet.so isn't loadable from cwd
// ($LD_LIBRARY_PATH or a symlink in ./).
func ensureLibLoaded() {
	libLoadOnce.Do(func() {
		libName := os.Getenv("PARAKEET_LIBRARY")
		if libName == "" {
			libName = "libparakeet.so"
		}
		lib, err := purego.Dlopen(libName, purego.RTLD_NOW|purego.RTLD_GLOBAL)
		if err != nil {
			libLoadErr = err
			return
		}
		purego.RegisterLibFunc(&CppAbiVersion, lib, "parakeet_capi_abi_version")
		purego.RegisterLibFunc(&CppLoad, lib, "parakeet_capi_load")
		purego.RegisterLibFunc(&CppFree, lib, "parakeet_capi_free")
		purego.RegisterLibFunc(&CppTranscribePath, lib, "parakeet_capi_transcribe_path")
		purego.RegisterLibFunc(&CppTranscribePathJSON, lib, "parakeet_capi_transcribe_path_json")
		if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json"); err == nil && sym != 0 {
			purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json")
		}
		purego.RegisterLibFunc(&CppStreamBegin, lib, "parakeet_capi_stream_begin")
		purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed")
		purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize")
		purego.RegisterLibFunc(&CppStreamFree, lib, "parakeet_capi_stream_free")
		if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_feed_json"); err == nil && sym != 0 {
			purego.RegisterLibFunc(&CppStreamFeedJSON, lib, "parakeet_capi_stream_feed_json")
			purego.RegisterLibFunc(&CppStreamFinalizeJSON, lib, "parakeet_capi_stream_finalize_json")
		}
		purego.RegisterLibFunc(&CppFreeString, lib, "parakeet_capi_free_string")
		purego.RegisterLibFunc(&CppLastError, lib, "parakeet_capi_last_error")
	})
	if libLoadErr != nil {
		Skip("libparakeet.so not loadable: " + libLoadErr.Error())
	}
}

// fixturesOrSkip returns the model + audio paths or skips the spec if
// either env var is unset. The smoke test never runs in default CI; it
// needs a real parakeet GGUF and a 16 kHz mono WAV on disk.
func fixturesOrSkip() (string, string) {
	modelPath := os.Getenv("PARAKEET_BACKEND_TEST_MODEL")
	audioPath := os.Getenv("PARAKEET_BACKEND_TEST_WAV")
	if modelPath == "" || audioPath == "" {
		Skip("set PARAKEET_BACKEND_TEST_MODEL and PARAKEET_BACKEND_TEST_WAV to run this spec")
	}
	return modelPath, audioPath
}

// writeMono16kWav writes `samples` frames of 16 kHz mono 16-bit silence to
// path. The result is already in AudioToWav's target format, so the conversion
// helper copies it through without invoking ffmpeg.
func writeMono16kWav(path string, samples int) {
	GinkgoHelper()
	f, err := os.Create(path)
	Expect(err).ToNot(HaveOccurred())
	enc := wav.NewEncoder(f, 16000, 16, 1, 1)
	buf := &audio.IntBuffer{
		Format:         &audio.Format{NumChannels: 1, SampleRate: 16000},
		SourceBitDepth: 16,
		Data:           make([]int, samples),
	}
	Expect(enc.Write(buf)).To(Succeed())
	Expect(enc.Close()).To(Succeed())
	Expect(f.Close()).To(Succeed())
}

var _ = Describe("ParakeetCpp", func() {
	Context("AudioTranscription", func() {
		It("transcribes a WAV via the parakeet C-API", func() {
			modelPath, audioPath := fixturesOrSkip()
			ensureLibLoaded()

			p := &ParakeetCpp{}
			Expect(p.Load(&pb.ModelOptions{ModelFile: modelPath})).To(Succeed())
			defer func() { _ = p.Free() }()

			res, err := p.AudioTranscription(context.Background(), &pb.TranscriptRequest{
				Dst: audioPath,
			})
			Expect(err).ToNot(HaveOccurred())
			Expect(strings.TrimSpace(res.Text)).ToNot(BeEmpty(),
				"expected non-empty transcript for %s", audioPath)
			// NeMo-faithful segmentation: one or more punctuation-delimited
			// segments, each with text and a monotonically-advancing time span.
			Expect(res.Segments).ToNot(BeEmpty(), "expected at least one segment")
			var prevEnd int64
			for i, seg := range res.Segments {
				Expect(strings.TrimSpace(seg.Text)).ToNot(BeEmpty(),
					"segment %d must have text", i)
				Expect(seg.End).To(BeNumerically(">=", seg.Start),
					"segment %d end must not precede its start", i)
				Expect(seg.Start).To(BeNumerically(">=", prevEnd),
					"segments must be in time order")
				prevEnd = seg.End
				// Default (no granularities) is segment-level: no per-word timings.
				Expect(seg.Words).To(BeEmpty(),
					"word timings are opt-in via timestamp_granularities")
			}
		})

		It("emits word-level timestamps when granularity=word", func() {
			modelPath, audioPath := fixturesOrSkip()
			ensureLibLoaded()

			p := &ParakeetCpp{}
			Expect(p.Load(&pb.ModelOptions{ModelFile: modelPath})).To(Succeed())
			defer func() { _ = p.Free() }()

			res, err := p.AudioTranscription(context.Background(), &pb.TranscriptRequest{
				Dst:                    audioPath,
				TimestampGranularities: []string{"word"},
			})
			Expect(err).ToNot(HaveOccurred())
			Expect(res.Segments).ToNot(BeEmpty())
			// With word granularity every segment carries its own words, and each
			// segment's span tracks its first/last word; word starts advance
			// monotonically across the whole transcript.
			totalWords := 0
			var prevStart int64 = -1
			for i, seg := range res.Segments {
				Expect(seg.Words).ToNot(BeEmpty(),
					"segment %d must carry per-word timestamps with granularity=word", i)
				Expect(seg.Start).To(Equal(seg.Words[0].Start),
					"segment %d start tracks its first word", i)
				Expect(seg.End).To(Equal(seg.Words[len(seg.Words)-1].End),
					"segment %d end tracks its last word", i)
				for _, w := range seg.Words {
					Expect(w.End).To(BeNumerically(">=", w.Start))
					Expect(w.Start).To(BeNumerically(">=", prevStart))
					prevStart = w.Start
					totalWords++
				}
			}
			Expect(totalWords).To(BeNumerically(">", 0))
			Expect(res.Segments[0].Words[0].Start).To(BeNumerically(">=", int64(0)))
		})
	})

	Context("convertToWavMono16k", func() {
		// The non-batched transcription path hands a file path to the C
		// library's WAV-only audio loader, so it must convert first.
		// utils.AudioToWav passes an already-16kHz/mono/16-bit WAV through
		// without ffmpeg, which lets us exercise the helper (and the
		// regression: the direct path used to skip conversion entirely)
		// without a model, the C library, or ffmpeg.
		It("returns a decodable 16kHz mono WAV copy and cleans it up", func() {
			dir := GinkgoT().TempDir()
			src := filepath.Join(dir, "input.wav")
			writeMono16kWav(src, 16000) // 1s of silence at 16 kHz

			converted, cleanup, err := convertToWavMono16k(src)
			Expect(err).ToNot(HaveOccurred())

			// It must produce a fresh temp file, not return the original path.
			Expect(converted).ToNot(Equal(src))
			Expect(converted).To(BeAnExistingFile())

			pcm, _, err := decodeWavMono16k(converted)
			Expect(err).ToNot(HaveOccurred())
			Expect(pcm).To(HaveLen(16000), "round-trips the sample count")

			cleanup()
			Expect(converted).ToNot(BeAnExistingFile(), "cleanup removes the temp dir")
		})

		It("errors on a non-existent input rather than passing the path through", func() {
			_, _, err := convertToWavMono16k(filepath.Join(GinkgoT().TempDir(), "missing.mp3"))
			Expect(err).To(HaveOccurred())
		})
	})

	Context("AudioTranscriptionStream", func() {
		It("returns the typed Unimplemented signal for non-streaming models (no offline fallback)", func() {
			// stream_begin == 0 means the loaded model is not a cache-aware
			// streaming model. The backend must surface that, not silently
			// decode offline and fake a one-shot "stream".
			savedBegin, savedBeginLang := CppStreamBegin, CppStreamBeginLang
			defer func() { CppStreamBegin, CppStreamBeginLang = savedBegin, savedBeginLang }()
			CppStreamBeginLang = nil
			CppStreamBegin = func(ctx uintptr) uintptr { return 0 }

			p := &ParakeetCpp{ctxPtr: 1}
			results := make(chan *pb.TranscriptStreamResponse, 8)
			err := p.AudioTranscriptionStream(context.Background(),
				&pb.TranscriptRequest{Dst: "ignored.wav"}, results)
			Expect(status.Code(err)).To(Equal(codes.Unimplemented))

			// Honest signal: nothing was emitted, no faked batch result.
			var emitted []*pb.TranscriptStreamResponse
			for r := range results {
				emitted = append(emitted, r)
			}
			Expect(emitted).To(BeEmpty())
		})

		It("streams deltas and a closing FinalResult from a cache-aware model", func() {
			// Streaming needs a cache-aware streaming model (e.g.
			// realtime_eou); the offline test model would fail stream_begin.
			modelPath := os.Getenv("PARAKEET_BACKEND_TEST_STREAM_MODEL")
			audioPath := os.Getenv("PARAKEET_BACKEND_TEST_WAV")
			if modelPath == "" || audioPath == "" {
				Skip("set PARAKEET_BACKEND_TEST_STREAM_MODEL (cache-aware streaming model) and PARAKEET_BACKEND_TEST_WAV")
			}
			ensureLibLoaded()

			p := &ParakeetCpp{}
			Expect(p.Load(&pb.ModelOptions{ModelFile: modelPath})).To(Succeed())
			defer func() { _ = p.Free() }()

			results := make(chan *pb.TranscriptStreamResponse, 64)
			errCh := make(chan error, 1)
			go func() {
				errCh <- p.AudioTranscriptionStream(context.Background(),
					&pb.TranscriptRequest{Dst: audioPath}, results)
			}()

			var deltas []string
			var final *pb.TranscriptResult
			for r := range results {
				if r.Delta != "" {
					deltas = append(deltas, r.Delta)
				}
				if r.FinalResult != nil {
					final = r.FinalResult
				}
			}
			Expect(<-errCh).ToNot(HaveOccurred())

			Expect(final).ToNot(BeNil(), "expected a closing FinalResult")
			Expect(strings.TrimSpace(final.Text)).ToNot(BeEmpty(),
				"expected a non-empty streamed transcript")
			Expect(final.Segments).ToNot(BeEmpty(),
				"FinalResult always carries at least one segment")
			// The concatenated deltas reconstruct the final transcript.
			Expect(strings.TrimSpace(strings.Join(deltas, ""))).To(Equal(strings.TrimSpace(final.Text)),
				"deltas should reconstruct the final text")
		})
	})
})