Mirror of https://github.com/mudler/LocalAI.git, synced 2026-06-30 11:26:32 -04:00
* feat(realtime): EOU-driven semantic_vad turn detection

Add a `semantic_vad` turn-detection mode to the realtime API. It feeds the transcription model live and decides "the user finished speaking" from the `<EOU>` end-of-utterance token rather than from silence alone. When EOU fires, the turn commits immediately (~0.3s); otherwise it falls back to an eagerness-scaled silence threshold (low/med/high = 8/4/2s).

Plumbing, bottom to top:

- proto: `AudioTranscriptionLive` bidirectional RPC (config-first oneof, mono float PCM @16k, ready-ack / Unimplemented degrade signal) plus `TranscriptResult.eou` for the unary retranscribe gate.
- pkg/grpc: client/server/base/embed scaffolding for the bidi stream, modeled on AudioTransformStream; release stream conns on terminal Recv.
- parakeet-cpp: live transcription RPC with per-C-call engine locking (one live stream per turn, finalize+free at commit); bump parakeet.cpp to ABI v5: incremental StreamingMel (no more quadratic per-feed mel recompute that delayed EOU on long turns) and the <EOU>/<EOB> split; strip the literal <EOU>/<EOB> from offline text and set Eou.
- core/backend: LiveTranscriptionSession wrapper + pipeline `turn_detection:` config block (type/eagerness/retranscribe).
- realtime: semantic_vad integration, covering live input captions streamed as transcription deltas while the user speaks, EOU-immediate commit with eagerness fallback, an optional retranscribe gate (the batch re-decode must also end in <EOU> to confirm), clause synthesis off the LLM token callback, and per-turn live-transcription / model_load telemetry.
- UI: show the realtime pipeline components as a vertical list.

Docs and tests included; opt-in via the pipeline YAML or per-session `session.update`. Non-streaming STT backends degrade to silence-only.
Assisted-by: Claude Code:claude-opus-4-8 [Read] [Edit] [Write] [Bash]
Assisted-by: Claude Code:claude-fable-5 [Read] [Edit] [Bash]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(realtime): explicit formally-verified state machines + parakeet streaming driver

The realtime API had several implicit state machines whose state was inferred from scattered booleans, channels, and five separate mutexes, leaving illegal/inconsistent states reachable. Make them explicit and keep the implementation in step with a formal design; rework the parakeet streaming backend along the same lines.

Realtime state machines (M1-M5). Each is a sealed sum-type State/Event/Effect with a total, pure Next(state,event)->(state,[]effect) behind a single-writer Coordinator:

- M1 conncoord connection lifecycle: VAD toggle + once-only teardown (replaces vadServerStarted + a `done` channel closed from two sites).
- M2 turncoord turn detection: collapses speechStarted and the live-stream "turn open" flag into one state, so discardTurn can no longer desync them and suppress the next onset.
- M3 respcoord response coordination: serializes the dual-writer start/cancel so at most one response is live; one response.done per response.create.
- M4 compactcoord conversation compaction: single-flight (replaces the `compacting atomic.Bool` CAS).
- M5 ttscoord TTS pipeline: open->closing->closed, idempotent wait(), rejects enqueue-after-close (was a silent drop).

The Coordinator/Sink/Next plumbing (only the sealed types and Next differed per machine) is extracted once into core/http/endpoints/openai/coordinator as a generic Coordinator[S,E,F]; each machine keeps its public API via type aliases, so no sink, call-site, or test moved.

Hierarchy. session_lifecycle.fizz models M1 as the parent region with its children (M2/M3/M4) as one statechart and asserts ChildrenDieWithParent (conn torn => all children terminal, none start after teardown).
respcoord and compactcoord gain an absorbing Terminated state + Shutdown event; conncoord's teardown drives the children terminal. This closes a compaction teardown gap: a fire-and-forget compaction could outlive a torn session, so compactionSink now takes a session-scoped cancellable context + WaitGroup and joins the in-flight summarize+evict on shutdown.

Formal verification. formal-verification/ holds one authoritative FizzBee spec per machine plus the composition spec, each with an always-assertion and a documented one-line edit that makes the checker fail (verified non-vacuous). scripts/realtime-conformance.sh is fail-closed: all Go conformance suites under -race AND a model-check of every .fizz spec; a missing FizzBee is a hard error (only the loud REALTIME_CONFORMANCE_SKIP_FIZZBEE=1 bypasses it, never in CI). FizzBee is pinned by sha256 and installed via scripts/install-fizzbee.sh into .tools/ (gitignored). Wired as make test-realtime-conformance, a CI workflow, and a pre-commit path filter. Go conformance tests are Ginkgo/Gomega (per the repo's forbidigo lint): transition tables + fixed-seed property walks + concurrent/-race specs, no rapid dependency. Design map: docs/design/realtime-state-machines.md.

Parakeet streaming backend. The same treatment applied to the parakeet-cpp streaming paths:

- AudioTranscriptionStream returns codes.Unimplemented for non-streaming models instead of decoding offline and emitting it as one delta + final. A client that asked for streaming learns the model cannot stream rather than receiving a batch result shaped like a stream. New grpcerrors.StreamTranscriptionUnsupported carries that signal; the HTTP /v1/audio/transcriptions stream path surfaces it as an SSE error event. Mirrors AudioTranscriptionLive, which already did this.
- utteranceBoundary (boundary.go): a single definition of the end-of-utterance latch, replacing three open-coded finalEou toggles. Modelled as a two-valued type so illegal states are unrepresentable.
- Shared decode driver (driver.go): streamFeedResult (one per-feed event) + feedChunk (hides the ABI v4 JSON vs text-only split) + feedSlices + flushTail. The feed loop is written once.
- AudioTranscriptionLive becomes a bidi adapter: it streams the per-feed {delta,eou,eob,words} the realtime turn detector consumes and a terminal FinalResult carrying only Text. Segments/duration/eou are offline-only and no longer produced (nor read) on the live path; liveTraceState drops the terminal eou and keeps the per-feed eou_events count.
- AudioTranscriptionStream + streamJSON merge into one driver-based function; streamSegmenter is generalized to the unified event with a text-only fallback that preserves the legacy (no-words) library's per-utterance segmentation.

Verified: build/vet/gofumpt clean, golangci-lint 0 issues, all coordinator and parakeet packages under -race, the fail-closed conformance gate green, and make test-realtime (12 e2e WS+WebRTC).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
298 lines
8.6 KiB
Go
package backend

import (
	"context"
	"errors"
	"fmt"
	"io"
	"maps"
	"sync"
	"time"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/schema"
	"github.com/mudler/LocalAI/core/trace"
	grpcPkg "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/LocalAI/pkg/grpc/proto"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/LocalAI/pkg/sound"
	"github.com/mudler/xlog"
)

// LiveTranscriptionEvent is one streamed event from a live (bidirectional)
// transcription session. Delta/Eou/Eob/Words arrive as the user speaks; Final
// is set exactly once, on the terminal event after Close flushes the decode
// tail. Eou means the model judged the user yielded the turn; Eob means a
// backchannel ("uh-huh") ended, and callers must NOT treat Eob as a turn
// boundary.
type LiveTranscriptionEvent struct {
	Delta string
	Eou   bool
	Eob   bool
	Words []schema.TranscriptionWord
	Final *schema.TranscriptionResult
}

// LiveTranscriptionSession is a handle on an open live transcription stream.
// Feed pushes 16 kHz mono float PCM; Close signals end-of-audio, waits (up to
// liveCloseDrainTimeout) for the backend's terminal Final event to be
// delivered, and releases the stream.
type LiveTranscriptionSession interface {
	Feed(pcm []float32) error
	Close() error
}

// liveCloseDrainTimeout bounds how long Close waits for the backend to flush
// the decode tail before force-cancelling the stream. Finalize is one short
// engine call, so a wait measured in seconds means the backend is wedged.
const liveCloseDrainTimeout = 10 * time.Second

type liveTranscriptionSession struct {
	stream    grpcPkg.AudioTranscriptionLiveClient
	cancel    context.CancelFunc
	recvDone  chan struct{}
	recvErr   error // written by the recv goroutine before recvDone closes
	closeOnce sync.Once
	closeErr  error
	trace     *liveTraceState // nil when tracing was disabled at open
}

func (s *liveTranscriptionSession) Feed(pcm []float32) error {
	s.trace.addPCM(pcm)
	return s.stream.Send(&proto.TranscriptLiveRequest{
		Payload: &proto.TranscriptLiveRequest_Audio{Audio: &proto.TranscriptLiveAudio{Pcm: pcm}},
	})
}

func (s *liveTranscriptionSession) Close() error {
	s.closeOnce.Do(func() {
		// Half-close the stream: end-of-audio tells the backend to flush the
		// decode tail and send the terminal Final event.
		err := s.stream.CloseSend()
		select {
		case <-s.recvDone:
		case <-time.After(liveCloseDrainTimeout):
			xlog.Warn("live transcription: backend did not finalize in time; cancelling stream")
			s.cancel()
			<-s.recvDone
		}
		// Release the stream context on every path, including a clean drain.
		s.cancel()
		// Prefer the CloseSend error; otherwise surface any unexpected recv failure.
		if err == nil {
			err = s.recvErr
		}
		s.closeErr = err
		s.trace.record(err)
	})
	return s.closeErr
}

// liveSampleRate is the PCM rate of a live transcription session, fixed by
// the session config sent in ModelTranscriptionLive.
const liveSampleRate = 16000

// liveTraceState accumulates what the per-turn backend trace needs while a
// live session runs: a bounded copy of the fed PCM for the audio snippet,
// the decode outputs, and timing. One trace is recorded at Close. The live
// path never touches the unary transcription wrapper, so without this a
// streaming-only pipeline would produce no transcription traces at all. Feed
// and the recv goroutine run concurrently; mu guards the accumulators.
type liveTraceState struct {
	appConfig *config.ApplicationConfig
	modelName string
	backend   string
	language  string
	started   time.Time

	mu          sync.Mutex
	pcm         []byte // first trace.MaxSnippetSeconds of fed audio, int16 LE
	fedSamples  int    // ALL samples fed, beyond the snippet cap
	deltaEvents int
	eouEvents   int
	eobEvents   int
	finalText   string
}

// newLiveTraceState returns nil when tracing is disabled; every
// liveTraceState method is a no-op on a nil receiver.
func newLiveTraceState(modelConfig config.ModelConfig, appConfig *config.ApplicationConfig, language string) *liveTraceState {
	if !appConfig.EnableTracing {
		return nil
	}
	trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
	return &liveTraceState{
		appConfig: appConfig,
		modelName: modelConfig.Name,
		backend:   modelConfig.Backend,
		language:  language,
		started:   time.Now(),
	}
}

func (ts *liveTraceState) addPCM(pcm []float32) {
	if ts == nil {
		return
	}
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.fedSamples += len(pcm)
	// The snippet is stored as int16 LE (2 bytes per sample); room is the
	// number of samples that still fit under the cap.
	maxBytes := trace.MaxSnippetSeconds * liveSampleRate * 2
	if room := (maxBytes - len(ts.pcm)) / 2; room > 0 {
		if len(pcm) > room {
			pcm = pcm[:room]
		}
		ts.pcm = append(ts.pcm, sound.Float32sToInt16LEBytes(pcm)...)
	}
}

func (ts *liveTraceState) observe(ev LiveTranscriptionEvent) {
	if ts == nil {
		return
	}
	ts.mu.Lock()
	defer ts.mu.Unlock()
	if ev.Delta != "" {
		ts.deltaEvents++
	}
	if ev.Eou {
		ts.eouEvents++
	}
	if ev.Eob {
		ts.eobEvents++
	}
	if ev.Final != nil {
		ts.finalText = ev.Final.Text
	}
}

func (ts *liveTraceState) record(closeErr error) {
	if ts == nil || !ts.appConfig.EnableTracing {
		return
	}
	// Snapshot the accumulators under the lock, then build and record the
	// trace without holding it.
	ts.mu.Lock()
	data := map[string]any{
		"source":       "live_stream",
		"language":     ts.language,
		"result_text":  ts.finalText,
		"eou_events":   ts.eouEvents,
		"eob_events":   ts.eobEvents,
		"delta_events": ts.deltaEvents,
	}
	if snippet := trace.AudioSnippetFromPCM(ts.pcm, liveSampleRate, ts.fedSamples*2, ts.appConfig.TracingMaxBodyBytes); snippet != nil {
		maps.Copy(data, snippet)
	}
	summary := "live -> " + ts.finalText
	ts.mu.Unlock()

	bt := trace.BackendTrace{
		Timestamp: ts.started,
		Duration:  time.Since(ts.started),
		Type:      trace.BackendTraceTranscription,
		ModelName: ts.modelName,
		Backend:   ts.backend,
		Summary:   trace.TruncateString(summary, 200),
		Data:      data,
	}
	if closeErr != nil {
		bt.Error = closeErr.Error()
	}
	trace.RecordBackendTrace(bt)
}

// ModelTranscriptionLive loads the transcription backend, opens the
// bidirectional AudioTranscriptionLive RPC, sends the session config, and
// BLOCKS until the backend's ready ack. A grpcerrors.IsLiveTranscriptionUnsupported
// error means the backend (or the loaded model) cannot do live transcription
// and the caller should degrade to the unary/file path. After a successful
// return, onEvent is invoked from a background goroutine (in order, one event
// at a time) for every non-empty response the backend streams, ending with
// the Final event triggered by Close.
func ModelTranscriptionLive(ctx context.Context, language string,
	ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig,
	onEvent func(LiveTranscriptionEvent)) (LiveTranscriptionSession, error) {

	transcriptionModel, err := loadTranscriptionModel(ctx, ml, modelConfig, appConfig)
	if err != nil {
		return nil, err
	}

	// The derived cancel outlives this call inside the session: Close uses
	// it to unwind the stream (and, in embed mode, the server-side recv
	// pump, which only stops on send-close or context cancellation).
	streamCtx, cancel := context.WithCancel(ctx)
	stream, err := transcriptionModel.AudioTranscriptionLive(streamCtx)
	if err != nil {
		cancel()
		return nil, err
	}

	fail := func(err error) (LiveTranscriptionSession, error) {
		_ = stream.CloseSend()
		cancel()
		return nil, err
	}

	if err := stream.Send(&proto.TranscriptLiveRequest{
		Payload: &proto.TranscriptLiveRequest_Config{Config: &proto.TranscriptLiveConfig{
			Language:   language,
			SampleRate: liveSampleRate,
		}},
	}); err != nil {
		return fail(err)
	}

	// Ready-ack contract: the backend answers a successful open with a
	// {ready:true} response before any transcript data; unsupported
	// backends surface Unimplemented here instead.
	ack, err := stream.Recv()
	if err != nil {
		return fail(err)
	}
	if !ack.GetReady() {
		return fail(fmt.Errorf("live transcription: backend %q broke the ready-ack contract (first response carried data)", modelConfig.Backend))
	}

	s := &liveTranscriptionSession{
		stream:   stream,
		cancel:   cancel,
		recvDone: make(chan struct{}),
		trace:    newLiveTraceState(modelConfig, appConfig, language),
	}

	go func() {
		defer close(s.recvDone)
		for {
			resp, err := stream.Recv()
			if err != nil {
				if !errors.Is(err, io.EOF) && streamCtx.Err() == nil {
					xlog.Warn("live transcription stream ended unexpectedly", "error", err)
					s.recvErr = err
				}
				return
			}
			ev := liveEventFromProto(resp)
			if ev.Delta == "" && !ev.Eou && !ev.Eob && len(ev.Words) == 0 && ev.Final == nil {
				continue // duplicate ready ack / keep-alive: nothing to deliver
			}
			s.trace.observe(ev)
			onEvent(ev)
		}
	}()

	return s, nil
}

func liveEventFromProto(r *proto.TranscriptLiveResponse) LiveTranscriptionEvent {
	ev := LiveTranscriptionEvent{
		Delta: r.GetDelta(),
		Eou:   r.GetEou(),
		Eob:   r.GetEob(),
	}
	for _, w := range r.GetWords() {
		ev.Words = append(ev.Words, schema.TranscriptionWord{
			Start: time.Duration(w.Start),
			End:   time.Duration(w.End),
			Text:  w.Text,
		})
	}
	if r.GetFinalResult() != nil {
		ev.Final = transcriptResultFromProto(r.GetFinalResult())
	}
	return ev
}