LocalAI/core/backend/transcript.go
Richard Palethorpe 5d0c43ec6e feat(realtime): Semantic VAD EOU token (#10444)
* feat(realtime): EOU-driven semantic_vad turn detection

Add a `semantic_vad` turn-detection mode to the realtime API that feeds
the transcription model live and decides "the user finished speaking"
from the `<EOU>` end-of-utterance token rather than from silence alone.
When EOU fires the turn commits immediately (~0.3s); otherwise it falls
back to an eagerness-scaled silence threshold (low/med/high = 8/4/2s).
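
The commit rule above can be sketched as a small pure function. This is an illustration only, not the code in this change: the names `Eagerness`, `silenceThreshold` and `shouldCommit`, the string values, and the medium default for unknown values are all assumptions. Only the 8/4/2s mapping and the EOU-first ordering come from the description.

```go
package main

import (
	"fmt"
	"time"
)

// Eagerness is a hypothetical name for the semantic_vad eagerness setting.
type Eagerness string

const (
	EagernessLow    Eagerness = "low"
	EagernessMedium Eagerness = "medium"
	EagernessHigh   Eagerness = "high"
)

// silenceThreshold maps eagerness to the fallback silence window quoted in
// the commit message (low/med/high = 8/4/2s). Treating unknown values as
// medium is an assumption of this sketch.
func silenceThreshold(e Eagerness) time.Duration {
	switch e {
	case EagernessLow:
		return 8 * time.Second
	case EagernessHigh:
		return 2 * time.Second
	default:
		return 4 * time.Second
	}
}

// shouldCommit reports whether the turn should end: at once when the
// transcriber has emitted <EOU>, otherwise only after the silence window
// for the configured eagerness has elapsed.
func shouldCommit(eou bool, silence time.Duration, e Eagerness) bool {
	if eou {
		return true
	}
	return silence >= silenceThreshold(e)
}

func main() {
	fmt.Println(shouldCommit(true, 300*time.Millisecond, EagernessLow))
	fmt.Println(shouldCommit(false, 3*time.Second, EagernessHigh))
	fmt.Println(shouldCommit(false, 3*time.Second, EagernessMedium))
}
```

Keeping the decision pure makes it easy to table-test each eagerness level without a live audio stream.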

Plumbing, bottom to top:

- proto: `AudioTranscriptionLive` bidirectional RPC (config-first oneof,
  mono float PCM @16k, ready-ack / Unimplemented degrade signal) plus
  `TranscriptResult.eou` for the unary retranscribe gate.
- pkg/grpc: client/server/base/embed scaffolding for the bidi stream,
  modeled on AudioTransformStream; release stream conns on terminal Recv.
- parakeet-cpp: live transcription RPC with per-C-call engine locking
  (one live stream per turn, finalize+free at commit); bump parakeet.cpp
  to ABI v5 — incremental StreamingMel (no more quadratic per-feed mel
  recompute that delayed EOU on long turns) and the <EOU>/<EOB> split;
  strip the literal <EOU>/<EOB> from offline text and set Eou.
- core/backend: LiveTranscriptionSession wrapper + pipeline
  `turn_detection:` config block (type/eagerness/retranscribe).
- realtime: semantic_vad integration — live input captions streamed as
  transcription deltas while the user speaks, EOU-immediate commit with
  eagerness fallback, optional retranscribe gate (batch re-decode must
  also end in <EOU> to confirm), clause synthesis off the LLM token
  callback, and per-turn live-transcription / model_load telemetry.
- UI: show the realtime pipeline components as a vertical list.

Docs and tests included; opt-in via the pipeline YAML or per-session
`session.update`. Non-streaming STT backends degrade to silence-only.

Assisted-by: Claude Code:claude-opus-4-8 [Read] [Edit] [Write] [Bash]
Assisted-by: Claude Code:claude-fable-5 [Read] [Edit] [Bash]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(realtime): explicit formally-verified state machines + parakeet streaming driver

The realtime API had several implicit state machines whose state was inferred
from scattered booleans, channels, and five separate mutexes, leaving
illegal/inconsistent states reachable. Make them explicit and keep the
implementation in step with a formal design; rework the parakeet streaming
backend along the same lines.

Realtime state machines (M1-M5). Each is a sealed sum-type State/Event/Effect
with a total, pure Next(state,event)->(state,[]effect) behind a single-writer
Coordinator:

  M1 conncoord    connection lifecycle: VAD toggle + once-only teardown
                  (replaces vadServerStarted + a `done` channel closed from
                  two sites).
  M2 turncoord    turn detection: collapses speechStarted and the live-stream
                  "turn open" flag into one state, so discardTurn can no longer
                  desync them and suppress the next onset.
  M3 respcoord    response coordination: serializes the dual-writer
                  start/cancel so at most one response is live; one
                  response.done per response.create.
  M4 compactcoord conversation compaction: single-flight (replaces the
                  `compacting atomic.Bool` CAS).
  M5 ttscoord     TTS pipeline: open->closing->closed, idempotent wait(),
                  rejects enqueue-after-close (was a silent drop).
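
As a rough illustration of the sealed State/Event/Effect shape, here is a minimal sketch of a total, pure `Next` for an M5-like machine. The event and effect names (`Enqueue`, `Close`, `Drained`, `Synthesize`, `Reject`, `StartDrain`, `SignalDone`) are hypothetical and are not taken from ttscoord. Only the open->closing->closed lifecycle and the explicit rejection of enqueue-after-close come from the list above.

```go
package main

import "fmt"

// State is the lifecycle described for M5: open -> closing -> closed.
type State int

const (
	Open State = iota
	Closing
	Closed
)

// Event is sealed: the unexported method keeps other packages from adding
// new variants. All variant names here are hypothetical.
type Event interface{ isEvent() }

type Enqueue struct{ Text string }
type Close struct{}
type Drained struct{}

func (Enqueue) isEvent() {}
func (Close) isEvent()   {}
func (Drained) isEvent() {}

// Effect is sealed the same way. Effects are data; the caller performs them.
type Effect interface{ isEffect() }

type Synthesize struct{ Text string }
type Reject struct{ Reason string }
type StartDrain struct{}
type SignalDone struct{}

func (Synthesize) isEffect() {}
func (Reject) isEffect()     {}
func (StartDrain) isEffect() {}
func (SignalDone) isEffect() {}

// Next is total (every state/event pair returns a result) and pure (it
// performs no I/O and mutates nothing).
func Next(s State, e Event) (State, []Effect) {
	switch ev := e.(type) {
	case Enqueue:
		if s == Open {
			return Open, []Effect{Synthesize{Text: ev.Text}}
		}
		// Enqueue after close is rejected explicitly instead of dropped.
		return s, []Effect{Reject{Reason: "enqueue after close"}}
	case Close:
		if s == Open {
			return Closing, []Effect{StartDrain{}}
		}
		return s, nil // repeated Close is a no-op
	case Drained:
		if s == Closing {
			return Closed, []Effect{SignalDone{}}
		}
		return s, nil
	}
	return s, nil
}

func main() {
	s := Open
	events := []Event{Enqueue{Text: "hello"}, Close{}, Enqueue{Text: "late"}, Drained{}}
	for _, ev := range events {
		var effects []Effect
		s, effects = Next(s, ev)
		fmt.Printf("%T -> state=%d effects=%d\n", ev, s, len(effects))
	}
}
```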

The Coordinator/Sink/Next plumbing — only the sealed types and Next differed
per machine — is extracted once into core/http/endpoints/openai/coordinator as
a generic Coordinator[S,E,F]; each machine keeps its public API via type
aliases, so no sink, call-site, or test moved.
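
A minimal sketch of what a generic single-writer coordinator could look like follows. The field names, the `New` and `Dispatch` signatures, and the use of a mutex to get single-writer behaviour are assumptions made for illustration; the real `Coordinator[S,E,F]` may be structured differently. The toy machine is loosely modelled on the single-flight behaviour described for M4.

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator owns one machine's state and serializes every transition.
// S, E and F are the machine's state, event and effect types.
type Coordinator[S, E, F any] struct {
	mu    sync.Mutex
	state S
	next  func(S, E) (S, []F)
	sink  func(F)
}

// New wires a pure transition function to a sink that performs effects.
func New[S, E, F any](initial S, next func(S, E) (S, []F), sink func(F)) *Coordinator[S, E, F] {
	return &Coordinator[S, E, F]{state: initial, next: next, sink: sink}
}

// Dispatch applies one event under the lock, so state has a single writer,
// then hands the effects to the sink in order. In this sketch the sink runs
// under the lock and must not call Dispatch again.
func (c *Coordinator[S, E, F]) Dispatch(ev E) S {
	c.mu.Lock()
	defer c.mu.Unlock()
	s, effects := c.next(c.state, ev)
	c.state = s
	for _, f := range effects {
		c.sink(f)
	}
	return s
}

// A toy single-flight machine: a second start while running is ignored.
type state int

const (
	idle state = iota
	running
)

type event int

const (
	start event = iota
	finish
)

type effect string

func next(s state, e event) (state, []effect) {
	switch {
	case s == idle && e == start:
		return running, []effect{"run"}
	case s == running && e == finish:
		return idle, nil
	}
	return s, nil
}

func main() {
	runs := 0
	c := New(idle, next, func(effect) { runs++ })
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Dispatch(start)
		}()
	}
	wg.Wait()
	fmt.Println("runs:", runs) // eight concurrent starts, one "run" effect
}
```

Because only the transition function and the three types vary per machine, each machine can expose its own names through type aliases over one shared implementation.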

Hierarchy. session_lifecycle.fizz models M1 as the parent region with its
children (M2/M3/M4) as one statechart and asserts ChildrenDieWithParent (conn
torn => all children terminal, none start after teardown). respcoord and
compactcoord gain an absorbing Terminated state + Shutdown event; conncoord's
teardown drives the children terminal. This closes a compaction teardown gap: a
fire-and-forget compaction could outlive a torn session — compactionSink now
takes a session-scoped cancellable context + WaitGroup and joins the in-flight
summarize+evict on shutdown.

Formal verification. formal-verification/ holds one authoritative FizzBee spec
per machine plus the composition spec, each with an always-assertion and a
documented one-line edit that makes the checker fail (verified non-vacuous).
scripts/realtime-conformance.sh is fail-closed: all Go conformance suites under
-race AND a model-check of every .fizz spec; a missing FizzBee is a hard error
(only the loud REALTIME_CONFORMANCE_SKIP_FIZZBEE=1 bypasses it, never in CI).
FizzBee is pinned by sha256 and installed via scripts/install-fizzbee.sh into
.tools/ (gitignored). Wired as make test-realtime-conformance, a CI workflow,
and a pre-commit path filter. Go conformance tests are Ginkgo/Gomega (per the
repo's forbidigo lint): transition tables + fixed-seed property walks +
concurrent/-race specs, no rapid dependency. Design map:
docs/design/realtime-state-machines.md.

Parakeet streaming backend. The same treatment applied to the parakeet-cpp
streaming paths:
- AudioTranscriptionStream returns codes.Unimplemented for non-streaming models
  instead of decoding offline and emitting it as one delta + final. A client
  that asked for streaming learns the model cannot stream rather than receiving
  a batch result shaped like a stream. New grpcerrors.StreamTranscriptionUnsupported
  carries that signal; the HTTP /v1/audio/transcriptions stream path surfaces it
  as an SSE error event. Mirrors AudioTranscriptionLive, which already did this.
- utteranceBoundary (boundary.go): a single definition of the end-of-utterance
  latch, replacing three open-coded finalEou toggles. Modelled as a two-valued
  type so illegal states are unrepresentable.
- Shared decode driver (driver.go): streamFeedResult (one per-feed event) +
  feedChunk (hides the ABI v4 JSON vs text-only split) + feedSlices + flushTail.
  The feed loop is written once.
- AudioTranscriptionLive becomes a bidi adapter: it streams the per-feed
  {delta,eou,eob,words} the realtime turn detector consumes and a terminal
  FinalResult carrying only Text. Segments/duration/eou are offline-only and no
  longer produced (nor read) on the live path; liveTraceState drops the terminal
  eou and keeps the per-feed eou_events count.
- AudioTranscriptionStream + streamJSON merge into one driver-based function;
  streamSegmenter is generalized to the unified event with a text-only fallback
  that preserves the legacy (no-words) library's per-utterance segmentation.
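
To illustrate the two-valued latch idea behind utteranceBoundary, here is a small sketch. The constant names, the `observe` method, and the rule that new text reopens the latch are assumptions of this example, not the contents of boundary.go.

```go
package main

import "fmt"

// utteranceBoundary has exactly two values, so there is no "half latched"
// state to represent or to guard against.
type utteranceBoundary bool

const (
	utteranceOpen  utteranceBoundary = false
	utteranceEnded utteranceBoundary = true
)

// observe folds one per-feed event into the latch. In this sketch an EOU
// latches the boundary, new text after it reopens the utterance, and an
// empty feed leaves the latch unchanged.
func (b utteranceBoundary) observe(delta string, eou bool) utteranceBoundary {
	switch {
	case eou:
		return utteranceEnded
	case delta != "":
		return utteranceOpen
	default:
		return b
	}
}

func main() {
	b := utteranceOpen
	feeds := []struct {
		delta string
		eou   bool
	}{
		{"turn the lights", false},
		{" off", true},
		{"", false},
		{"and then", false},
	}
	for _, f := range feeds {
		b = b.observe(f.delta, f.eou)
		fmt.Printf("delta=%q eou=%v ended=%v\n", f.delta, f.eou, bool(b))
	}
}
```

Routing every update through one method is what lets a single definition replace several open-coded toggles.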

Verified: build/vet/gofumpt clean, golangci-lint 0 issues, all coordinator and
parakeet packages under -race, the fail-closed conformance gate green, and
make test-realtime (12 e2e WS+WebRTC).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
2026-06-30 09:01:22 +02:00


package backend

import (
	"context"
	"fmt"
	"maps"
	"time"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/schema"
	"github.com/mudler/LocalAI/core/trace"
	grpcPkg "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/LocalAI/pkg/grpc/proto"
	"github.com/mudler/LocalAI/pkg/model"
)

// TranscriptionRequest groups the parameters accepted by ModelTranscription.
// Use this so callers don't have to pass long positional arg lists when they
// only care about a subset of fields.
type TranscriptionRequest struct {
	Audio                  string
	Language               string
	Translate              bool
	Diarize                bool
	Prompt                 string
	Temperature            float32
	TimestampGranularities []string
}

// toProto builds the gRPC request for r. Audio is sent in the Dst field.
func (r *TranscriptionRequest) toProto(threads uint32) *proto.TranscriptRequest {
	return &proto.TranscriptRequest{
		Dst:                    r.Audio,
		Language:               r.Language,
		Translate:              r.Translate,
		Diarize:                r.Diarize,
		Threads:                threads,
		Prompt:                 r.Prompt,
		Temperature:            r.Temperature,
		TimestampGranularities: r.TimestampGranularities,
	}
}

// loadTranscriptionModel loads the backend for modelConfig, defaulting to the
// Whisper backend when none is configured. Load failures are recorded via
// recordModelLoadFailure before being returned.
func loadTranscriptionModel(ctx context.Context, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig) (grpcPkg.Backend, error) {
	if modelConfig.Backend == "" {
		modelConfig.Backend = model.WhisperBackend
	}

	// model.WithContext(ctx) overrides the app-context default set in
	// ModelOptions so distributed routing decisions reach the request's
	// X-LocalAI-Node holder via distributedhdr.Stamp.
	opts := ModelOptions(modelConfig, appConfig, model.WithContext(ctx))

	transcriptionModel, err := ml.Load(opts...)
	if err != nil {
		recordModelLoadFailure(appConfig, modelConfig.Name, modelConfig.Backend, err, nil)
		return nil, err
	}

	if transcriptionModel == nil {
		return nil, fmt.Errorf("could not load transcription model")
	}

	return transcriptionModel, nil
}

// ModelTranscription is a positional-argument wrapper around
// ModelTranscriptionWithOptions. Temperature and TimestampGranularities keep
// their zero values.
func ModelTranscription(ctx context.Context, audio, language string, translate, diarize bool, prompt string, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig) (*schema.TranscriptionResult, error) {
	return ModelTranscriptionWithOptions(ctx, TranscriptionRequest{
		Audio:     audio,
		Language:  language,
		Translate: translate,
		Diarize:   diarize,
		Prompt:    prompt,
	}, ml, modelConfig, appConfig)
}

// ModelTranscriptionWithOptions loads the transcription model, runs a unary
// AudioTranscription call, and records a backend trace for both the success
// and the error path when tracing is enabled.
func ModelTranscriptionWithOptions(ctx context.Context, req TranscriptionRequest, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig) (*schema.TranscriptionResult, error) {
	transcriptionModel, err := loadTranscriptionModel(ctx, ml, modelConfig, appConfig)
	if err != nil {
		return nil, err
	}

	var startTime time.Time
	var audioSnippet map[string]any
	if appConfig.EnableTracing {
		trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
		startTime = time.Now()
		// Capture audio before the backend call — the backend may delete the file.
		audioSnippet = trace.AudioSnippet(req.Audio, appConfig.TracingMaxBodyBytes)
	}

	r, err := transcriptionModel.AudioTranscription(ctx, req.toProto(uint32(*modelConfig.Threads)))
	if err != nil {
		if appConfig.EnableTracing {
			errData := map[string]any{
				"audio_file": req.Audio,
				"language":   req.Language,
				"translate":  req.Translate,
				"diarize":    req.Diarize,
				"prompt":     req.Prompt,
			}
			if audioSnippet != nil {
				maps.Copy(errData, audioSnippet)
			}
			trace.RecordBackendTrace(trace.BackendTrace{
				Timestamp: startTime,
				Duration:  time.Since(startTime),
				Type:      trace.BackendTraceTranscription,
				ModelName: modelConfig.Name,
				Backend:   modelConfig.Backend,
				Summary:   trace.TruncateString(req.Audio, 200),
				Error:     err.Error(),
				Data:      errData,
			})
		}
		return nil, err
	}

	tr := transcriptResultFromProto(r)

	if appConfig.EnableTracing {
		data := map[string]any{
			"audio_file":     req.Audio,
			"language":       req.Language,
			"translate":      req.Translate,
			"diarize":        req.Diarize,
			"prompt":         req.Prompt,
			"result_text":    tr.Text,
			"segments_count": len(tr.Segments),
		}
		if audioSnippet != nil {
			maps.Copy(data, audioSnippet)
		}
		trace.RecordBackendTrace(trace.BackendTrace{
			Timestamp: startTime,
			Duration:  time.Since(startTime),
			Type:      trace.BackendTraceTranscription,
			ModelName: modelConfig.Name,
			Backend:   modelConfig.Backend,
			Summary:   trace.TruncateString(req.Audio+" -> "+tr.Text, 200),
			Data:      data,
		})
	}

	// err is always nil here: the error path returned above.
	return tr, nil
}

// TranscriptionStreamChunk is a streaming event emitted by
// ModelTranscriptionStream. Either Delta carries an incremental text fragment,
// or Final carries the completed transcription as the very last event.
type TranscriptionStreamChunk struct {
	Delta string
	Final *schema.TranscriptionResult
}

// ModelTranscriptionStream runs the gRPC streaming transcription RPC and
// invokes onChunk for each event the backend produces. Backends that don't
// support real streaming should still emit one terminal event with Final set,
// which the HTTP layer turns into a single delta + done SSE pair.
func ModelTranscriptionStream(ctx context.Context, req TranscriptionRequest, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig, onChunk func(TranscriptionStreamChunk)) error {
	transcriptionModel, err := loadTranscriptionModel(ctx, ml, modelConfig, appConfig)
	if err != nil {
		return err
	}

	pbReq := req.toProto(uint32(*modelConfig.Threads))
	pbReq.Stream = true

	return transcriptionModel.AudioTranscriptionStream(ctx, pbReq, func(chunk *proto.TranscriptStreamResponse) {
		if chunk == nil {
			return
		}
		out := TranscriptionStreamChunk{Delta: chunk.Delta}
		if chunk.FinalResult != nil {
			out.Final = transcriptResultFromProto(chunk.FinalResult)
		}
		onChunk(out)
	})
}

// transcriptResultFromProto converts a proto result into the schema type.
// A nil input yields an empty result. Words are collected both per segment
// and in the flattened tr.Words slice.
func transcriptResultFromProto(r *proto.TranscriptResult) *schema.TranscriptionResult {
	if r == nil {
		return &schema.TranscriptionResult{}
	}
	tr := &schema.TranscriptionResult{
		Text:     r.Text,
		Language: r.Language,
		Duration: float64(r.Duration),
		Eou:      r.Eou,
	}
	for _, s := range r.Segments {
		var tks []int
		for _, t := range s.Tokens {
			tks = append(tks, int(t))
		}
		var words []schema.TranscriptionWord
		for _, w := range s.Words {
			word := schema.TranscriptionWord{
				Start: time.Duration(w.Start),
				End:   time.Duration(w.End),
				Text:  w.Text,
			}
			words = append(words, word)
			tr.Words = append(tr.Words, word)
		}
		tr.Segments = append(tr.Segments,
			schema.TranscriptionSegment{
				Text:    s.Text,
				Id:      int(s.Id),
				Start:   time.Duration(s.Start),
				End:     time.Duration(s.End),
				Tokens:  tks,
				Speaker: s.Speaker,
				Words:   words,
			})
	}
	return tr
}
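
A caller of ModelTranscriptionStream receives a sequence of chunks in which Delta carries incremental text and Final, when present, carries the completed transcription. The sketch below shows one way an onChunk callback might accumulate them. The two types are local stand-ins that mirror only the fields used here, and `collector` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins for schema.TranscriptionResult and
// backend.TranscriptionStreamChunk, reduced to the fields used below.
type TranscriptionResult struct{ Text string }

type TranscriptionStreamChunk struct {
	Delta string
	Final *TranscriptionResult
}

// collector is a hypothetical helper: it joins deltas as they arrive and
// remembers the terminal result if the backend sends one.
type collector struct {
	partial strings.Builder
	final   *TranscriptionResult
}

// onChunk has the shape of the callback ModelTranscriptionStream expects.
func (c *collector) onChunk(ch TranscriptionStreamChunk) {
	if ch.Delta != "" {
		c.partial.WriteString(ch.Delta)
	}
	if ch.Final != nil {
		c.final = ch.Final
	}
}

// text prefers the backend's final text and falls back to the joined deltas
// when no terminal event has arrived yet.
func (c *collector) text() string {
	if c.final != nil {
		return c.final.Text
	}
	return c.partial.String()
}

func main() {
	c := &collector{}
	c.onChunk(TranscriptionStreamChunk{Delta: "hello"})
	c.onChunk(TranscriptionStreamChunk{Delta: " world"})
	fmt.Println(c.text())
	c.onChunk(TranscriptionStreamChunk{Final: &TranscriptionResult{Text: "Hello, world."}})
	fmt.Println(c.text())
}
```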