mirror of
https://github.com/mudler/LocalAI.git
synced 2026-07-01 03:46:41 -04:00
* feat(realtime): EOU-driven semantic_vad turn detection

Add a `semantic_vad` turn-detection mode to the realtime API that feeds the transcription model live and decides "the user finished speaking" from the `<EOU>` end-of-utterance token rather than from silence alone. When EOU fires the turn commits immediately (~0.3s); otherwise it falls back to an eagerness-scaled silence threshold (low/med/high = 8/4/2s).

Plumbing, bottom to top:

- proto: `AudioTranscriptionLive` bidirectional RPC (config-first oneof, mono float PCM @16k, ready-ack / Unimplemented degrade signal) plus `TranscriptResult.eou` for the unary retranscribe gate.
- pkg/grpc: client/server/base/embed scaffolding for the bidi stream, modeled on AudioTransformStream; release stream conns on terminal Recv.
- parakeet-cpp: live transcription RPC with per-C-call engine locking (one live stream per turn, finalize+free at commit); bump parakeet.cpp to ABI v5: incremental StreamingMel (no more quadratic per-feed mel recompute that delayed EOU on long turns) and the <EOU>/<EOB> split; strip the literal <EOU>/<EOB> from offline text and set Eou.
- core/backend: LiveTranscriptionSession wrapper + pipeline `turn_detection:` config block (type/eagerness/retranscribe).
- realtime: semantic_vad integration: live input captions streamed as transcription deltas while the user speaks, EOU-immediate commit with eagerness fallback, optional retranscribe gate (batch re-decode must also end in <EOU> to confirm), clause synthesis off the LLM token callback, and per-turn live-transcription / model_load telemetry.
- UI: show the realtime pipeline components as a vertical list.

Docs and tests included; opt-in via the pipeline YAML or per-session `session.update`. Non-streaming STT backends degrade to silence-only.

Assisted-by: Claude Code:claude-opus-4-8 [Read] [Edit] [Write] [Bash]
Assisted-by: Claude Code:claude-fable-5 [Read] [Edit] [Bash]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(realtime): explicit formally-verified state machines + parakeet streaming driver

The realtime API had several implicit state machines whose state was inferred from scattered booleans, channels, and five separate mutexes, leaving illegal/inconsistent states reachable. Make them explicit and keep the implementation in step with a formal design; rework the parakeet streaming backend along the same lines.

Realtime state machines (M1-M5). Each is a sealed sum-type State/Event/Effect with a total, pure Next(state,event)->(state,[]effect) behind a single-writer Coordinator:

- M1 conncoord: connection lifecycle: VAD toggle + once-only teardown (replaces vadServerStarted + a `done` channel closed from two sites).
- M2 turncoord: turn detection: collapses speechStarted and the live-stream "turn open" flag into one state, so discardTurn can no longer desync them and suppress the next onset.
- M3 respcoord: response coordination: serializes the dual-writer start/cancel so at most one response is live; one response.done per response.create.
- M4 compactcoord: conversation compaction: single-flight (replaces the `compacting atomic.Bool` CAS).
- M5 ttscoord: TTS pipeline: open->closing->closed, idempotent wait(), rejects enqueue-after-close (was a silent drop).

The Coordinator/Sink/Next plumbing (only the sealed types and Next differed per machine) is extracted once into core/http/endpoints/openai/coordinator as a generic Coordinator[S,E,F]; each machine keeps its public API via type aliases, so no sink, call-site, or test moved.

Hierarchy. session_lifecycle.fizz models M1 as the parent region with its children (M2/M3/M4) as one statechart and asserts ChildrenDieWithParent (conn torn => all children terminal, none start after teardown). respcoord and compactcoord gain an absorbing Terminated state + Shutdown event; conncoord's teardown drives the children terminal. This closes a compaction teardown gap: a fire-and-forget compaction could outlive a torn session; compactionSink now takes a session-scoped cancellable context + WaitGroup and joins the in-flight summarize+evict on shutdown.

Formal verification. formal-verification/ holds one authoritative FizzBee spec per machine plus the composition spec, each with an always-assertion and a documented one-line edit that makes the checker fail (verified non-vacuous). scripts/realtime-conformance.sh is fail-closed: all Go conformance suites under -race AND a model-check of every .fizz spec; a missing FizzBee is a hard error (only the loud REALTIME_CONFORMANCE_SKIP_FIZZBEE=1 bypasses it, never in CI). FizzBee is pinned by sha256 and installed via scripts/install-fizzbee.sh into .tools/ (gitignored). Wired as make test-realtime-conformance, a CI workflow, and a pre-commit path filter. Go conformance tests are Ginkgo/Gomega (per the repo's forbidigo lint): transition tables + fixed-seed property walks + concurrent/-race specs, no rapid dependency. Design map: docs/design/realtime-state-machines.md.

Parakeet streaming backend. The same treatment applied to the parakeet-cpp streaming paths:

- AudioTranscriptionStream returns codes.Unimplemented for non-streaming models instead of decoding offline and emitting it as one delta + final. A client that asked for streaming learns the model cannot stream rather than receiving a batch result shaped like a stream. New grpcerrors.StreamTranscriptionUnsupported carries that signal; the HTTP /v1/audio/transcriptions stream path surfaces it as an SSE error event. Mirrors AudioTranscriptionLive, which already did this.
- utteranceBoundary (boundary.go): a single definition of the end-of-utterance latch, replacing three open-coded finalEou toggles. Modelled as a two-valued type so illegal states are unrepresentable.
- Shared decode driver (driver.go): streamFeedResult (one per-feed event) + feedChunk (hides the ABI v4 JSON vs text-only split) + feedSlices + flushTail. The feed loop is written once.
- AudioTranscriptionLive becomes a bidi adapter: it streams the per-feed {delta,eou,eob,words} the realtime turn detector consumes and a terminal FinalResult carrying only Text. Segments/duration/eou are offline-only and no longer produced (nor read) on the live path; liveTraceState drops the terminal eou and keeps the per-feed eou_events count.
- AudioTranscriptionStream + streamJSON merge into one driver-based function; streamSegmenter is generalized to the unified event with a text-only fallback that preserves the legacy (no-words) library's per-utterance segmentation.

Verified: build/vet/gofumpt clean, golangci-lint 0 issues, all coordinator and parakeet packages under -race, the fail-closed conformance gate green, and make test-realtime (12 e2e WS+WebRTC).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
187 lines
6.4 KiB
Go
package main

import (
	"strings"
	"time"

	"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
	pb "github.com/mudler/LocalAI/pkg/grpc/proto"
	"github.com/mudler/xlog"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// liveSampleRate is the only PCM rate the parakeet C streaming API accepts.
const liveSampleRate = 16000

// AudioTranscriptionLive drives one cache-aware streaming session over audio
// fed incrementally by the caller (the realtime API's semantic_vad turn
// detection). Contract:
//
//   - the first request must carry a Config; a Config mid-stream resets the
//     decode session (free + begin) and drops accumulated transcript state;
//   - a Ready ack is sent right after a successful stream_begin so callers
//     can degrade synchronously when the model has no streaming support
//     (LiveTranscriptionUnsupported, codes.Unimplemented);
//   - every feed that produced output is forwarded as {delta, eou, eob, words};
//     the <EOU>/<EOB> flag is the model's own utterance boundary and the
//     decoder auto-resets after it, so one session spans many utterances;
//   - closing the send side finalizes: the held-back tail chunk is flushed
//     (the last ~2 encoder frames of words only appear here) and a terminal
//     FinalResult carries the full transcript Text only. Per-utterance
//     segments, duration, and the terminal <EOU> flag are NOT produced here;
//     the realtime core consumes the streamed per-feed tokens and the final
//     Text, and those batch fields are the file path's concern (see
//     AudioTranscriptionStream).
//
// Engine access is serialized per C call (streamBegin/streamFeed*/streamFree
// take engineMu internally), never for the session lifetime, so unary
// transcription keeps flowing between feeds.
func (p *ParakeetCpp) AudioTranscriptionLive(in <-chan *pb.TranscriptLiveRequest, out chan<- *pb.TranscriptLiveResponse) error {
	defer close(out)

	if p.ctxPtr == 0 {
		return grpcerrors.ModelNotLoaded("parakeet-cpp")
	}

	first, ok := <-in
	if !ok {
		return nil // caller closed without sending anything
	}
	cfg := first.GetConfig()
	if cfg == nil {
		return status.Error(codes.InvalidArgument, "parakeet-cpp: first live message must carry a config")
	}
	if err := validateLiveConfig(cfg); err != nil {
		return err
	}

	stream, err := p.streamBegin(cfg.GetLanguage())
	if err != nil {
		return err
	}
	if stream == 0 {
		return grpcerrors.LiveTranscriptionUnsupported("parakeet-cpp",
			"loaded model is not a cache-aware streaming model")
	}
	// stream is reassigned on a mid-stream Config reset; free whatever is
	// current when the RPC unwinds.
	defer func() { p.streamFree(stream) }()

	out <- &pb.TranscriptLiveResponse{Ready: true}

	var (
		full    strings.Builder
		fedSecs float64

		// behindSec accumulates how far decode wall time has fallen behind
		// the audio it was fed. A live caller feeds in real time, so a
		// persistent positive backlog means every downstream signal,
		// including the <EOU> the turn detector waits on, arrives that many
		// seconds late. Warned once per session; reset by a Config reset.
		behindSec    float64
		behindWarned bool
	)

	// emit forwards one decode increment: it streams the per-feed tokens the
	// realtime turn detector consumes (delta/eou/eob/words) and accumulates the
	// running transcript for the closing FinalResult. No segmentation or
	// boundary latch here; the live consumer reads only the streamed tokens
	// and the final Text, and per-utterance segments and the terminal <EOU> flag
	// are an offline-path concern (see AudioTranscriptionStream / boundary.go).
	emit := func(r streamFeedResult) error {
		if r.Delta != "" {
			full.WriteString(r.Delta)
		}
		if r.Delta != "" || r.Eou || r.Eob || len(r.Words) > 0 {
			out <- &pb.TranscriptLiveResponse{
				Delta: r.Delta,
				Eou:   r.Eou,
				Eob:   r.Eob,
				Words: liveWordsToProto(r.Words),
			}
		}
		return nil
	}

	for req := range in {
		switch payload := req.GetPayload().(type) {
		case *pb.TranscriptLiveRequest_Config:
			if err := validateLiveConfig(payload.Config); err != nil {
				return err
			}
			// Reset: a fresh decode session, dropping accumulated state.
			p.streamFree(stream)
			stream, err = p.streamBegin(payload.Config.GetLanguage())
			if err != nil {
				return err
			}
			if stream == 0 {
				return grpcerrors.LiveTranscriptionUnsupported("parakeet-cpp",
					"loaded model is not a cache-aware streaming model")
			}
			full.Reset()
			fedSecs = 0
			// The backlog accounting is documented as reset by a Config
			// reset, so clear it together with the transcript state.
			behindSec = 0
			behindWarned = false
		case *pb.TranscriptLiveRequest_Audio:
			pcm := payload.Audio.GetPcm()
			audioSec := float64(len(pcm)) / liveSampleRate
			fedSecs += audioSec
			start := time.Now()
			// nil ctx: a live session is bounded by this request channel, not a
			// context; cancellation is the caller closing the stream.
			if err := p.feedSlices(nil, stream, pcm, emit); err != nil {
				return err
			}
			wallSec := time.Since(start).Seconds()
			behindSec += wallSec - audioSec
			if behindSec < 0 {
				behindSec = 0
			}
			xlog.Debug("parakeet-cpp: live feed",
				"audio_ms", int(audioSec*1000), "wall_ms", int(wallSec*1000),
				"behind_ms", int(behindSec*1000), "fed_s", fedSecs)
			if behindSec > 1 && !behindWarned {
				behindWarned = true
				xlog.Warn("parakeet-cpp: live decode is falling behind real time; "+
					"end-of-utterance signals will arrive late",
					"behind_s", behindSec, "fed_s", fedSecs)
			}
		}
	}

	// Send side closed: flush the streaming tail and emit the final transcript.
	// The live FinalResult carries only Text, the authoritative full-turn
	// transcript the realtime core commits. Per-utterance segments, duration,
	// and the terminal <EOU> flag are not produced on the live path.
	if err := p.flushTail(stream, emit); err != nil {
		return err
	}
	out <- &pb.TranscriptLiveResponse{
		FinalResult: &pb.TranscriptResult{Text: strings.TrimSpace(full.String())},
	}
	return nil
}

// validateLiveConfig rejects any explicit sample rate other than
// liveSampleRate; a zero (unset) rate is accepted.
func validateLiveConfig(cfg *pb.TranscriptLiveConfig) error {
	if sr := cfg.GetSampleRate(); sr != 0 && sr != liveSampleRate {
		return status.Errorf(codes.InvalidArgument,
			"parakeet-cpp: unsupported live sample_rate %d (only %d)", sr, liveSampleRate)
	}
	return nil
}

// liveWordsToProto converts per-feed word timings to their proto form,
// returning nil for an empty input.
func liveWordsToProto(words []transcriptWord) []*pb.TranscriptWord {
	if len(words) == 0 {
		return nil
	}
	out := make([]*pb.TranscriptWord, len(words))
	for i, w := range words {
		out[i] = &pb.TranscriptWord{
			Start: secondsToNanos(w.Start),
			End:   secondsToNanos(w.End),
			Text:  w.W,
		}
	}
	return out
}
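The behind-real-time accounting in the feed loop above can be tried in isolation. The sketch below is a standalone approximation, not part of the backend: the backlog type and its observe method are hypothetical names, and the one-second warning threshold and the clamp at zero are copied from the loop.

```go
package main

import "fmt"

const sampleRate = 16000 // same rate as liveSampleRate in the file above

// backlog is a hypothetical helper mirroring behindSec/behindWarned.
type backlog struct {
	behindSec float64
	warned    bool
}

// observe records one feed of `samples` PCM samples that took wallSec seconds
// to decode. It reports true only on the call that first crosses the
// one-second threshold, which is when the feed loop would log its warning.
func (b *backlog) observe(samples int, wallSec float64) bool {
	audioSec := float64(samples) / sampleRate
	b.behindSec += wallSec - audioSec
	if b.behindSec < 0 {
		// Decoding faster than real time cannot bank credit: the caller
		// feeds in real time, so the backlog bottoms out at zero.
		b.behindSec = 0
	}
	if b.behindSec > 1 && !b.warned {
		b.warned = true
		return true
	}
	return false
}

func main() {
	b := &backlog{}
	// Four half-second feeds with made-up decode times.
	for i, wall := range []float64{0.2, 1.2, 1.4, 0.1} {
		warn := b.observe(8000, wall)
		fmt.Printf("feed %d: behind=%.1fs warn=%v\n", i+1, b.behindSec, warn)
	}
}
```

With these made-up timings the third feed is the first to push the backlog past one second, so it is the only one that reports a warning; the fourth feed shrinks the backlog but does not warn again.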