LocalAI/core/http/endpoints/openai/realtime.go
LocalAI [bot] 600dafd20b feat(ced): sound-event classification backend (CED audio tagger) (#10425)
* feat(ced): sketch sound-classification backend (CED audio tagger)

Wires ced.cpp (CED, 527-class AudioSet sound-event tagger; baby cry,
footsteps, glass, alarms, dog bark) into LocalAI as a Go/purego backend.

SKETCH (the backend skeleton is real; the core REST wiring and CI/gallery
registration are a checklist in DESIGN.md):
- backend/backend.proto: new SoundDetection rpc + SoundClass messages
  (run `make protogen-go` to regenerate pkg/grpc/proto).
- backend/go/ced: main.go (purego dlopen libced.so + ced_capi.h),
  goced.go (Ced gRPC backend: Load + SoundDetection), Makefile
  (clone-at-pin CED_VERSION, ggml static-PIC shared build), run.sh,
  package.sh, .gitignore.
- DESIGN.md: REST /v1/audio/classification wiring (handler/route/capability
  registration checklist), gallery/index + CI registration, and a scoping
  note for the realtime/websocket live-recognition path (sliding-window
  classify over the existing ws transport + voicegate; the ced C-API
  per-PCM entry point is already window-friendly).

Backend code does not compile until protogen-go regenerates the pb types
and a libced.so is built (Makefile clones+builds it).

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): REST /v1/audio/classification endpoint + capability registration

Wires the ced sound-event classification backend (AudioSet audio tagger)
end to end through the REST surface, mirroring the transcription path.

- Handler: core/http/endpoints/openai/sound_classification.go parses the
  multipart audio upload, temp-files it, resolves the model config and
  calls the SoundDetection RPC; returns {model, detections[]} JSON.
- Backend wrapper: core/backend/sound_classification.go (ModelSoundDetection)
  loads the model and normalizes the proto response into schema types.
- Schema: core/schema/sound_classification.go (SoundClassificationResult).
- gRPC layer: SoundDetection wired through the LocalAI wrapper (interface,
  Backend client, Client, embed, server, base default) so the loader-typed
  client exposes the RPC; proto regenerated via make protogen-go.
- Route: POST /v1/audio/classification (+ /audio/classification alias) with
  the audio/multipart default-model middleware in routes/openai.go.
- Capability surfaces: swagger @Tags/@Router on the handler;
  FLAG_SOUND_CLASSIFICATION usecase flag + UsecaseSoundClassification + UsecaseInfoMap +
  GuessUsecases + ModalityGroups + GetAllModelConfigUsecases; meta usecase
  option; /api/instructions audio area updated; auth RouteFeatureRegistry +
  FeatureAudioClassification (APIFeatures, default ON) + FeatureMetas; UI
  usecaseFilters, capabilities.js CAP_SOUND_CLASSIFICATION, Models.jsx filter
  + i18n; docs page features/audio-classification.md + whats-new + crosslink.
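
A minimal client-side sketch of the upload the handler above parses. The form field names `file` and `model` are assumptions borrowed from the transcription endpoint convention, and `buildClassificationRequest` and the localhost URL are hypothetical; only the route and the ced-base-f16 model name come from this change.

```go
package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
)

// buildClassificationRequest assembles a multipart POST for the
// /v1/audio/classification route. The field names "file" and "model" are
// assumptions, not confirmed by the handler source.
func buildClassificationRequest(baseURL, model, filename string, audio []byte) (*http.Request, error) {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	if err := w.WriteField("model", model); err != nil {
		return nil, err
	}
	part, err := w.CreateFormFile("file", filename)
	if err != nil {
		return nil, err
	}
	if _, err := part.Write(audio); err != nil {
		return nil, err
	}
	// Close writes the trailing boundary; the body is incomplete without it.
	if err := w.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, baseURL+"/v1/audio/classification", &body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", w.FormDataContentType())
	return req, nil
}

func main() {
	// The audio bytes here are a placeholder, not a valid WAV file.
	req, err := buildClassificationRequest("http://localhost:8080", "ced-base-f16", "clip.wav", []byte("RIFF"))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path)
}
```

Passing the request to `http.DefaultClient.Do` would send it; the response is expected to be the {model, detections[]} JSON described above.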

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): realtime sound-event detection over the websocket API

When a realtime pipeline configures a sound-classification model, each
VAD-committed utterance (the same window the transcription path produces)
is also run through the CED sound-event classifier and the scored AudioSet
tags are emitted as a new server event. No new backend rpc is needed: the
SoundDetection gRPC method already exists on this branch.

- config: add Pipeline.SoundDetection (yaml/json sound_detection,omitempty)
  beside Transcription/VAD.
- realtime: add Model.SoundDetection(ctx, audio, topK, threshold) to the
  ModelInterface; implement it on wrappedModel and transcriptOnlyModel by
  calling backend.ModelSoundDetection with the session's sound-classification
  model config (mirrors how Transcribe dispatches). Load the optional config
  in newModel / newTranscriptionOnlyModel; nil config keeps it additive.
- types: add ConversationItemSoundDetectionEvent (item_id, content_index,
  detections[]{label,score,index}) with type conversation.item.sound_detection,
  its ServerEventType constant and MarshalJSON, mirroring the transcription
  completed event.
- realtime: add emitSoundDetection (unary path: classify the committed window,
  build the event, t.SendEvent) and wire it at the utterance-commit hook right
  after emitTranscription; gated on session.SoundDetectionEnabled (resolved
  from Pipeline.SoundDetection at session setup, defaults top_k=5, threshold=0).
  Its error is logged via xlog but never aborts the turn.
- test: Ginkgo specs for emitSoundDetection (tags emitted, empty detections,
  classifier error) plus a SoundDetection method on the fakeModel double.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(ced): implement SoundDetection in nodes backend test doubles

The SoundDetection method added to the grpc backend interface left two
test doubles (fakeBackendClient, fakeGRPCBackend) incomplete, so
core/services/nodes failed to compile under `go vet`/`go test` (go build
missed it: the doubles live in _test.go). Add the method to both,
mirroring their existing Detect mock. Repairs CI for the nodes package.
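
One way to surface this kind of breakage next to the double itself is a compile-time interface assertion. The sketch below uses hypothetical, trimmed-down names (`soundBackend`, `fakeBackend`), not the real gRPC backend interface. An assertion placed in a _test.go file is still only compiled by `go vet`/`go test`, so it sharpens the error message rather than moving the failure into `go build`.

```go
package main

import "fmt"

// soundBackend is a hypothetical stand-in for the backend interface; the
// real one has many more methods.
type soundBackend interface {
	Detect(path string) ([]string, error)
	SoundDetection(path string) ([]string, error)
}

// fakeBackend plays the role of a test double.
type fakeBackend struct{}

func (fakeBackend) Detect(string) ([]string, error) { return nil, nil }

// Removing this method makes the assertion below fail to compile.
func (fakeBackend) SoundDetection(string) ([]string, error) {
	return []string{"fake"}, nil
}

// Compile-time check: fakeBackend must satisfy soundBackend.
var _ soundBackend = fakeBackend{}

func main() {
	var b soundBackend = fakeBackend{}
	tags, err := b.SoundDetection("clip.wav")
	if err != nil {
		panic(err)
	}
	fmt.Println(tags)
}
```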

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): decouple realtime sound detection from VAD (sound-only sessions)

Sound-event detection must activate on sounds, not speech, so it no longer
runs through the voice VAD/transcription path. A sound-detection-only
pipeline (sound_detection set, no transcription/LLM) now:

- is accepted by prepareRealtimeConfig (sound_detection counts as a pipeline
  stage),
- builds a lightweight model via newSoundDetectionOnlyModel (no VAD/STT/LLM/TTS
  loaded), and
- defaults the session to turn_detection none (no VAD) with no transcription
  stage, so the client drives windowing via input_audio_buffer.commit
  (option A: client-side sliding window). The per-PCM C-API already supports
  arbitrary windows.

commitUtterance gains a sound-only branch: it emits the
conversation.item.sound_detection event (scored AudioSet tags) and stops -
no transcription, no LLM response. generateResponse is now guarded on a
transcription stage being present, so a sound-only turn never invokes the LLM.

Existing transcription/VAD sessions are unchanged (additive). Added a
commitUtterance sound-only Ginkgo spec asserting it emits the sound event and
neither transcribes nor generates a response. go vet + golangci-lint
(new-from-merge-base) clean; openai suite green.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): register sound-classification backend in gallery + CI

Mechanical backend-image registration for the ced sound-event classifier,
mirroring the parakeet-cpp Go/purego backend everywhere it is wired up.

- .github/backend-matrix.yml: add the ced build matrix, field-for-field copies
  of the parakeet-cpp entries (cpu amd64/arm64, cublas cuda 12/13 amd64,
  l4t cuda-13 arm64, l4t-jetpack cuda-12 arm64, sycl f32/f16, vulkan
  amd64/arm64, rocm hipblas, and the metal darwin entry), changing only
  backend and tag-suffix. dockerfile stays ./backend/Dockerfile.golang.
- backend/index.yaml: add the &ced meta anchor (capabilities map per platform)
  plus ced-development and the per-arch image entries, each uri/mirror
  tag-suffix matching the matrix exactly. The model gallery (GGUF) entry is
  intentionally deferred pending the HuggingFace publish (TODO note inline).
- scripts/changed-backends.js: add an explicit item.backend === "ced" branch in
  inferBackendPath mapping to backend/go/ced/, same mechanism and ordering as
  the parakeet-cpp branch (before the generic golang fallthrough).
- .github/workflows/bump_deps.yaml: register mudler/ced.cpp -> CED_VERSION in
  backend/go/ced/Makefile so the daily bot bumps the pin.
- swagger/{docs.go,swagger.json,swagger.yaml}: regenerated via make swagger so
  the existing /v1/audio/classification annotations land in the generated spec.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): server-side windowing for realtime sound detection (option B)

Adds an optional server-driven sliding-window classifier so a sound-only
realtime client only has to stream audio (no input_audio_buffer.commit):

- Pipeline.sound_detection_window_ms / sound_detection_hop_ms config knobs.
  When both > 0 on a sound-only session, the server classifies the last
  window of streamed audio every hop and emits a
  conversation.item.sound_detection event; the input buffer is trimmed to one
  window so a long
  stream stays bounded. When unset, the session stays client-driven
  (option A). Runs independent of VAD (sound events are not speech).
- handleSoundWindow (ticker) + classifySoundWindow (one tick, extracted so
  it is unit-testable) + writeWindowWAV, which declares the true
  InputSampleRate (NewWAVHeaderWithRate) so the classifier resamples
  correctly. Goroutine is started after toggleVAD and torn down with the
  session (close + wg.Wait).
- Register pipeline.sound_detection (+window_ms/hop_ms) in the config meta
  registry; the earlier realtime commit added pipeline.sound_detection
  without a registry entry, failing TestAllFieldsHaveRegistryEntries. This
  fixes that and covers the two new knobs.

Tests: classifySoundWindow emits an event + trims the buffer to one window,
no-ops on too-little audio; writeWindowWAV declares the given sample rate.
go build/vet + golangci-lint (new-from-merge-base) clean; config + openai
suites green.
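
The window arithmetic described above can be sketched as follows. This is a minimal illustration, not the handleSoundWindow/classifySoundWindow code: it assumes 16-bit mono PCM, and `windowBytes` and `lastWindow` are hypothetical helper names. The 24000 Hz rate matches the default remote sample rate used by the realtime session.

```go
package main

import "fmt"

const bytesPerSample = 2 // assumption: 16-bit mono PCM

// windowBytes converts a duration in milliseconds into a byte count at the
// given sample rate, rounded down to a whole number of samples.
func windowBytes(sampleRate, windowMs int) int {
	return sampleRate * windowMs / 1000 * bytesPerSample
}

// lastWindow returns a copy of the newest windowMs of audio. It reports
// ok=false when the buffer holds less than one full window.
func lastWindow(buf []byte, sampleRate, windowMs int) ([]byte, bool) {
	n := windowBytes(sampleRate, windowMs)
	if n <= 0 || len(buf) < n {
		return nil, false
	}
	// Copy so the caller can keep appending to buf without aliasing.
	out := make([]byte, n)
	copy(out, buf[len(buf)-n:])
	return out, true
}

func main() {
	var buf []byte
	chunk := make([]byte, windowBytes(24000, 500)) // 500 ms of silence per hop
	for tick := 0; tick < 4; tick++ {
		buf = append(buf, chunk...)
		if w, ok := lastWindow(buf, 24000, 1000); ok {
			fmt.Printf("tick %d: classify %d bytes\n", tick, len(w))
			buf = w // trim the buffer to one window so it stays bounded
		}
	}
}
```

Trimming to exactly one window keeps memory bounded on long streams while still letting consecutive windows overlap by window minus hop.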

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): add ced-base GGUF model gallery entries (f16 + q8_0)

The ced-base weights are now published at mudler/ced-base-gguf (Apache-2.0,
converted from mispeech/ced-base). Adds gallery/ced.yaml (backend: ced +
known_usecases: sound_classification) and two gallery/index.yaml entries
(ced-base-f16 default, ced-base-q8 smallest) with sha256-pinned files, and
removes the now-resolved TODO from backend/index.yaml.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ced): add tiny/mini/small GGUF model gallery entries

Publishes the rest of the CED family (same architecture, metadata-driven port
verified end-to-end on ced-tiny) to mudler/ced-{tiny,mini,small}-gguf and adds
their f16 + q8_0 gallery entries:

  ced-tiny  (5.5M, edge/Pi-class)  f16 11MB / q8_0 6MB
  ced-mini  (9.6M)                 f16 19MB / q8_0 11MB
  ced-small (22M)                  f16 42MB / q8_0 23MB

All sha256-pinned. ced-base remains the accuracy default.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* chore(ced): point gallery entries at the consolidated mudler/ced-gguf repo

All CED quantizations (tiny/mini/small/base, f16/q8_0) now live in a single
HuggingFace repo, mudler/ced-gguf, instead of per-model repos. Repoint the 8
gallery model entries' urls + file uris accordingly. sha256 and filenames are
unchanged.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* chore(ced): bump CED_VERSION to the short-clip fix

Pin the ced backend to ced.cpp 99c6ed3, which fixes a crash on any clip
shorter than target_length (~10.11s): time_pos_embed was added at its full
63-frame grid instead of being sliced to the clip's actual time grid, tripping
ggml_can_repeat in ggml_add. Surfaced by the live realtime e2e (sub-10s
windows) and gated with a short-clip parity test upstream.
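
The shape problem can be illustrated in plain Go. This is a simplified sketch, not the ced.cpp code: the strict length check stands in for ggml's broadcast check, taking the first rows of the embedding is an assumption about how the slice is made, and all function names are hypothetical.

```go
package main

import "fmt"

// sliceTimeEmbed returns the first frames rows of a [time][dim] positional
// embedding. Taking the leading rows is an assumption of this sketch.
func sliceTimeEmbed(embed [][]float32, frames int) ([][]float32, error) {
	if frames <= 0 || frames > len(embed) {
		return nil, fmt.Errorf("frames %d out of range 1..%d", frames, len(embed))
	}
	return embed[:frames], nil
}

// addEmbed adds embed to x element-wise and fails on any shape mismatch.
func addEmbed(x, embed [][]float32) error {
	if len(x) != len(embed) {
		return fmt.Errorf("time grid mismatch: input has %d frames, embedding has %d", len(x), len(embed))
	}
	for t := range x {
		if len(x[t]) != len(embed[t]) {
			return fmt.Errorf("dim mismatch at frame %d", t)
		}
		for d := range x[t] {
			x[t][d] += embed[t][d]
		}
	}
	return nil
}

// makeGrid builds a frames x dim grid filled with v.
func makeGrid(frames, dim int, v float32) [][]float32 {
	g := make([][]float32, frames)
	for t := range g {
		g[t] = make([]float32, dim)
		for d := range g[t] {
			g[t][d] = v
		}
	}
	return g
}

func main() {
	full := makeGrid(63, 4, 0.5) // full-length embedding: 63 time frames
	clip := makeGrid(20, 4, 1)   // a short clip with fewer frames
	fmt.Println("unsliced:", addEmbed(clip, full))
	sliced, err := sliceTimeEmbed(full, len(clip))
	if err != nil {
		panic(err)
	}
	fmt.Println("sliced:", addEmbed(clip, sliced))
}
```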

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(ced): list ced.cpp as a LocalAI-team engine + backend-guide directive

- README.md: add ced.cpp to the "native C/C++/GGML engines developed and
  maintained by the LocalAI project" table.
- docs/content/features/backends.md: add a Sound Classification backend
  category (sound-event classification / audio tagging) listing ced.cpp.
- .agents/adding-backends.md: add a "Documenting the backend" section and two
  verification-checklist items requiring new backends to be documented in the
  backends.md category list, and in-house native engines to be added to the
  README maintained-engines table. This directive was missing.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* chore(ced): repin CED_VERSION to the v0.1.0 release commit

ced.cpp history was squashed into a single release commit (tagged v0.1.0), so
the previous pin (99c6ed3) no longer exists upstream. Pin to c04ac14, the
v0.1.0 release commit, so the backend builds against a commit that exists.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(ced): silence gosec G304/G103 + govet unsafeptr on audited paths

- sound_classification.go: os.Create(dst) where dst = temp dir + path.Base of
  the upload (no traversal). #nosec G304, matching the depth-anything-cpp handler.
- goced.go: reading a NUL-terminated C string from a libced-owned buffer.
  #nosec G103 (gosec) + //nolint:govet (golangci-lint's unsafeptr check), since
  the uintptr is a C-owned malloc'd buffer, not Go-GC memory.
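
The "temp dir + path.Base" pattern from the first bullet can be sketched like this. `safeUploadPath` is a hypothetical helper, and the explicit rejection of ".", ".." and "/" is an addition of this sketch rather than something the handler is known to do.

```go
package main

import (
	"errors"
	"fmt"
	"path"
)

// safeUploadPath joins tmpDir with only the final element of uploadName, so
// directory components in a client-supplied name cannot escape tmpDir.
// Note: package path treats only forward slashes as separators.
func safeUploadPath(tmpDir, uploadName string) (string, error) {
	base := path.Base(uploadName)
	// path.Base returns "." for an empty name, ".." for "..", and "/" for a
	// bare root; none of them names a file.
	if base == "." || base == ".." || base == "/" {
		return "", errors.New("upload has no usable file name")
	}
	return path.Join(tmpDir, base), nil
}

func main() {
	dst, err := safeUploadPath("/tmp/upload", "../../etc/passwd")
	if err != nil {
		panic(err)
	}
	fmt.Println(dst)
}
```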

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-22 01:00:28 +02:00

2338 lines
79 KiB
Go

package openai
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/go-audio/audio"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/application"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/auth"
mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/templates"
laudio "github.com/mudler/LocalAI/pkg/audio"
"github.com/mudler/LocalAI/pkg/functions"
"github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/LocalAI/pkg/grpc/proto"
model "github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/reasoning"
"github.com/mudler/LocalAI/pkg/sound"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/xlog"
)
const (
// XXX: Presently it seems all ASR/VAD backends use 16 kHz. If a backend uses 24 kHz it will likely still work, but with reduced performance
localSampleRate = 16000
defaultRemoteSampleRate = 24000
// Maximum audio buffer size in bytes (100MB) to prevent memory exhaustion
maxAudioBufferSize = 100 * 1024 * 1024
// Maximum WebSocket message size in bytes (10MB) to prevent DoS attacks
maxWebSocketMessageSize = 10 * 1024 * 1024
defaultInstructions = "You are a helpful voice assistant. " +
"Your responses will be spoken aloud using text-to-speech, so keep them concise and conversational. " +
"Do not use markdown formatting, bullet points, numbered lists, code blocks, or special characters. " +
"Speak naturally as you would in a phone conversation. " +
"Avoid parenthetical asides, URLs, and anything that cannot be clearly vocalized."
)
// resolveOutputModalities returns the effective output modalities for a
// response: response-level overrides session-level, and the OpenAI Realtime
// spec default is ["audio"] when neither is set.
func resolveOutputModalities(session, response []types.Modality) []types.Modality {
if len(response) > 0 {
return response
}
if len(session) > 0 {
return session
}
return []types.Modality{types.ModalityAudio}
}
// modalitiesContainAudio reports whether the resolved modalities include audio
// output.
func modalitiesContainAudio(m []types.Modality) bool {
for _, x := range m {
if x == types.ModalityAudio {
return true
}
}
return false
}
// A model can be "emulated", that is: transcribe audio to text -> feed the text to the LLM -> generate audio as the result.
// If the model supports audio-to-audio instead, we use the specific gRPC calls.
// Session represents a single WebSocket connection and its state
type Session struct {
ID string
TranscriptionOnly bool
// The pipeline or any-to-any model name (full realtime mode)
Model string
// The voice may be a TTS model name or a parameter passed to a TTS model
Voice string
TurnDetection *types.TurnDetectionUnion // "server_vad", "semantic_vad" or "none"
InputAudioTranscription *types.AudioTranscription
// SoundDetectionEnabled is set when pipeline.sound_detection names a
// sound-event-classification model. When true, each committed utterance is
// also run through ModelInterface.SoundDetection and the scored tags are
// emitted as a conversation.item.sound_detection event. SoundDetectionTopK
// and SoundDetectionThreshold are the knobs passed to that call (defaults:
// top_k=5, threshold=0).
SoundDetectionEnabled bool
SoundDetectionTopK int
SoundDetectionThreshold float32
// SoundDetectionWindowMs / SoundDetectionHopMs, when both > 0, enable
// server-side windowing for a sound-only session: the server classifies the
// last WindowMs of streamed audio every HopMs (no client commits needed).
SoundDetectionWindowMs int
SoundDetectionHopMs int
Tools []types.ToolUnion
ToolChoice *types.ToolChoiceUnion
Conversations map[string]*Conversation
InputAudioBuffer []byte
AudioBufferLock sync.Mutex
OpusFrames [][]byte
OpusFramesLock sync.Mutex
Instructions string
DefaultConversationID string
ModelInterface Model
// The pipeline model config or the config for an any-to-any model
ModelConfig *config.ModelConfig
InputSampleRate int
OutputSampleRate int
MaxOutputTokens types.IntOrInf
// OutputModalities mirrors the OpenAI Realtime spec field of the same
// name. Empty means "use the spec default" (audio). ["text"] suppresses
// TTS so the client receives only response.output_text.* events.
OutputModalities []types.Modality
// MaxHistoryItems caps the number of MessageItems passed to the LLM each
// turn (0 = unlimited). Small models — especially the LFM2.5-Audio 1.5B
// served via the liquid-audio backend — degrade quickly past a handful
// of turns. Counted from the tail; FunctionCall + FunctionCallOutput
// pairs are kept together so we never feed an orphaned tool result.
MaxHistoryItems int
// AssistantExecutor is non-nil when the session opted into the in-process
// LocalAI Assistant tool surface. Tool calls whose name matches this
// executor's catalog are run inproc and their output is fed back to the
// model server-side; the client never sees a function_call_arguments
// event for those. Mirrors the chat handler's metadata.localai_assistant
// path.
AssistantExecutor mcpTools.ToolExecutor
// AssistantTools is the cached ToolUnion slice we injected at session
// creation. Re-applied after every client session.update so a
// client-driven tool refresh (e.g. toggling a client MCP server) doesn't
// silently strip Manage Mode's tools.
AssistantTools []types.ToolUnion
// voiceGate is non-nil when pipeline.voice_recognition is configured. It
// authorizes each committed utterance's speaker before the LLM runs.
voiceGate *voiceGate
// gateMu guards the when:first verification state below.
gateMu sync.Mutex
voiceVerified bool
// Response cancellation: protects activeResponseCancel/activeResponseDone
responseMu sync.Mutex
activeResponseCancel context.CancelFunc
activeResponseDone chan struct{}
}
// cancelActiveResponse cancels any in-flight response and waits for its
// goroutine to exit. This ensures we never have overlapping responses and
// that interrupted responses are fully cleaned up before starting a new one.
func (s *Session) cancelActiveResponse() {
s.responseMu.Lock()
cancel := s.activeResponseCancel
done := s.activeResponseDone
s.responseMu.Unlock()
if cancel != nil {
cancel()
}
if done != nil {
<-done
}
}
// startResponse cancels any active response and returns a new context for
// the replacement response. The caller MUST close the returned done channel
// when the response goroutine exits.
func (s *Session) startResponse(parent context.Context) (context.Context, chan struct{}) {
s.cancelActiveResponse()
ctx, cancel := context.WithCancel(parent)
done := make(chan struct{})
s.responseMu.Lock()
s.activeResponseCancel = cancel
s.activeResponseDone = done
s.responseMu.Unlock()
return ctx, done
}
func (s *Session) FromClient(session *types.SessionUnion) {
}
func (s *Session) ToServer() types.SessionUnion {
if s.TranscriptionOnly {
return types.SessionUnion{
Transcription: &types.TranscriptionSession{
ID: s.ID,
Object: "realtime.transcription_session",
Audio: &types.TranscriptionSessionAudio{
Input: &types.SessionAudioInput{
Transcription: s.InputAudioTranscription,
},
},
},
}
} else {
return types.SessionUnion{
Realtime: &types.RealtimeSession{
ID: s.ID,
Object: "realtime.session",
Model: s.Model,
Instructions: s.Instructions,
Tools: s.Tools,
ToolChoice: s.ToolChoice,
MaxOutputTokens: s.MaxOutputTokens,
OutputModalities: s.OutputModalities,
Audio: &types.RealtimeSessionAudio{
Input: &types.SessionAudioInput{
TurnDetection: s.TurnDetection,
Transcription: s.InputAudioTranscription,
},
Output: &types.SessionAudioOutput{
Voice: types.Voice(s.Voice),
},
},
},
}
}
}
// Conversation represents a conversation with a list of items
type Conversation struct {
ID string
Items []*types.MessageItemUnion
Lock sync.Mutex
}
func (c *Conversation) ToServer() types.Conversation {
return types.Conversation{
ID: c.ID,
Object: "realtime.conversation",
}
}
// Map to store sessions (in-memory)
var sessions = make(map[string]*Session)
var sessionLock sync.Mutex
type Model interface {
VAD(ctx context.Context, request *schema.VADRequest) (*schema.VADResponse, error)
Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error)
Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error)
TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error)
// TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM
// chunks (and the backend sample rate) as they are produced.
TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error
// TranscribeStream transcribes audio incrementally, invoking onDelta for each
// transcript text fragment and returning the final aggregated result.
TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error)
// SoundDetection classifies a committed audio window into scored AudioSet
// sound-event tags. topK caps the number of returned tags (0 = backend
// default), threshold drops tags below the given score (0 = keep all).
SoundDetection(ctx context.Context, audio string, topK int, threshold float32) (*schema.SoundClassificationResult, error)
PredictConfig() *config.ModelConfig
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // Allow all origins
},
}
// TODO: Implement ephemeral keys to allow these endpoints to be used
func RealtimeSessions(application *application.Application) echo.HandlerFunc {
return func(c echo.Context) error {
return c.NoContent(http.StatusNotImplemented)
}
}
func RealtimeTranscriptionSession(application *application.Application) echo.HandlerFunc {
return func(c echo.Context) error {
return c.NoContent(http.StatusNotImplemented)
}
}
// RealtimeSessionOptions bundles per-session knobs decoded from the WS query
// string (or the WebRTC handshake body). Mirrors what chat.go pulls off
// `metadata.localai_assistant` — admin-only opt-in to the in-process
// management tool surface.
type RealtimeSessionOptions struct {
LocalAIAssistant bool
// IsAdmin mirrors chat.go's requireAssistantAccess gate. We resolve the
// admin role at handshake time (where the echo.Context has the auth
// cookie/Bearer) and store the result here so runRealtimeSession can
// decide without holding onto the request.
IsAdmin bool
}
func Realtime(application *application.Application) echo.HandlerFunc {
return func(c echo.Context) error {
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return err
}
defer ws.Close()
// Set maximum message size to prevent DoS attacks
ws.SetReadLimit(maxWebSocketMessageSize)
// Extract query parameters from Echo context before passing to websocket handler
model := c.QueryParam("model")
assistantFlag, _ := strconv.ParseBool(c.QueryParam("localai_assistant"))
opts := RealtimeSessionOptions{
LocalAIAssistant: assistantFlag,
IsAdmin: isCurrentUserAdmin(c, application),
}
registerRealtime(application, model, opts)(ws)
return nil
}
}
// isCurrentUserAdmin replicates the chat-side admin check at the realtime
// handshake. When auth is disabled, every caller is treated as admin (same
// as chat's requireAssistantAccess).
func isCurrentUserAdmin(c echo.Context, application *application.Application) bool {
if application == nil || application.ApplicationConfig() == nil || !application.ApplicationConfig().Auth.Enabled {
return true
}
user := auth.GetUser(c)
return user != nil && user.Role == auth.RoleAdmin
}
func registerRealtime(application *application.Application, model string, opts RealtimeSessionOptions) func(c *websocket.Conn) {
return func(conn *websocket.Conn) {
t := NewWebSocketTransport(conn)
evaluator := application.TemplatesEvaluator()
xlog.Debug("Realtime WebSocket connection established", "address", conn.RemoteAddr().String(), "model", model)
runRealtimeSession(application, t, model, evaluator, opts)
}
}
// defaultMaxHistoryItems picks a sensible default cap for the session.
// Small any-to-any audio models degrade quickly past a handful of turns;
// legacy pipelines composing larger LLMs keep the historical "unlimited"
// default and rely on the LLM's own context window.
func defaultMaxHistoryItems(cfg *config.ModelConfig) int {
if cfg != nil && cfg.HasUsecases(config.FLAG_REALTIME_AUDIO) {
return 6
}
return 0
}
// resolveMaxHistoryItems honors an explicit pipeline.max_history_items when set,
// otherwise falls back to the per-model-type default. This lets a composed
// pipeline (VAD+STT+LLM+TTS) cap its history so a long-running session doesn't
// grow until the LLM's context window fills.
func resolveMaxHistoryItems(cfg *config.ModelConfig) int {
if cfg != nil && cfg.Pipeline.MaxHistoryItems != nil {
return *cfg.Pipeline.MaxHistoryItems
}
return defaultMaxHistoryItems(cfg)
}
// trimRealtimeItems returns the tail of items capped at maxItems (0 = no cap).
// Walks backwards keeping function_call + function_call_output pairs together
// so we never feed the LLM an orphaned tool result that references a call it
// can't see.
func trimRealtimeItems(items []*types.MessageItemUnion, maxItems int) []*types.MessageItemUnion {
if maxItems <= 0 || len(items) <= maxItems {
return items
}
// Find the cut point starting from len-maxItems and pull it left until
// we're not in the middle of a tool-call pair.
cut := len(items) - maxItems
for cut > 0 && items[cut] != nil && items[cut].FunctionCallOutput != nil {
cut--
}
return items[cut:]
}
// prepareRealtimeConfig validates a model config for use in a realtime session
// and fills in pipeline slots for self-contained any-to-any models. It returns
// an error code + message pair suitable for sendError; the bool indicates
// whether the caller should proceed. Extracted from runRealtimeSession so the
// gate logic can be exercised in unit tests without a full Application.
func prepareRealtimeConfig(cfg *config.ModelConfig) (errCode, errMsg string, ok bool) {
if cfg == nil {
return "invalid_model", "Model is not a pipeline model", false
}
// Self-contained any-to-any models (e.g. liquid-audio) own the whole
// loop in one engine — surface them by populating empty pipeline slots
// with the model's own name so newModel can resolve a config for each
// role. The user can still pin individual slots (e.g. Pipeline.VAD =
// silero-vad) and those take precedence.
if cfg.HasUsecases(config.FLAG_REALTIME_AUDIO) {
if cfg.Pipeline.VAD == "" {
cfg.Pipeline.VAD = cfg.Name
}
if cfg.Pipeline.Transcription == "" {
cfg.Pipeline.Transcription = cfg.Name
}
if cfg.Pipeline.LLM == "" {
cfg.Pipeline.LLM = cfg.Name
}
if cfg.Pipeline.TTS == "" {
cfg.Pipeline.TTS = cfg.Name
}
return "", "", true
}
if cfg.Pipeline.VAD == "" && cfg.Pipeline.Transcription == "" && cfg.Pipeline.TTS == "" && cfg.Pipeline.LLM == "" && cfg.Pipeline.SoundDetection == "" {
return "invalid_model", "Model is not a pipeline model", false
}
return "", "", true
}
// runRealtimeSession runs the main event loop for a realtime session.
// It is transport-agnostic and works with both WebSocket and WebRTC.
func runRealtimeSession(application *application.Application, t Transport, model string, evaluator *templates.Evaluator, opts RealtimeSessionOptions) {
cl := application.ModelConfigLoader()
cfg, err := cl.LoadModelConfigFileByNameDefaultOptions(model, application.ApplicationConfig())
if err != nil {
xlog.Error("failed to load model config", "error", err)
sendError(t, "model_load_error", "Failed to load model config", "", "")
return
}
if code, msg, ok := prepareRealtimeConfig(cfg); !ok {
xlog.Error("model is not a pipeline", "model", model)
sendError(t, code, msg, "", "")
return
}
// LocalAI Assistant opt-in: gate on admin (same rule as chat.go's
// requireAssistantAccess) and grab the process-wide holder's executor.
// We collect tools + system prompt here and merge them into the session
// below so they're live from the first response.create.
var assistantTools []types.ToolUnion
var assistantSystemPrompt string
var assistantExecutor mcpTools.ToolExecutor
if opts.LocalAIAssistant {
if !opts.IsAdmin {
sendError(t, "forbidden", "localai_assistant requires admin", "", "")
return
}
appCfg := application.ApplicationConfig()
if appCfg != nil && appCfg.DisableLocalAIAssistant {
sendError(t, "unavailable", "LocalAI Assistant is disabled on this server", "", "")
return
}
holder := application.LocalAIAssistant()
if holder == nil || !holder.HasTools() {
sendError(t, "unavailable", "LocalAI Assistant is not available on this server", "", "")
return
}
exec := holder.Executor()
fns, discErr := exec.DiscoverTools(context.Background())
if discErr != nil {
xlog.Error("realtime: failed to discover LocalAI Assistant tools", "error", discErr)
sendError(t, "tool_discovery_failed", "failed to discover assistant tools: "+discErr.Error(), "", "")
return
}
assistantExecutor = exec
assistantSystemPrompt = holder.SystemPrompt()
assistantTools = make([]types.ToolUnion, 0, len(fns))
for _, fn := range fns {
fnCopy := fn
assistantTools = append(assistantTools, types.ToolUnion{
Function: &types.ToolFunction{
Name: fnCopy.Name,
Description: fnCopy.Description,
Parameters: fnCopy.Parameters,
},
})
}
xlog.Debug("realtime: LocalAI Assistant tools injected", "count", len(fns))
}
sttModel := cfg.Pipeline.Transcription
// A sound-detection-only pipeline (sound_detection set, no transcription/LLM)
// activates on sounds, not speech, so it runs WITHOUT the voice VAD: the
// session defaults to turn_detection none and the client drives windowing via
// input_audio_buffer.commit. There is no transcription stage in that case.
soundOnly := cfg.Pipeline.SoundDetection != "" && cfg.Pipeline.Transcription == "" && cfg.Pipeline.LLM == ""
turnDetection := &types.TurnDetectionUnion{
ServerVad: &types.ServerVad{
Threshold: 0.5,
PrefixPaddingMs: 300,
SilenceDurationMs: 500,
CreateResponse: true,
},
}
inputAudioTranscription := &types.AudioTranscription{Model: sttModel}
if soundOnly {
turnDetection = nil // turn_detection none: no VAD
inputAudioTranscription = nil // no transcription stage
}
// Compose the system prompt: prepend the assistant prompt when we have
// one (it teaches the model the safety rules and tool recipes), then the
// session's default voice instructions. Order matches chat.go's
// hasSystemMessage check — assistant prompt comes first.
instructions := defaultInstructions
if assistantSystemPrompt != "" {
instructions = assistantSystemPrompt + "\n\n" + defaultInstructions
}
sessionID := generateSessionID()
session := &Session{
ID: sessionID,
TranscriptionOnly: false,
Model: model,
Voice: cfg.TTSConfig.Voice,
Instructions: instructions,
ModelConfig: cfg,
Tools: assistantTools,
AssistantTools: assistantTools,
AssistantExecutor: assistantExecutor,
TurnDetection: turnDetection,
InputAudioTranscription: inputAudioTranscription,
Conversations: make(map[string]*Conversation),
InputSampleRate: defaultRemoteSampleRate,
OutputSampleRate: defaultRemoteSampleRate,
MaxHistoryItems: resolveMaxHistoryItems(cfg),
SoundDetectionEnabled: cfg.Pipeline.SoundDetection != "",
SoundDetectionTopK: defaultSoundDetectionTopK,
SoundDetectionThreshold: 0,
SoundDetectionWindowMs: cfg.Pipeline.SoundDetectionWindowMs,
SoundDetectionHopMs: cfg.Pipeline.SoundDetectionHopMs,
}
// Create a default conversation
conversationID := generateConversationID()
conversation := &Conversation{
ID: conversationID,
// TODO: Truncate the conversation items when a new item is added and we have run out of space. Items can be added
// in multiple places, so a data structure that enforces truncation on insertion would fit here.
Items: []*types.MessageItemUnion{},
}
session.Conversations[conversationID] = conversation
session.DefaultConversationID = conversationID
var m Model
if soundOnly {
m, err = newSoundDetectionOnlyModel(
&cfg.Pipeline,
application.ModelConfigLoader(),
application.ModelLoader(),
application.ApplicationConfig(),
)
} else {
m, err = newModel(
&cfg.Pipeline,
application.ModelConfigLoader(),
application.ModelLoader(),
application.ApplicationConfig(),
evaluator,
buildRealtimeRoutingContext(application, sessionID),
)
}
if err != nil {
xlog.Error("failed to load model", "error", err)
sendError(t, "model_load_error", "Failed to load model", "", "")
return
}
session.ModelInterface = m
if cfg.Pipeline.VoiceGateEnabled() {
gate, gerr := newVoiceGate(
*cfg.Pipeline.VoiceRecognition,
application.ModelConfigLoader(),
application.ModelLoader(),
application.ApplicationConfig(),
application.VoiceRegistry(),
)
if gerr != nil {
xlog.Error("failed to initialize voice recognition gate", "error", gerr)
sendError(t, "voice_gate_error", gerr.Error(), "", "")
return
}
session.voiceGate = gate
xlog.Info("realtime voice recognition gate enabled", "mode", gate.cfg.Mode, "when", gate.cfg.When)
}
// Store the session and notify the transport (for WebRTC audio track handling)
sessionLock.Lock()
sessions[sessionID] = session
sessionLock.Unlock()
// For WebRTC, inbound audio arrives as Opus (48kHz) and is decoded+resampled
// to localSampleRate in handleIncomingAudioTrack. Set InputSampleRate to
// match so handleVAD doesn't needlessly double-resample.
if _, ok := t.(*WebRTCTransport); ok {
session.InputSampleRate = localSampleRate
}
if sn, ok := t.(interface{ SetSession(*Session) }); ok {
sn.SetSession(session)
}
sendEvent(t, types.SessionCreatedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
Session: session.ToServer(),
})
var (
msg []byte
wg sync.WaitGroup
done = make(chan struct{})
)
vadServerStarted := false
toggleVAD := func() {
if session.TurnDetection != nil && session.TurnDetection.ServerVad != nil && !vadServerStarted {
xlog.Debug("Starting VAD goroutine...")
done = make(chan struct{})
wg.Go(func() {
conversation := session.Conversations[session.DefaultConversationID]
handleVAD(session, conversation, t, done)
})
vadServerStarted = true
} else if (session.TurnDetection == nil || session.TurnDetection.ServerVad == nil) && vadServerStarted {
xlog.Debug("Stopping VAD goroutine...")
close(done)
vadServerStarted = false
}
}
// For WebRTC sessions, start the Opus decode loop before VAD so that
// decoded PCM is already flowing when VAD's first tick fires.
var decodeDone chan struct{}
if wt, ok := t.(*WebRTCTransport); ok {
decodeDone = make(chan struct{})
go decodeOpusLoop(session, wt.opusBackend, decodeDone)
}
toggleVAD()
// Server-side sound-detection windowing (option B): for a sound-only session
// with window/hop configured, the server classifies the last window of
// streamed audio on a timer, so the client only has to stream (no commits).
// This runs independent of VAD (sound events are not speech).
var soundWindowDone chan struct{}
if soundOnly && session.SoundDetectionWindowMs > 0 && session.SoundDetectionHopMs > 0 {
soundWindowDone = make(chan struct{})
wg.Go(func() {
handleSoundWindow(session, t, soundWindowDone)
})
xlog.Debug("Starting server-side sound-detection windowing",
"window_ms", session.SoundDetectionWindowMs, "hop_ms", session.SoundDetectionHopMs)
}
for {
msg, err = t.ReadEvent()
if err != nil {
xlog.Error("read error", "error", err)
break
}
// Handle diagnostic events that aren't part of the OpenAI protocol
var rawType struct {
Type string `json:"type"`
}
if json.Unmarshal(msg, &rawType) == nil && rawType.Type == "test_tone" {
if _, ok := t.(*WebSocketTransport); ok {
sendError(t, "not_supported", "test_tone is only supported on WebRTC connections", "", "")
} else {
xlog.Debug("Generating test tone")
go sendTestTone(t)
}
continue
}
// Parse the incoming message
event, err := types.UnmarshalClientEvent(msg)
if err != nil {
xlog.Error("invalid json", "error", err)
sendError(t, "invalid_json", "Invalid JSON format", "", "")
continue
}
switch e := event.(type) {
case types.SessionUpdateEvent:
xlog.Debug("recv", "message", string(msg))
// Handle transcription session update
if e.Session.Transcription != nil {
if err := updateTransSession(
session,
&e.Session,
application.ModelConfigLoader(),
application.ModelLoader(),
application.ApplicationConfig(),
); err != nil {
xlog.Error("failed to update session", "error", err)
sendError(t, "session_update_error", "Failed to update session", "", "")
continue
}
toggleVAD()
sendEvent(t, types.SessionUpdatedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
Session: session.ToServer(),
})
}
// Handle realtime session update
if e.Session.Realtime != nil {
if err := updateSession(
session,
&e.Session,
application.ModelConfigLoader(),
application.ModelLoader(),
application.ApplicationConfig(),
evaluator,
buildRealtimeRoutingContext(application, session.ID),
); err != nil {
xlog.Error("failed to update session", "error", err)
sendError(t, "session_update_error", "Failed to update session", "", "")
continue
}
toggleVAD()
sendEvent(t, types.SessionUpdatedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
Session: session.ToServer(),
})
}
case types.InputAudioBufferAppendEvent:
// Handle 'input_audio_buffer.append'
if e.Audio == "" {
xlog.Error("Audio data is missing in 'input_audio_buffer.append'")
sendError(t, "missing_audio_data", "Audio data is missing", "", "")
continue
}
// Decode base64 audio data
decodedAudio, err := base64.StdEncoding.DecodeString(e.Audio)
if err != nil {
xlog.Error("failed to decode audio data", "error", err)
sendError(t, "invalid_audio_data", "Failed to decode audio data", "", "")
continue
}
// Check buffer size limits before appending
session.AudioBufferLock.Lock()
newSize := len(session.InputAudioBuffer) + len(decodedAudio)
if newSize > maxAudioBufferSize {
session.AudioBufferLock.Unlock()
xlog.Error("audio buffer size limit exceeded", "current_size", len(session.InputAudioBuffer), "incoming_size", len(decodedAudio), "limit", maxAudioBufferSize)
sendError(t, "buffer_size_exceeded", fmt.Sprintf("Audio buffer size limit exceeded (max %d bytes)", maxAudioBufferSize), "", "")
continue
}
// Append to InputAudioBuffer
session.InputAudioBuffer = append(session.InputAudioBuffer, decodedAudio...)
session.AudioBufferLock.Unlock()
case types.InputAudioBufferCommitEvent:
xlog.Debug("recv", "message", string(msg))
sessionLock.Lock()
isServerVAD := session.TurnDetection != nil && session.TurnDetection.ServerVad != nil
sessionLock.Unlock()
// TODO: At the very least, check locking and timer state in the VAD goroutine before allowing this.
if isServerVAD {
sendNotImplemented(t, "input_audio_buffer.commit in conjunction with VAD")
continue
}
session.AudioBufferLock.Lock()
allAudio := make([]byte, len(session.InputAudioBuffer))
copy(allAudio, session.InputAudioBuffer)
session.InputAudioBuffer = nil
session.AudioBufferLock.Unlock()
sendEvent(t, types.InputAudioBufferCommittedEvent{
ServerEventBase: types.ServerEventBase{},
ItemID: generateItemID(),
})
respCtx, respDone := session.startResponse(context.Background())
go func() {
defer close(respDone)
commitUtterance(respCtx, allAudio, session, conversation, t)
}()
case types.ConversationItemCreateEvent:
xlog.Debug("recv", "message", string(msg))
// Add the item to the conversation
item := e.Item
// Ensure IDs are present
if item.User != nil && item.User.ID == "" {
item.User.ID = generateItemID()
}
if item.Assistant != nil && item.Assistant.ID == "" {
item.Assistant.ID = generateItemID()
}
if item.System != nil && item.System.ID == "" {
item.System.ID = generateItemID()
}
if item.FunctionCall != nil && item.FunctionCall.ID == "" {
item.FunctionCall.ID = generateItemID()
}
if item.FunctionCallOutput != nil && item.FunctionCallOutput.ID == "" {
item.FunctionCallOutput.ID = generateItemID()
}
conversation.Lock.Lock()
conversation.Items = append(conversation.Items, &item)
conversation.Lock.Unlock()
sendEvent(t, types.ConversationItemAddedEvent{
ServerEventBase: types.ServerEventBase{
EventID: e.EventID,
},
PreviousItemID: e.PreviousItemID,
Item: item,
})
case types.ConversationItemDeleteEvent:
sendError(t, "not_implemented", "Deleting items not implemented", "", "event_TODO")
case types.ConversationItemRetrieveEvent:
xlog.Debug("recv", "message", string(msg))
if e.ItemID == "" {
sendError(t, "invalid_item_id", "Need item_id, but none specified", "", "event_TODO")
continue
}
conversation.Lock.Lock()
var retrievedItem types.MessageItemUnion
for _, item := range conversation.Items {
// We need to check ID in the union
var id string
if item.System != nil {
id = item.System.ID
} else if item.User != nil {
id = item.User.ID
} else if item.Assistant != nil {
id = item.Assistant.ID
} else if item.FunctionCall != nil {
id = item.FunctionCall.ID
} else if item.FunctionCallOutput != nil {
id = item.FunctionCallOutput.ID
}
if id == e.ItemID {
retrievedItem = *item
break
}
}
conversation.Lock.Unlock()
sendEvent(t, types.ConversationItemRetrievedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
Item: retrievedItem,
})
case types.ResponseCreateEvent:
xlog.Debug("recv", "message", string(msg))
// Handle optional items to add to context
if len(e.Response.Input) > 0 {
conversation.Lock.Lock()
for _, item := range e.Response.Input {
// Ensure IDs are present
if item.User != nil && item.User.ID == "" {
item.User.ID = generateItemID()
}
if item.Assistant != nil && item.Assistant.ID == "" {
item.Assistant.ID = generateItemID()
}
if item.System != nil && item.System.ID == "" {
item.System.ID = generateItemID()
}
if item.FunctionCall != nil && item.FunctionCall.ID == "" {
item.FunctionCall.ID = generateItemID()
}
if item.FunctionCallOutput != nil && item.FunctionCallOutput.ID == "" {
item.FunctionCallOutput.ID = generateItemID()
}
conversation.Items = append(conversation.Items, &item)
}
conversation.Lock.Unlock()
}
respCtx, respDone := session.startResponse(context.Background())
go func() {
defer close(respDone)
triggerResponse(respCtx, session, conversation, t, &e.Response)
}()
case types.ResponseCancelEvent:
xlog.Debug("recv", "message", string(msg))
session.cancelActiveResponse()
default:
xlog.Error("unknown message type")
// sendError(t, "unknown_message_type", fmt.Sprintf("Unknown message type: %s", incomingMsg.Type), "", "")
}
}
// Cancel any in-flight response before tearing down
session.cancelActiveResponse()
// Stop the Opus decode goroutine (if running)
if decodeDone != nil {
close(decodeDone)
}
// Signal any running VAD goroutine to exit.
if vadServerStarted {
close(done)
}
// Stop the server-side sound-detection windowing goroutine (if running).
if soundWindowDone != nil {
close(soundWindowDone)
}
wg.Wait()
// Remove the session from the sessions map
sessionLock.Lock()
delete(sessions, sessionID)
sessionLock.Unlock()
}
// sendEvent sends a server event via the transport, logging any errors.
func sendEvent(t Transport, event types.ServerEvent) {
if err := t.SendEvent(event); err != nil {
xlog.Error("write error", "error", err)
}
}
// sendError sends an error event to the client.
func sendError(t Transport, code, message, param, eventID string) {
errorEvent := types.ErrorEvent{
ServerEventBase: types.ServerEventBase{
EventID: eventID,
},
Error: types.Error{
Type: "invalid_request_error",
Code: code,
Message: message,
Param: param,
EventID: eventID,
},
}
sendEvent(t, errorEvent)
}
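// sendNotImplemented reports an unimplemented feature to the client as a
// not_implemented error event.
func sendNotImplemented(t Transport, message string) {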
sendError(t, "not_implemented", message, "", "event_TODO")
}
// sendTestTone generates a 1-second 440 Hz sine wave and sends it through
// the transport's audio path. This exercises the full Opus encode → RTP →
// browser decode pipeline without involving TTS.
func sendTestTone(t Transport) {
const (
freq = 440.0
sampleRate = 24000
duration = 1 // seconds
amplitude = 16000
numSamples = sampleRate * duration
)
pcm := make([]byte, numSamples*2) // 16-bit samples = 2 bytes each
for i := range numSamples {
sample := int16(amplitude * math.Sin(2*math.Pi*freq*float64(i)/sampleRate))
binary.LittleEndian.PutUint16(pcm[i*2:], uint16(sample))
}
xlog.Debug("Sending test tone", "samples", numSamples, "sample_rate", sampleRate, "freq", freq)
if err := t.SendAudio(context.Background(), pcm, sampleRate); err != nil {
xlog.Error("test tone send failed", "error", err)
}
}
func updateTransSession(session *Session, update *types.SessionUnion, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) error {
sessionLock.Lock()
defer sessionLock.Unlock()
// A transcription session update only reads the Transcription field.
if update.Transcription == nil || update.Transcription.Audio == nil || update.Transcription.Audio.Input == nil {
return nil
}
trUpd := update.Transcription.Audio.Input.Transcription
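trCur := session.InputAudioTranscription
// A sound-detection-only session starts with a nil transcription config, so
// allocate one before the update dereferences it below.
if trUpd != nil && trCur == nil {
trCur = &types.AudioTranscription{}
session.InputAudioTranscription = trCur
}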
session.TranscriptionOnly = true
if trUpd != nil && trUpd.Model != "" && trUpd.Model != trCur.Model {
cfg, err := cl.LoadModelConfigFileByNameDefaultOptions(trUpd.Model, appConfig)
if err != nil {
return err
}
if cfg == nil || (cfg.Pipeline.VAD == "" || cfg.Pipeline.Transcription == "") {
return fmt.Errorf("model is not a valid pipeline model: %s", trUpd.Model)
}
m, cfg, err := newTranscriptionOnlyModel(&cfg.Pipeline, cl, ml, appConfig)
if err != nil {
return err
}
session.ModelInterface = m
session.ModelConfig = cfg
session.SoundDetectionEnabled = cfg.Pipeline.SoundDetection != ""
if session.SoundDetectionTopK <= 0 {
session.SoundDetectionTopK = defaultSoundDetectionTopK
}
}
if trUpd != nil {
trCur.Language = trUpd.Language
trCur.Prompt = trUpd.Prompt
}
if update.Transcription.Audio.Input.TurnDetectionSet {
session.TurnDetection = update.Transcription.Audio.Input.TurnDetection
}
if update.Transcription.Audio.Input.Format != nil && update.Transcription.Audio.Input.Format.PCM != nil {
if update.Transcription.Audio.Input.Format.PCM.Rate > 0 {
session.InputSampleRate = update.Transcription.Audio.Input.Format.PCM.Rate
}
}
return nil
}
func updateSession(session *Session, update *types.SessionUnion, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig, evaluator *templates.Evaluator, routing *RealtimeRoutingContext) error {
sessionLock.Lock()
defer sessionLock.Unlock()
if update.Realtime == nil {
return nil
}
session.TranscriptionOnly = false
rt := update.Realtime
if rt.Model != "" {
cfg, err := cl.LoadModelConfigFileByNameDefaultOptions(rt.Model, appConfig)
if err != nil {
return err
}
if cfg == nil || (cfg.Pipeline.VAD == "" || cfg.Pipeline.Transcription == "" || cfg.Pipeline.TTS == "" || cfg.Pipeline.LLM == "") {
return fmt.Errorf("model is not a valid pipeline model: %s", rt.Model)
}
if session.InputAudioTranscription == nil {
session.InputAudioTranscription = &types.AudioTranscription{}
}
session.InputAudioTranscription.Model = cfg.Pipeline.Transcription
session.Voice = cfg.TTSConfig.Voice
session.Model = rt.Model
session.ModelConfig = cfg
}
if rt.Audio != nil && rt.Audio.Output != nil && rt.Audio.Output.Voice != "" {
session.Voice = string(rt.Audio.Output.Voice)
}
if rt.Audio != nil && rt.Audio.Input != nil && rt.Audio.Input.Transcription != nil {
trUpd := rt.Audio.Input.Transcription
// A language-only update (e.g. a client forcing the STT language) carries
// an empty Model. Preserve the pipeline's configured transcription backend
// instead of blanking it — otherwise the next utterance transcribes against
// an empty model and the backend RPC fails with "unimplemented".
if trUpd.Model == "" && session.InputAudioTranscription != nil {
trUpd.Model = session.InputAudioTranscription.Model
}
session.InputAudioTranscription = trUpd
if trUpd.Model != "" {
session.ModelConfig.Pipeline.Transcription = trUpd.Model
}
}
if rt.Model != "" || (rt.Audio != nil && rt.Audio.Output != nil && rt.Audio.Output.Voice != "") || (rt.Audio != nil && rt.Audio.Input != nil && rt.Audio.Input.Transcription != nil) {
m, err := newModel(&session.ModelConfig.Pipeline, cl, ml, appConfig, evaluator, routing)
if err != nil {
return err
}
session.ModelInterface = m
}
if rt.Audio != nil && rt.Audio.Input != nil && rt.Audio.Input.TurnDetectionSet {
session.TurnDetection = rt.Audio.Input.TurnDetection
}
if rt.Audio != nil && rt.Audio.Input != nil && rt.Audio.Input.Format != nil && rt.Audio.Input.Format.PCM != nil {
if rt.Audio.Input.Format.PCM.Rate > 0 {
session.InputSampleRate = rt.Audio.Input.Format.PCM.Rate
}
}
if rt.Audio != nil && rt.Audio.Output != nil && rt.Audio.Output.Format != nil && rt.Audio.Output.Format.PCM != nil {
if rt.Audio.Output.Format.PCM.Rate > 0 {
session.OutputSampleRate = rt.Audio.Output.Format.PCM.Rate
}
}
if rt.Instructions != "" {
session.Instructions = rt.Instructions
}
if rt.Tools != nil {
// Manage Mode tools survive a client-driven session.update — the
// alternative is silently dropping them whenever the user toggles
// a client MCP server, which would break the modality mid-session.
// Names from rt.Tools win on collision (the client is explicit;
// we preserve, we don't override).
merged := append([]types.ToolUnion(nil), rt.Tools...)
seen := make(map[string]struct{}, len(merged))
for _, t := range merged {
if t.Function != nil {
seen[t.Function.Name] = struct{}{}
}
}
for _, t := range session.AssistantTools {
if t.Function == nil {
continue
}
if _, ok := seen[t.Function.Name]; ok {
continue
}
merged = append(merged, t)
}
session.Tools = merged
}
if rt.ToolChoice != nil {
session.ToolChoice = rt.ToolChoice
}
if rt.MaxOutputTokens != 0 {
session.MaxOutputTokens = rt.MaxOutputTokens
}
if len(rt.OutputModalities) > 0 {
session.OutputModalities = rt.OutputModalities
}
return nil
}
// decodeOpusLoop runs a ticker that drains buffered raw Opus frames from the
// session, decodes them in a single batched gRPC call, and appends the
// resulting PCM to InputAudioBuffer. This gives ~3 gRPC calls/sec instead of
// 50 (one per RTP packet) and keeps decode diagnostics once-per-batch.
func decodeOpusLoop(session *Session, opusBackend grpc.Backend, done chan struct{}) {
ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
session.OpusFramesLock.Lock()
frames := session.OpusFrames
session.OpusFrames = nil
session.OpusFramesLock.Unlock()
if len(frames) == 0 {
continue
}
result, err := opusBackend.AudioDecode(context.Background(), &proto.AudioDecodeRequest{
Frames: frames,
Options: map[string]string{
"session_id": session.ID,
},
})
if err != nil {
xlog.Warn("opus decode batch error", "error", err, "frames", len(frames))
continue
}
samples := sound.BytesToInt16sLE(result.PcmData)
xlog.Debug("opus decode batch",
"frames", len(frames),
"decoded_samples", len(samples),
"sample_rate", result.SampleRate,
)
// Resample from the decoder's output rate (48kHz for Opus) to the session
// input rate when the two differ.
if result.SampleRate != int32(session.InputSampleRate) {
samples = sound.ResampleInt16(samples, int(result.SampleRate), session.InputSampleRate)
}
pcmBytes := sound.Int16toBytesLE(samples)
session.AudioBufferLock.Lock()
newSize := len(session.InputAudioBuffer) + len(pcmBytes)
if newSize <= maxAudioBufferSize {
session.InputAudioBuffer = append(session.InputAudioBuffer, pcmBytes...)
}
session.AudioBufferLock.Unlock()
case <-done:
return
}
}
}
// handleVAD is a goroutine that polls the audio buffered from the client,
// runs VAD on it, and commits each completed utterance to the conversation.
func handleVAD(session *Session, conv *Conversation, t Transport, done chan struct{}) {
vadContext, cancel := context.WithCancel(context.Background())
go func() {
<-done
cancel()
}()
silenceThreshold := 0.5 // Default 500ms
if session.TurnDetection != nil && session.TurnDetection.ServerVad != nil {
silenceThreshold = float64(session.TurnDetection.ServerVad.SilenceDurationMs) / 1000
}
speechStarted := false
startTime := time.Now()
ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
session.AudioBufferLock.Lock()
allAudio := make([]byte, len(session.InputAudioBuffer))
copy(allAudio, session.InputAudioBuffer)
session.AudioBufferLock.Unlock()
aints := sound.BytesToInt16sLE(allAudio)
if len(aints) == 0 || len(aints) < int(silenceThreshold*float64(session.InputSampleRate)) {
continue
}
// Resample from InputSampleRate to 16kHz
aints = sound.ResampleInt16(aints, session.InputSampleRate, localSampleRate)
segments, err := runVAD(vadContext, session, aints)
if err != nil {
if err.Error() == "unexpected speech end" {
xlog.Debug("VAD cancelled")
continue
}
xlog.Error("failed to process audio", "error", err)
sendError(t, "processing_error", "Failed to process audio: "+err.Error(), "", "")
continue
}
audioLength := float64(len(aints)) / localSampleRate
// TODO: When resetting the buffer we should retain a small postfix
if len(segments) == 0 && audioLength > silenceThreshold {
session.AudioBufferLock.Lock()
session.InputAudioBuffer = nil
session.AudioBufferLock.Unlock()
continue
} else if len(segments) == 0 {
continue
}
if !speechStarted {
// Barge-in: cancel any in-flight response so we stop
// sending audio and don't keep the interrupted reply in history.
session.cancelActiveResponse()
sendEvent(t, types.InputAudioBufferSpeechStartedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
AudioStartMs: time.Since(startTime).Milliseconds(),
})
speechStarted = true
}
// Segment still in progress when audio ended
segEndTime := segments[len(segments)-1].End
if segEndTime == 0 {
continue
}
if float32(audioLength)-segEndTime > float32(silenceThreshold) {
xlog.Debug("Detected end of speech segment")
session.AudioBufferLock.Lock()
session.InputAudioBuffer = nil
session.AudioBufferLock.Unlock()
sendEvent(t, types.InputAudioBufferSpeechStoppedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
AudioEndMs: time.Since(startTime).Milliseconds(),
})
speechStarted = false
sendEvent(t, types.InputAudioBufferCommittedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
ItemID: generateItemID(),
PreviousItemID: "TODO",
})
abytes := sound.Int16toBytesLE(aints)
// TODO: Remove prefix silence that is over TurnDetectionParams.PrefixPaddingMs
respCtx, respDone := session.startResponse(vadContext)
go func() {
defer close(respDone)
commitUtterance(respCtx, abytes, session, conv, t)
}()
}
}
}
}
func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Conversation, t Transport) {
if len(utt) == 0 {
return
}
f, err := os.CreateTemp("", "realtime-audio-chunk-*.wav")
if err != nil {
xlog.Error("failed to create temp file", "error", err)
return
}
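// Deferred calls run last-in-first-out: register the removal first so the
// file is closed before it is removed.
defer os.Remove(f.Name())
defer f.Close()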
xlog.Debug("Writing to file", "file", f.Name())
hdr := laudio.NewWAVHeader(uint32(len(utt)))
if err := hdr.Write(f); err != nil {
xlog.Error("Failed to write WAV header", "error", err)
return
}
if _, err := f.Write(utt); err != nil {
xlog.Error("Failed to write audio data", "error", err)
return
}
f.Sync()
// Start speaker verification concurrently with transcription. This is a
// latency optimization only: there is a hard join below before the LLM, so
// an unauthorized utterance never reaches generateResponse (no LLM, no
// tools, no TTS) regardless of how fast transcription finishes. A rejected
// turn wastes only transcription compute, which has no side effects. The
// transcript is still emitted to the same peer that sent the audio, which
// reveals nothing new to them.
//
// Resolve the speaker when the gate must authorize this turn, or when identity
// surfacing/personalization needs a fresh identity. Identity resolution
// ignores the when:first short-circuit (that only skips re-authorization).
type resolveOutcome struct {
res resolution
err error
}
var resolveCh chan resolveOutcome
runResolve := false
if session.voiceGate != nil && session.InputAudioTranscription != nil {
enforce := session.voiceGate.cfg.EnforceGate()
gateNeedsAuth := enforce
if enforce && session.voiceGate.cfg.When == config.VoiceGateWhenFirst {
session.gateMu.Lock()
if session.voiceVerified {
gateNeedsAuth = false
}
session.gateMu.Unlock()
}
if gateNeedsAuth || session.voiceGate.cfg.IdentityEnabled() {
runResolve = true
resolveCh = make(chan resolveOutcome, 1)
wavPath := f.Name()
go func() {
r, rerr := session.voiceGate.Resolve(ctx, wavPath)
resolveCh <- resolveOutcome{res: r, err: rerr}
}()
}
}
// TODO: If we have a real any-to-any model then transcription is optional
var transcript string
switch {
case session.InputAudioTranscription != nil:
// emitTranscription streams transcript deltas when
// pipeline.streaming.transcription is set, otherwise emits a single
// completed event; either way it returns the final transcript text.
var err error
transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name())
if err != nil {
// Drain the gate goroutine before returning so its in-flight read of
// the temp WAV finishes before the deferred os.Remove fires.
if runResolve {
<-resolveCh
}
sendError(t, "transcription_failed", err.Error(), "", "event_TODO")
return
}
case session.SoundDetectionEnabled:
// Sound-detection-only session: no transcription and no LLM. The
// sound-detection emit below carries the result; there is no any-to-any
// path to fall into. Windowing is client-driven (turn_detection none +
// input_audio_buffer.commit), so this is not voice-gated.
default:
// The voice gate runs only on the transcription path above; if an
// any-to-any model path is added here, join the gate before responding.
sendNotImplemented(t, "any-to-any models")
return
}
// Sound-event detection is additive to transcription: classify the same
// committed window and emit its scored AudioSet tags as a separate event.
// A failure here is logged but must never abort the turn.
if session.SoundDetectionEnabled {
if sderr := emitSoundDetection(ctx, t, session, generateItemID(), f.Name()); sderr != nil {
xlog.Error("sound detection failed", "error", sderr)
}
}
// Join on the resolution before any side-effecting step.
var speaker *types.Speaker
if runResolve {
out := <-resolveCh
enforce := session.voiceGate.cfg.EnforceGate()
if out.err != nil {
if enforce {
// Fail closed: a gate that cannot decide must not let audio through.
xlog.Error("voice recognition gate error", "error", out.err)
if session.voiceGate.cfg.OnReject == config.VoiceGateRejectEvent {
sendError(t, "speaker_not_authorized", "speaker not authorized: verification error", "", "event_TODO")
}
return
}
// Non-enforcing: degrade to an unknown speaker and continue.
xlog.Warn("voice identity resolve failed; continuing as unknown speaker", "error", out.err)
} else {
s := out.res.speaker
speaker = &s
}
if enforce {
alreadyVerified := false
if session.voiceGate.cfg.When == config.VoiceGateWhenFirst {
session.gateMu.Lock()
alreadyVerified = session.voiceVerified
session.gateMu.Unlock()
}
allowed, reason := false, "verification error"
if out.err == nil {
allowed, reason = session.voiceGate.authorize(out.res)
}
proceed, markVerified := session.voiceGate.decide(alreadyVerified, allowed)
if !proceed {
xlog.Debug("voice recognition gate rejected utterance", "reason", reason)
if session.voiceGate.cfg.OnReject == config.VoiceGateRejectEvent {
sendError(t, "speaker_not_authorized", "speaker not authorized: "+reason, "", "event_TODO")
}
return
}
if markVerified {
session.gateMu.Lock()
session.voiceVerified = true
session.gateMu.Unlock()
}
xlog.Debug("voice recognition gate authorized utterance", "speaker", out.res.speaker.Name)
}
}
// Generate an LLM response only when there is a transcript to feed it. A
// sound-detection-only session (no transcription) has no LLM stage, so it
// stops here after emitting the sound-detection event.
if session.InputAudioTranscription != nil && !session.TranscriptionOnly {
generateResponse(ctx, session, utt, transcript, speaker, conv, t)
}
}
// handleSoundWindow runs server-side windowed sound-event detection (option B):
// every HopMs it classifies the last WindowMs of streamed audio and emits a
// sound_detection event, so a sound-only client only has to stream audio (no
// input_audio_buffer.commit). It keeps the input buffer trimmed to one window
// so a long stream stays bounded. Runs until done is closed. This is
// independent of VAD: sound events are not speech.
func handleSoundWindow(session *Session, t Transport, done chan struct{}) {
ticker := time.NewTicker(time.Duration(session.SoundDetectionHopMs) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
classifySoundWindow(session, t)
}
}
}
// classifySoundWindow is one windowing tick: it snapshots the most recent
// WindowMs of buffered audio (trimming the buffer so a long stream stays
// bounded) and, when there is enough, classifies it and emits a sound_detection
// event. Extracted from handleSoundWindow so it can be driven synchronously in
// tests.
func classifySoundWindow(session *Session, t Transport) {
const bytesPerSample = 2 // 16-bit mono PCM
sr := session.InputSampleRate
windowBytes := session.SoundDetectionWindowMs * sr / 1000 * bytesPerSample
minBytes := sr / 100 * bytesPerSample // ~10ms before classifying
session.AudioBufferLock.Lock()
// Keep only the most recent window so a long stream stays bounded.
if windowBytes > 0 && len(session.InputAudioBuffer) > windowBytes {
trimmed := make([]byte, windowBytes)
copy(trimmed, session.InputAudioBuffer[len(session.InputAudioBuffer)-windowBytes:])
session.InputAudioBuffer = trimmed
}
window := make([]byte, len(session.InputAudioBuffer))
copy(window, session.InputAudioBuffer)
session.AudioBufferLock.Unlock()
if len(window) < minBytes {
return // not enough audio buffered yet
}
path, err := writeWindowWAV(window, sr)
if err != nil {
xlog.Error("sound window: failed to write wav", "error", err)
return
}
if sderr := emitSoundDetection(context.Background(), t, session, generateItemID(), path); sderr != nil {
xlog.Error("sound window: detection failed", "error", sderr)
}
if rerr := os.Remove(path); rerr != nil {
xlog.Debug("sound window: temp cleanup failed", "error", rerr)
}
}
// writeWindowWAV writes mono 16-bit PCM to a temp WAV at the given sample rate
// (the ced classifier reads the declared rate and resamples). Returns the path;
// the caller removes it.
func writeWindowWAV(pcm []byte, sampleRate int) (string, error) {
f, err := os.CreateTemp("", "realtime-sound-window-*.wav")
if err != nil {
return "", err
}
defer func() { _ = f.Close() }()
hdr := laudio.NewWAVHeaderWithRate(uint32(len(pcm)), uint32(sampleRate))
if err := hdr.Write(f); err != nil {
_ = os.Remove(f.Name())
return "", err
}
if _, err := f.Write(pcm); err != nil {
_ = os.Remove(f.Name())
return "", err
}
_ = f.Sync()
return f.Name(), nil
}
func runVAD(ctx context.Context, session *Session, adata []int16) ([]schema.VADSegment, error) {
soundIntBuffer := &audio.IntBuffer{
Format: &audio.Format{SampleRate: localSampleRate, NumChannels: 1},
SourceBitDepth: 16,
Data: sound.ConvertInt16ToInt(adata),
}
float32Data := soundIntBuffer.AsFloat32Buffer().Data
resp, err := session.ModelInterface.VAD(ctx, &schema.VADRequest{
Audio: float32Data,
})
if err != nil {
return nil, err
}
// An empty resp.Segments means no speech was detected.
return resp.Segments, nil
}
// speakerNote renders the system-prompt note for the current speaker. Returns
// an empty string when there is no name and unknown notes are disabled.
func speakerNote(s *types.Speaker, noteUnknown bool) string {
if s != nil && s.Matched && s.Name != "" {
return "The current speaker is " + s.Name + "."
}
if noteUnknown {
return "The current speaker is unknown."
}
return ""
}
// generateResponse appends the user's utterance to the conversation, announces
// the recognized speaker when configured, and triggers a model response.
func generateResponse(ctx context.Context, session *Session, utt []byte, transcript string, speaker *types.Speaker, conv *Conversation, t Transport) {
xlog.Debug("Generating realtime response...")
// Create user message item
item := types.MessageItemUnion{
User: &types.MessageItemUser{
ID: generateItemID(),
Status: types.ItemStatusCompleted,
Speaker: speaker,
Content: []types.MessageContentInput{
{
Type: types.MessageContentTypeInputAudio,
Audio: base64.StdEncoding.EncodeToString(utt),
Transcript: transcript,
},
},
},
}
conv.Lock.Lock()
conv.Items = append(conv.Items, &item)
conv.Lock.Unlock()
sendEvent(t, types.ConversationItemAddedEvent{
Item: item,
})
// Surface the recognized speaker to the client. Skip the event for an
// unidentified speaker unless announce_unknown is set.
if speaker != nil && session.voiceGate != nil && session.voiceGate.cfg.AnnounceEnabled() {
if speaker.Matched || session.voiceGate.cfg.Identity.AnnounceUnknown {
sendEvent(t, types.ConversationItemSpeakerEvent{
ItemID: item.User.ID,
Speaker: *speaker,
})
}
}
triggerResponse(ctx, session, conv, t, nil)
}
// maxAssistantToolTurns caps the server-side agentic loop. Mirrors the
// chat-page maxToolTurns:10 from useChat.js — the model gets up to this
// many consecutive tool round-trips before we return control to the user
// without another response cycle.
const maxAssistantToolTurns = 10
func triggerResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams) {
triggerResponseAtTurn(ctx, session, conv, t, overrides, 0)
}
func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams, toolTurn int) {
config := session.ModelInterface.PredictConfig()
// Session-level defaults
tools := session.Tools
toolChoice := session.ToolChoice
instructions := session.Instructions
maxOutputTokens := session.MaxOutputTokens
// Per-response overrides take precedence over the session defaults
if overrides != nil {
if overrides.Tools != nil {
tools = overrides.Tools
}
if overrides.ToolChoice != nil {
toolChoice = overrides.ToolChoice
}
if overrides.Instructions != "" {
instructions = overrides.Instructions
}
if overrides.MaxOutputTokens != 0 {
maxOutputTokens = overrides.MaxOutputTokens
}
}
// Apply max_output_tokens to the model config when it is set, saving the
// original value so it can be restored after prediction.
var originalMaxTokens *int
if config != nil {
originalMaxTokens = config.Maxtokens
if maxOutputTokens != 0 && !maxOutputTokens.IsInf() {
tokenValue := int(maxOutputTokens)
config.Maxtokens = &tokenValue
xlog.Debug("Applied max_output_tokens to config", "value", tokenValue)
}
}
// Defer restoration of original value
defer func() {
if config != nil {
config.Maxtokens = originalMaxTokens
}
}()
var conversationHistory schema.Messages
conversationHistory = append(conversationHistory, schema.Message{
Role: string(types.MessageRoleSystem),
StringContent: instructions,
Content: instructions,
})
imgIndex := 0
var lastUserSpeaker *types.Speaker
personalize := session.voiceGate != nil && session.voiceGate.cfg.PersonalizeEnabled()
conv.Lock.Lock()
items := trimRealtimeItems(conv.Items, session.MaxHistoryItems)
for _, item := range items {
if item.User != nil {
msg := schema.Message{
Role: string(types.MessageRoleUser),
}
lastUserSpeaker = item.User.Speaker
if personalize && session.voiceGate.cfg.Identity.InjectName &&
item.User.Speaker != nil && item.User.Speaker.Matched && item.User.Speaker.Name != "" {
msg.Name = item.User.Speaker.Name
}
textContent := ""
nrOfImgsInMessage := 0
for _, content := range item.User.Content {
switch content.Type {
case types.MessageContentTypeInputText:
textContent += content.Text
case types.MessageContentTypeInputAudio:
textContent += content.Transcript
case types.MessageContentTypeInputImage:
img, err := utils.GetContentURIAsBase64(content.ImageURL)
if err != nil {
xlog.Warn("Failed to process image", "error", err)
continue
}
msg.StringImages = append(msg.StringImages, img)
imgIndex++
nrOfImgsInMessage++
}
}
if nrOfImgsInMessage > 0 && !config.TemplateConfig.UseTokenizerTemplate {
templated, err := templates.TemplateMultiModal(config.TemplateConfig.Multimodal, templates.MultiModalOptions{
TotalImages: imgIndex,
ImagesInMessage: nrOfImgsInMessage,
}, textContent)
if err != nil {
xlog.Warn("Failed to apply multimodal template", "error", err)
templated = textContent
}
msg.StringContent = templated
msg.Content = templated
} else {
msg.StringContent = textContent
msg.Content = textContent
}
conversationHistory = append(conversationHistory, msg)
} else if item.Assistant != nil {
for _, content := range item.Assistant.Content {
switch content.Type {
case types.MessageContentTypeOutputText:
conversationHistory = append(conversationHistory, schema.Message{
Role: string(types.MessageRoleAssistant),
StringContent: content.Text,
Content: content.Text,
})
case types.MessageContentTypeOutputAudio:
conversationHistory = append(conversationHistory, schema.Message{
Role: string(types.MessageRoleAssistant),
StringContent: content.Transcript,
Content: content.Transcript,
StringAudios: []string{content.Audio},
})
}
}
} else if item.System != nil {
for _, content := range item.System.Content {
conversationHistory = append(conversationHistory, schema.Message{
Role: string(types.MessageRoleSystem),
StringContent: content.Text,
Content: content.Text,
})
}
} else if item.FunctionCall != nil {
conversationHistory = append(conversationHistory, schema.Message{
Role: string(types.MessageRoleAssistant),
ToolCalls: []schema.ToolCall{
{
ID: item.FunctionCall.CallID,
Type: "function",
FunctionCall: schema.FunctionCall{
Name: item.FunctionCall.Name,
Arguments: item.FunctionCall.Arguments,
},
},
},
})
} else if item.FunctionCallOutput != nil {
conversationHistory = append(conversationHistory, schema.Message{
Role: "tool",
Name: item.FunctionCallOutput.CallID,
Content: item.FunctionCallOutput.Output,
StringContent: item.FunctionCallOutput.Output,
})
}
}
conv.Lock.Unlock()
if personalize && session.voiceGate.cfg.Identity.InjectSystemNote {
if note := speakerNote(lastUserSpeaker, session.voiceGate.cfg.Identity.NoteUnknown); note != "" {
conversationHistory[0].StringContent += "\n\n" + note
conversationHistory[0].Content = conversationHistory[0].StringContent
}
}
var images []string
for _, m := range conversationHistory {
images = append(images, m.StringImages...)
}
responseID := generateUniqueID()
sendEvent(t, types.ResponseCreatedEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{
ID: responseID,
Object: "realtime.response",
Status: types.ResponseStatusInProgress,
},
})
// Streamed LLM path: when the pipeline opts into LLM streaming, stream the
// transcript to the client as it is generated and synthesize the buffered
// message once. Tool turns are supported only when the model uses its
// tokenizer template: the C++ autoparser then delivers content and tool
// calls via ChatDeltas (clearing the text stream), so the spoken transcript
// never leaks tool-call tokens. Grammar-based function calling emits the
// call as JSON in the token stream, so those turns keep the buffered path.
if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() {
canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate
var respMods []types.Modality
if overrides != nil {
respMods = overrides.OutputModalities
}
if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) {
if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) {
return
}
}
}
predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil)
if err != nil {
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here
return
}
pred, err := predFunc()
if err != nil {
sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", "")
return
}
// Check for cancellation after LLM inference (barge-in may have fired)
if ctx.Err() != nil {
xlog.Debug("Response cancelled after LLM inference (barge-in)")
sendEvent(t, types.ResponseDoneEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{
ID: responseID,
Object: "realtime.response",
Status: types.ResponseStatusCancelled,
},
})
return
}
xlog.Debug("Function config for parsing", "function_name_key", config.FunctionsConfig.FunctionNameKey, "function_arguments_key", config.FunctionsConfig.FunctionArgumentsKey)
xlog.Debug("LLM raw response", "text", pred.Response, "response_length", len(pred.Response), "usage", pred.Usage)
// Safely dereference pointer fields for logging
maxTokens := "nil"
if config.Maxtokens != nil {
maxTokens = fmt.Sprintf("%d", *config.Maxtokens)
}
contextSize := "nil"
if config.ContextSize != nil {
contextSize = fmt.Sprintf("%d", *config.ContextSize)
}
xlog.Debug("Model parameters", "max_tokens", maxTokens, "context_size", contextSize, "stopwords", config.StopWords)
rawResponse := pred.Response
if config.TemplateConfig.ReplyPrefix != "" {
rawResponse = config.TemplateConfig.ReplyPrefix + rawResponse
}
// Detect thinking start token from template for reasoning extraction
var template string
if config.TemplateConfig.UseTokenizerTemplate {
template = config.GetModelTemplate()
} else {
template = config.TemplateConfig.Chat
}
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &config.ReasoningConfig)
// When the C++ autoparser emitted ChatDeltas with actionable data,
// prefer them — the backend clears Reply.Message in that path and
// delivers parsed content/reasoning/tool-calls via the delta stream
// (see pkg/functions/chat_deltas.go, mirrored from chat.go's non-SSE
// handling). Without this, Response is empty and realtime would
// synthesize silence for replies that actually produced tokens.
var reasoningText, responseWithoutReasoning, textContent, cleanedResponse string
var toolCalls []functions.FuncCallResults
deltaToolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas)
deltaContent := functions.ContentFromChatDeltas(pred.ChatDeltas)
deltaReasoning := functions.ReasoningFromChatDeltas(pred.ChatDeltas)
if len(deltaToolCalls) > 0 || deltaContent != "" {
xlog.Debug("[ChatDeltas] realtime: using C++ autoparser deltas",
"tool_calls", len(deltaToolCalls),
"content_len", len(deltaContent),
"reasoning_len", len(deltaReasoning))
// Issue #9985: when the autoparser only delivered content (no
// reasoning_content), it may be running in the "pure content"
// PEG fallback (non-jinja path) which leaves <think>…</think>
// embedded in the content. Run Go-side extraction defensively.
// The extraction call below is a no-op when no tag pair matches,
// so it's safe to apply unconditionally in the no-reasoning branch.
if deltaReasoning == "" && deltaContent != "" {
deltaReasoning, deltaContent = reasoning.ExtractReasoningComplete(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
}
reasoningText = deltaReasoning
responseWithoutReasoning = deltaContent
textContent = deltaContent
cleanedResponse = deltaContent
toolCalls = deltaToolCalls
} else {
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningComplete(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig)
cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig)
toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig)
}
xlog.Debug("LLM Response", "reasoning", reasoningText, "response_without_reasoning", responseWithoutReasoning)
xlog.Debug("Function call parsing", "textContent", textContent, "cleanedResponse", cleanedResponse, "toolCallsCount", len(toolCalls))
noActionName := "answer"
if config.FunctionsConfig.NoActionFunctionName != "" {
noActionName = config.FunctionsConfig.NoActionFunctionName
}
isNoAction := len(toolCalls) > 0 && toolCalls[0].Name == noActionName
var finalSpeech string
var finalToolCalls []functions.FuncCallResults
if isNoAction {
arg := toolCalls[0].Arguments
arguments := map[string]any{}
if err := json.Unmarshal([]byte(arg), &arguments); err == nil {
if m, exists := arguments["message"]; exists {
if message, ok := m.(string); ok {
finalSpeech = message
} else {
xlog.Warn("NoAction function message field is not a string", "type", fmt.Sprintf("%T", m))
}
} else {
xlog.Warn("NoAction function missing 'message' field in arguments")
}
} else {
xlog.Warn("Failed to unmarshal NoAction function arguments", "error", err, "arguments", arg)
}
if finalSpeech == "" {
// Fallback if parsing failed
xlog.Warn("NoAction function did not produce speech, using cleaned response as fallback")
finalSpeech = cleanedResponse
}
} else {
finalToolCalls = toolCalls
xlog.Debug("Setting finalToolCalls", "count", len(finalToolCalls))
if len(toolCalls) > 0 {
finalSpeech = textContent
} else {
finalSpeech = cleanedResponse
}
}
if finalSpeech != "" {
// Create the assistant item now that we have content
item := types.MessageItemUnion{
Assistant: &types.MessageItemAssistant{
ID: generateItemID(),
Status: types.ItemStatusInProgress,
Content: []types.MessageContentOutput{
{
Type: types.MessageContentTypeOutputAudio,
Transcript: finalSpeech,
},
},
},
}
conv.Lock.Lock()
conv.Items = append(conv.Items, &item)
conv.Lock.Unlock()
sendEvent(t, types.ResponseOutputItemAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: 0,
Item: item,
})
sendEvent(t, types.ResponseContentPartAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Part: item.Assistant.Content[0],
})
// removeItemFromConv removes the last occurrence of an item with
// the given assistant ID from conversation history.
removeItemFromConv := func(assistantID string) {
conv.Lock.Lock()
for i := len(conv.Items) - 1; i >= 0; i-- {
if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == assistantID {
conv.Items = append(conv.Items[:i], conv.Items[i+1:]...)
break
}
}
conv.Lock.Unlock()
}
// sendCancelledResponse emits the cancelled status and cleans up the
// assistant item so the interrupted reply is not in chat history.
sendCancelledResponse := func() {
removeItemFromConv(item.Assistant.ID)
sendEvent(t, types.ResponseDoneEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{
ID: responseID,
Object: "realtime.response",
Status: types.ResponseStatusCancelled,
},
})
}
var audioString string
_, isWebRTC := t.(*WebRTCTransport)
var respMods []types.Modality
if overrides != nil {
respMods = overrides.OutputModalities
}
modalities := resolveOutputModalities(session.OutputModalities, respMods)
if modalitiesContainAudio(modalities) {
// Check for cancellation before TTS
if ctx.Err() != nil {
xlog.Debug("Response cancelled before TTS (barge-in)")
sendCancelledResponse()
return
}
// Transcript of the spoken reply (the audio's text).
sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Delta: finalSpeech,
})
sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Transcript: finalSpeech,
})
// Synthesize and send the audio. With pipeline.streaming.tts enabled
// emitSpeech forwards a response.output_audio.delta per backend PCM
// chunk as it's produced; otherwise it sends the whole utterance as a
// single delta. The returned PCM is stored (base64) on the item below.
pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech)
if err != nil {
if ctx.Err() != nil {
xlog.Debug("TTS cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("TTS failed", "error", err)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return
}
if !isWebRTC {
audioString = base64.StdEncoding.EncodeToString(pcmAudio)
sendEvent(t, types.ResponseOutputAudioDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
})
}
} else {
// Text-only mode: skip TTS, emit only the text events.
sendEvent(t, types.ResponseOutputTextDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Delta: finalSpeech,
})
sendEvent(t, types.ResponseOutputTextDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Text: finalSpeech,
})
}
sendEvent(t, types.ResponseContentPartDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Part: item.Assistant.Content[0],
})
conv.Lock.Lock()
item.Assistant.Status = types.ItemStatusCompleted
if !isWebRTC {
item.Assistant.Content[0].Audio = audioString
}
conv.Lock.Unlock()
sendEvent(t, types.ResponseOutputItemDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: 0,
Item: item,
})
}
// Emit the parsed tool calls, the terminal response.done, and (for
// server-side assistant tools) the follow-up response. Shared with the
// streamed path so both finalize tool calls identically.
emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn)
}
// emitToolCallItems emits the realtime function_call items for the parsed tool
// calls, the terminal response.done, and — for server-side LocalAI Assistant
// tools — re-triggers a follow-up response so the model can speak the result.
// hasContent shifts the tool-call output index past the assistant content item
// when the same turn also produced spoken/text content. Two tool paths:
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side;
// we append both the call and its output to conv.Items and re-trigger. The
// client only sees observability events.
// - All other tools follow the standard OpenAI flow: emit
// function_call_arguments.done and wait for the client to send
// conversation.item.create back.
func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) {
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls))
executedAssistantTool := false
for i, tc := range toolCalls {
toolCallID := generateItemID()
callID := "call_" + generateUniqueID() // OpenAI-style call IDs use the call_ prefix
// Create FunctionCall Item
fcItem := types.MessageItemUnion{
FunctionCall: &types.MessageItemFunctionCall{
ID: toolCallID,
CallID: callID,
Name: tc.Name,
Arguments: tc.Arguments,
Status: types.ItemStatusCompleted,
},
}
conv.Lock.Lock()
conv.Items = append(conv.Items, &fcItem)
conv.Lock.Unlock()
outputIndex := i
if hasContent {
outputIndex++
}
sendEvent(t, types.ResponseOutputItemAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: outputIndex,
Item: fcItem,
})
serverSide := session.AssistantExecutor != nil && session.AssistantExecutor.IsTool(tc.Name)
if serverSide {
output, execErr := session.AssistantExecutor.ExecuteTool(ctx, tc.Name, tc.Arguments)
if execErr != nil {
output = "Error: " + execErr.Error()
xlog.Error("realtime: assistant tool execution failed", "tool", tc.Name, "error", execErr)
}
foItem := types.MessageItemUnion{
FunctionCallOutput: &types.MessageItemFunctionCallOutput{
ID: generateItemID(),
CallID: callID,
Output: output,
Status: types.ItemStatusCompleted,
},
}
conv.Lock.Lock()
conv.Items = append(conv.Items, &foItem)
conv.Lock.Unlock()
// Close the call out and emit the output as its own paired
// added/done — the OpenAI spec pairs every item-done with a
// preceding item-added, so we re-pair here for the output.
// The UI renders the transcript entry on item.done for both
// shapes (FunctionCall + FunctionCallOutput).
sendEvent(t, types.ResponseOutputItemDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: outputIndex,
Item: fcItem,
})
sendEvent(t, types.ResponseOutputItemAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: outputIndex,
Item: foItem,
})
sendEvent(t, types.ResponseOutputItemDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: outputIndex,
Item: foItem,
})
executedAssistantTool = true
continue
}
sendEvent(t, types.ResponseFunctionCallArgumentsDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: toolCallID,
OutputIndex: outputIndex,
CallID: callID,
Delta: tc.Arguments,
})
sendEvent(t, types.ResponseFunctionCallArgumentsDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: toolCallID,
OutputIndex: outputIndex,
CallID: callID,
Arguments: tc.Arguments,
Name: tc.Name,
})
sendEvent(t, types.ResponseOutputItemDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: outputIndex,
Item: fcItem,
})
}
sendEvent(t, types.ResponseDoneEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{
ID: responseID,
Object: "realtime.response",
Status: types.ResponseStatusCompleted,
},
})
// If any assistant tools were executed in-process, run another response cycle
// so the model can speak the result. This mirrors the chat-side agentic loop
// but is driven server-side rather than by a client round-trip. The loop is
// bounded by maxAssistantToolTurns so a model that keeps calling tools cannot
// recurse indefinitely.
if executedAssistantTool {
if toolTurn+1 >= maxAssistantToolTurns {
xlog.Warn("realtime: assistant tool-turn limit reached, stopping the agentic loop",
"limit", maxAssistantToolTurns, "model", session.Model)
return
}
triggerResponseAtTurn(ctx, session, conv, t, nil, toolTurn+1)
}
}
// Helper functions to generate unique IDs. Each adds a type prefix to the
// random value returned by generateUniqueID.
func generateSessionID() string {
return "sess_" + generateUniqueID()
}
func generateConversationID() string {
return "conv_" + generateUniqueID()
}
func generateItemID() string {
return "item_" + generateUniqueID()
}
func generateUniqueID() string {
// 16 random bytes, hex-encoded. Must be collision-free: session, item,
// response and call IDs build on this, and the conversation tracks/removes
// items by ID (e.g. cancel() in realtime_stream.go, conversation.item.retrieve).
// A constant would make every ID alias and corrupt that bookkeeping.
var b [16]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}