diff --git a/.agents/api-endpoints-and-auth.md b/.agents/api-endpoints-and-auth.md index ef2857b8b..77f816849 100644 --- a/.agents/api-endpoints-and-auth.md +++ b/.agents/api-endpoints-and-auth.md @@ -284,7 +284,17 @@ Also bump the expected-length count in `api_instructions_test.go` and add the na ### 3. `capabilities.js` symbol (for new model-config FLAG_* flags) -If your feature needs a new `FLAG_*` usecase flag in `core/config/model_config.go` (so users can filter gallery models by it, and so `/v1/models` surfaces it), also declare the matching symbol in `core/http/react-ui/src/utils/capabilities.js`: +If your feature needs a new `FLAG_*` usecase flag in `core/config/model_config.go` (so users can filter gallery models by it, and so `/v1/models` surfaces it), you need to update **all** of: + +- `Usecase` string constant in `core/config/backend_capabilities.go` +- `UsecaseInfoMap` entry mapping the string to its flag + gRPC method +- `FLAG_` bitmask in `core/config/model_config.go` +- `GetAllModelConfigUsecases()` map entry (otherwise the YAML loader silently ignores the string) +- `ModalityGroups` membership if the flag should affect `IsMultimodal()` (e.g. realtime_audio is in both speech-input and audio-output groups so a lone flag still reads as multimodal) +- `GuessUsecases()` branch listing the backends that own this capability +- `usecaseFilters` in `core/http/routes/ui_api.go` (drives the gallery filter dropdown) +- `Models.jsx` `FILTERS` array + matching `filters.` i18n key in `core/http/react-ui/public/locales/en/models.json` +- `core/http/react-ui/src/utils/capabilities.js`: ```js export const CAP_MY_CAPABILITY = 'FLAG_MY_CAPABILITY' diff --git a/.github/backend-matrix.yml b/.github/backend-matrix.yml index 4aca4185e..44de7b62b 100644 --- a/.github/backend-matrix.yml +++ b/.github/backend-matrix.yml @@ -278,6 +278,19 @@ include: dockerfile: "./backend/Dockerfile.python" context: "./" ubuntu-version: '2404' + - build-type: 'cublas' + cuda-major-version: "12" + cuda-minor-version: "8" + platforms: 'linux/amd64' + tag-latest: 'auto' + tag-suffix: '-gpu-nvidia-cuda-12-liquid-audio' + runs-on: 'ubuntu-latest' + base-image: "ubuntu:24.04" + skip-drivers: 'false' + backend: "liquid-audio" + dockerfile: "./backend/Dockerfile.python" + context: "./" + ubuntu-version: '2404' - build-type: 'cublas' cuda-major-version: "12" cuda-minor-version: "8" @@ -808,6 +821,19 @@ include: dockerfile: "./backend/Dockerfile.python" context: "./" ubuntu-version: '2404' + - build-type: 'cublas' + cuda-major-version: "13" + cuda-minor-version: "0" + platforms: 'linux/amd64' + tag-latest: 'auto' + tag-suffix: '-gpu-nvidia-cuda-13-liquid-audio' + runs-on: 'ubuntu-latest' + base-image: "ubuntu:24.04" + skip-drivers: 'false' + backend: "liquid-audio" + dockerfile: "./backend/Dockerfile.python" + context: "./" + ubuntu-version: '2404' - build-type: 'cublas' cuda-major-version: "13" cuda-minor-version: "0" @@ -1088,6 +1114,19 @@ include: backend: "vibevoice" dockerfile: "./backend/Dockerfile.python" context: "./" + - build-type: 'l4t' + cuda-major-version: "13" + cuda-minor-version: "0" + platforms: 'linux/arm64' + tag-latest: 'auto' + tag-suffix: '-nvidia-l4t-cuda-13-arm64-liquid-audio' + runs-on: 'ubuntu-24.04-arm' + base-image: "ubuntu:24.04" + skip-drivers: 'false' + ubuntu-version: '2404' + backend: "liquid-audio" + dockerfile: "./backend/Dockerfile.python" + context: "./" - build-type: 'l4t' cuda-major-version: "13" cuda-minor-version: "0" @@ -1729,6 +1768,19 @@ include: dockerfile: 
"./backend/Dockerfile.python" context: "./" ubuntu-version: '2404' + - build-type: 'hipblas' + cuda-major-version: "" + cuda-minor-version: "" + platforms: 'linux/amd64' + tag-latest: 'auto' + tag-suffix: '-gpu-rocm-hipblas-liquid-audio' + runs-on: 'ubuntu-latest' + base-image: "rocm/dev-ubuntu-24.04:7.2.1" + skip-drivers: 'false' + backend: "liquid-audio" + dockerfile: "./backend/Dockerfile.python" + context: "./" + ubuntu-version: '2404' - build-type: 'hipblas' cuda-major-version: "" cuda-minor-version: "" @@ -2177,6 +2229,19 @@ include: dockerfile: "./backend/Dockerfile.python" context: "./" ubuntu-version: '2404' + - build-type: 'intel' + cuda-major-version: "" + cuda-minor-version: "" + platforms: 'linux/amd64' + tag-latest: 'auto' + tag-suffix: '-gpu-intel-liquid-audio' + runs-on: 'ubuntu-latest' + base-image: "intel/oneapi-basekit:2025.3.0-0-devel-ubuntu24.04" + skip-drivers: 'false' + backend: "liquid-audio" + dockerfile: "./backend/Dockerfile.python" + context: "./" + ubuntu-version: '2404' - build-type: 'intel' cuda-major-version: "" cuda-minor-version: "" @@ -3503,6 +3568,20 @@ include: dockerfile: "./backend/Dockerfile.python" context: "./" ubuntu-version: '2404' + - build-type: '' + cuda-major-version: "" + cuda-minor-version: "" + platforms: 'linux/amd64' + platform-tag: 'amd64' + tag-latest: 'auto' + tag-suffix: '-cpu-liquid-audio' + runs-on: 'ubuntu-latest' + base-image: "ubuntu:24.04" + skip-drivers: 'false' + backend: "liquid-audio" + dockerfile: "./backend/Dockerfile.python" + context: "./" + ubuntu-version: '2404' - build-type: '' cuda-major-version: "" cuda-minor-version: "" diff --git a/.github/workflows/test-extra.yml b/.github/workflows/test-extra.yml index 52f33ebde..3fd836574 100644 --- a/.github/workflows/test-extra.yml +++ b/.github/workflows/test-extra.yml @@ -28,6 +28,7 @@ jobs: qwen-asr: ${{ steps.detect.outputs.qwen-asr }} nemo: ${{ steps.detect.outputs.nemo }} voxcpm: ${{ steps.detect.outputs.voxcpm }} + liquid-audio: ${{ steps.detect.outputs.liquid-audio }} llama-cpp-quantization: ${{ steps.detect.outputs.llama-cpp-quantization }} llama-cpp: ${{ steps.detect.outputs.llama-cpp }} ik-llama-cpp: ${{ steps.detect.outputs.ik-llama-cpp }} @@ -447,6 +448,32 @@ jobs: run: | make --jobs=5 --output-sync=target -C backend/python/voxcpm make --jobs=5 --output-sync=target -C backend/python/voxcpm test + # liquid-audio: LFM2.5-Audio any-to-any backend. The CI smoke test + # exercises Health() and LoadModel(mode:finetune) — fine-tune mode + # short-circuits before pulling weights (backend.py:192), so no + # HuggingFace download or GPU is needed. The full-inference path is + # gated on LIQUID_AUDIO_MODEL_ID, which we don't set here. 
+ tests-liquid-audio: + needs: detect-changes + if: needs.detect-changes.outputs.liquid-audio == 'true' || needs.detect-changes.outputs.run-all == 'true' + runs-on: ubuntu-latest + steps: + - name: Clone + uses: actions/checkout@v6 + with: + submodules: true + - name: Dependencies + run: | + sudo apt-get update + sudo apt-get install -y build-essential ffmpeg + sudo apt-get install -y ca-certificates cmake curl patch python3-pip + # Install UV + curl -LsSf https://astral.sh/uv/install.sh | sh + pip install --user --no-cache-dir grpcio-tools==1.64.1 + - name: Test liquid-audio + run: | + make --jobs=5 --output-sync=target -C backend/python/liquid-audio + make --jobs=5 --output-sync=target -C backend/python/liquid-audio test tests-llama-cpp-quantization: needs: detect-changes if: needs.detect-changes.outputs.llama-cpp-quantization == 'true' || needs.detect-changes.outputs.run-all == 'true' diff --git a/Makefile b/Makefile index 488018b0d..ebeef4c41 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # Disable parallel execution for backend builds -.NOTPARALLEL: backends/diffusers backends/llama-cpp backends/turboquant backends/outetts backends/piper backends/stablediffusion-ggml backends/whisper backends/faster-whisper backends/silero-vad backends/local-store backends/huggingface backends/rfdetr backends/insightface backends/speaker-recognition backends/kitten-tts backends/kokoro backends/chatterbox backends/llama-cpp-darwin backends/neutts build-darwin-python-backend build-darwin-go-backend backends/mlx backends/diffuser-darwin backends/mlx-vlm backends/mlx-audio backends/mlx-distributed backends/stablediffusion-ggml-darwin backends/vllm backends/vllm-omni backends/sglang backends/moonshine backends/pocket-tts backends/qwen-tts backends/faster-qwen3-tts backends/qwen-asr backends/nemo backends/voxcpm backends/whisperx backends/ace-step backends/acestep-cpp backends/fish-speech backends/voxtral backends/opus backends/trl backends/llama-cpp-quantization backends/kokoros backends/sam3-cpp backends/qwen3-tts-cpp backends/vibevoice-cpp backends/localvqe backends/tinygrad backends/sherpa-onnx backends/ds4 backends/ds4-darwin +.NOTPARALLEL: backends/diffusers backends/llama-cpp backends/turboquant backends/outetts backends/piper backends/stablediffusion-ggml backends/whisper backends/faster-whisper backends/silero-vad backends/local-store backends/huggingface backends/rfdetr backends/insightface backends/speaker-recognition backends/kitten-tts backends/kokoro backends/chatterbox backends/llama-cpp-darwin backends/neutts build-darwin-python-backend build-darwin-go-backend backends/mlx backends/diffuser-darwin backends/mlx-vlm backends/mlx-audio backends/mlx-distributed backends/stablediffusion-ggml-darwin backends/vllm backends/vllm-omni backends/sglang backends/moonshine backends/pocket-tts backends/qwen-tts backends/faster-qwen3-tts backends/qwen-asr backends/nemo backends/voxcpm backends/whisperx backends/ace-step backends/acestep-cpp backends/fish-speech backends/voxtral backends/opus backends/trl backends/llama-cpp-quantization backends/kokoros backends/sam3-cpp backends/qwen3-tts-cpp backends/vibevoice-cpp backends/localvqe backends/tinygrad backends/sherpa-onnx backends/ds4 backends/ds4-darwin backends/liquid-audio GOCMD=go GOTEST=$(GOCMD) test @@ -463,6 +463,7 @@ prepare-test-extra: protogen-python $(MAKE) -C backend/python/vllm-omni $(MAKE) -C backend/python/sglang $(MAKE) -C backend/python/vibevoice + $(MAKE) -C backend/python/liquid-audio $(MAKE) -C backend/python/moonshine $(MAKE) -C 
backend/python/pocket-tts $(MAKE) -C backend/python/qwen-tts @@ -488,6 +489,7 @@ test-extra: prepare-test-extra $(MAKE) -C backend/python/vllm test $(MAKE) -C backend/python/vllm-omni test $(MAKE) -C backend/python/vibevoice test + $(MAKE) -C backend/python/liquid-audio test $(MAKE) -C backend/python/moonshine test $(MAKE) -C backend/python/pocket-tts test $(MAKE) -C backend/python/qwen-tts test @@ -1092,6 +1094,7 @@ BACKEND_SGLANG = sglang|python|.|false|true BACKEND_DIFFUSERS = diffusers|python|.|--progress=plain|true BACKEND_CHATTERBOX = chatterbox|python|.|false|true BACKEND_VIBEVOICE = vibevoice|python|.|--progress=plain|true +BACKEND_LIQUID_AUDIO = liquid-audio|python|.|--progress=plain|true BACKEND_MOONSHINE = moonshine|python|.|false|true BACKEND_POCKET_TTS = pocket-tts|python|.|false|true BACKEND_QWEN_TTS = qwen-tts|python|.|false|true @@ -1169,6 +1172,7 @@ $(eval $(call generate-docker-build-target,$(BACKEND_SGLANG))) $(eval $(call generate-docker-build-target,$(BACKEND_DIFFUSERS))) $(eval $(call generate-docker-build-target,$(BACKEND_CHATTERBOX))) $(eval $(call generate-docker-build-target,$(BACKEND_VIBEVOICE))) +$(eval $(call generate-docker-build-target,$(BACKEND_LIQUID_AUDIO))) $(eval $(call generate-docker-build-target,$(BACKEND_MOONSHINE))) $(eval $(call generate-docker-build-target,$(BACKEND_POCKET_TTS))) $(eval $(call generate-docker-build-target,$(BACKEND_QWEN_TTS))) @@ -1197,7 +1201,7 @@ $(eval $(call generate-docker-build-target,$(BACKEND_SHERPA_ONNX))) docker-save-%: backend-images docker save local-ai-backend:$* -o backend-images/$*.tar -docker-build-backends: docker-build-llama-cpp docker-build-ik-llama-cpp docker-build-turboquant docker-build-ds4 docker-build-rerankers docker-build-vllm docker-build-vllm-omni docker-build-sglang docker-build-transformers docker-build-outetts docker-build-diffusers docker-build-kokoro docker-build-faster-whisper docker-build-coqui docker-build-chatterbox docker-build-vibevoice docker-build-moonshine docker-build-pocket-tts docker-build-qwen-tts docker-build-fish-speech docker-build-faster-qwen3-tts docker-build-qwen-asr docker-build-nemo docker-build-voxcpm docker-build-whisperx docker-build-ace-step docker-build-acestep-cpp docker-build-voxtral docker-build-mlx-distributed docker-build-trl docker-build-llama-cpp-quantization docker-build-tinygrad docker-build-kokoros docker-build-sam3-cpp docker-build-qwen3-tts-cpp docker-build-vibevoice-cpp docker-build-localvqe docker-build-insightface docker-build-speaker-recognition docker-build-sherpa-onnx +docker-build-backends: docker-build-llama-cpp docker-build-ik-llama-cpp docker-build-turboquant docker-build-ds4 docker-build-rerankers docker-build-vllm docker-build-vllm-omni docker-build-sglang docker-build-transformers docker-build-outetts docker-build-diffusers docker-build-kokoro docker-build-faster-whisper docker-build-coqui docker-build-chatterbox docker-build-vibevoice docker-build-liquid-audio docker-build-moonshine docker-build-pocket-tts docker-build-qwen-tts docker-build-fish-speech docker-build-faster-qwen3-tts docker-build-qwen-asr docker-build-nemo docker-build-voxcpm docker-build-whisperx docker-build-ace-step docker-build-acestep-cpp docker-build-voxtral docker-build-mlx-distributed docker-build-trl docker-build-llama-cpp-quantization docker-build-tinygrad docker-build-kokoros docker-build-sam3-cpp docker-build-qwen3-tts-cpp docker-build-vibevoice-cpp docker-build-localvqe docker-build-insightface docker-build-speaker-recognition docker-build-sherpa-onnx 
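+# Illustrative local usage of the generated per-backend targets (names taken
+# from the docker-build-backends list above):
+#   make docker-build-liquid-audio   # build the backend image
+#   make docker-save-liquid-audio    # export it to backend-images/liquid-audio.tar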
######################################################## ### Mock Backend for E2E Tests diff --git a/backend/backend.proto b/backend/backend.proto index 915f5f91e..bf07f3bd4 100644 --- a/backend/backend.proto +++ b/backend/backend.proto @@ -48,6 +48,11 @@ service Backend { rpc AudioTransform(AudioTransformRequest) returns (AudioTransformResult) {} rpc AudioTransformStream(stream AudioTransformFrameRequest) returns (stream AudioTransformFrameResponse) {} + // AudioToAudioStream is the bidirectional any-to-any S2S RPC. Backends + // that load a speech-to-speech model consume input audio frames and emit + // interleaved audio + transcript + tool-call deltas as typed events. + // Backends without S2S support return UNIMPLEMENTED. + rpc AudioToAudioStream(stream AudioToAudioRequest) returns (stream AudioToAudioResponse) {} rpc ModelMetadata(ModelOptions) returns (ModelMetadataResponse) {} @@ -768,6 +773,93 @@ message AudioTransformFrameResponse { int64 frame_index = 2; } +// === AudioToAudioStream messages ========================================= +// +// Bidirectional stream between the LocalAI core and an any-to-any audio +// model. The client opens the stream with a Config payload, then alternates +// Frame (input audio) and Control (turn boundaries, function-call results, +// session updates) payloads. The server streams back typed events: audio +// frames carry PCM in `pcm`; transcript / tool-call deltas carry JSON in +// `meta`; the stream ends with a `response.done` (success) or `error` event. + +message AudioToAudioRequest { + oneof payload { + AudioToAudioConfig config = 1; + AudioToAudioFrame frame = 2; + AudioToAudioControl control = 3; + } +} + +message AudioToAudioConfig { + // PCM format for client→server audio. 0 => backend default + // (16 kHz for the LFM2-Audio Conformer encoder). + int32 input_sample_rate = 1; + // Preferred server→client audio rate. 0 => backend default + // (24 kHz for the LFM2-Audio vocoder). + int32 output_sample_rate = 2; + // Optional system prompt override. Empty => backend chooses based on + // mode (e.g. "Respond with interleaved text and audio."). + string system_prompt = 3; + // Optional baked-voice id. Models that only ship a fixed set of + // voices (e.g. LFM2-Audio: us_male/us_female/uk_male/uk_female) match + // this against their voice table; an empty string keeps the default. + string voice = 4; + // JSON-encoded array of tool definitions in OpenAI Chat Completions + // format. Empty => no tools. + string tools = 5; + // Free-form sampling / decoding parameters (temperature, top_k, + // max_new_tokens, audio_top_k, etc). + map<string, string> params = 6; + // True => reset any session-scoped state before processing further + // frames on this stream. The first Config implicitly resets. + bool reset = 7; +} + +message AudioToAudioFrame { + // Raw PCM s16le mono at config.input_sample_rate. Empty pcm + end_of_input + // is a valid "user finished speaking" marker without trailing audio. + bytes pcm = 1; + // Marks the last frame of a user turn. The backend may begin emitting + // a response immediately after seeing this. + bool end_of_input = 2; +} + +message AudioToAudioControl { + // Free-form control event names. Initial set: + // "input_audio_buffer.commit" — user finished speaking + // "response.cancel" — abort in-flight generation + // "conversation.item.create" — inject a non-audio item (e.g. + // function_call_output as JSON in + // `payload`) + // "session.update" — re-configure mid-stream + string event = 1; + // Event-specific JSON payload.
+ bytes payload = 2; +} + +message AudioToAudioResponse { + // Event identifies what this frame carries. Mirrors the OpenAI Realtime + // API server-event names where applicable. Initial set: + // "response.audio.delta" + // "response.audio_transcript.delta" + // "response.function_call_arguments.delta" + // "response.function_call_arguments.done" + // "response.done" + // "error" + string event = 1; + // Populated when event = response.audio.delta. + bytes pcm = 2; + // Populated alongside pcm to identify its rate. 0 => same as the + // session's negotiated output_sample_rate. + int32 sample_rate = 3; + // JSON payload for non-PCM events (transcript chunk, tool args, error + // body). + bytes meta = 4; + // Monotonic per-stream counter, useful for client reordering and + // debugging. + int64 sequence = 5; +} + message ModelMetadataResponse { bool supports_thinking = 1; string rendered_template = 2; // The rendered chat template with enable_thinking=true (empty if not applicable) diff --git a/backend/index.yaml b/backend/index.yaml index d04a2d6d5..a63f054da 100644 --- a/backend/index.yaml +++ b/backend/index.yaml @@ -847,6 +847,35 @@ nvidia-l4t-cuda-12: "nvidia-l4t-vibevoice" nvidia-l4t-cuda-13: "cuda13-nvidia-l4t-arm64-vibevoice" icon: https://avatars.githubusercontent.com/u/6154722?s=200&v=4 +- &liquid-audio + urls: + - https://github.com/Liquid4All/liquid-audio + - https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B + description: | + LiquidAI LFM2 / LFM2.5 Audio Python backend. End-to-end speech-to-speech, ASR, + TTS (4 baked voices), and text chat from a single 1.5B model. Wraps the + upstream `liquid-audio` package; supports fine-tuning via LocalAI's + /v1/fine-tuning/jobs endpoint. + tags: + - speech-to-speech + - any-to-any + - text-to-speech + - speech-to-text + - TTS + - ASR + - realtime + license: LFM-Open-License-v1.0 + name: "liquid-audio" + alias: "liquid-audio" + capabilities: + nvidia: "cuda12-liquid-audio" + intel: "intel-liquid-audio" + amd: "rocm-liquid-audio" + default: "cpu-liquid-audio" + nvidia-cuda-13: "cuda13-liquid-audio" + nvidia-cuda-12: "cuda12-liquid-audio" + nvidia-l4t-cuda-13: "cuda13-nvidia-l4t-arm64-liquid-audio" + icon: https://cdn-avatars.huggingface.co/v1/production/uploads/61b8e2ba285851687028d395/7_6D7rWrLxp2hb6OHSV1p.png - &qwen-tts urls: - https://github.com/QwenLM/Qwen3-TTS @@ -3437,6 +3466,77 @@ uri: "quay.io/go-skynet/local-ai-backends:master-metal-darwin-arm64-vibevoice" mirrors: - localai/localai-backends:master-metal-darwin-arm64-vibevoice +## liquid-audio +- !!merge <<: *liquid-audio + name: "liquid-audio-development" + capabilities: + nvidia: "cuda12-liquid-audio-development" + intel: "intel-liquid-audio-development" + amd: "rocm-liquid-audio-development" + default: "cpu-liquid-audio-development" + nvidia-cuda-13: "cuda13-liquid-audio-development" + nvidia-cuda-12: "cuda12-liquid-audio-development" + nvidia-l4t-cuda-13: "cuda13-nvidia-l4t-arm64-liquid-audio-development" +- !!merge <<: *liquid-audio + name: "cpu-liquid-audio" + uri: "quay.io/go-skynet/local-ai-backends:latest-cpu-liquid-audio" + mirrors: + - localai/localai-backends:latest-cpu-liquid-audio +- !!merge <<: *liquid-audio + name: "cpu-liquid-audio-development" + uri: "quay.io/go-skynet/local-ai-backends:master-cpu-liquid-audio" + mirrors: + - localai/localai-backends:master-cpu-liquid-audio +- !!merge <<: *liquid-audio + name: "cuda12-liquid-audio" + uri: "quay.io/go-skynet/local-ai-backends:latest-gpu-nvidia-cuda-12-liquid-audio" + mirrors: + - 
localai/localai-backends:latest-gpu-nvidia-cuda-12-liquid-audio +- !!merge <<: *liquid-audio + name: "cuda12-liquid-audio-development" + uri: "quay.io/go-skynet/local-ai-backends:master-gpu-nvidia-cuda-12-liquid-audio" + mirrors: + - localai/localai-backends:master-gpu-nvidia-cuda-12-liquid-audio +- !!merge <<: *liquid-audio + name: "cuda13-liquid-audio" + uri: "quay.io/go-skynet/local-ai-backends:latest-gpu-nvidia-cuda-13-liquid-audio" + mirrors: + - localai/localai-backends:latest-gpu-nvidia-cuda-13-liquid-audio +- !!merge <<: *liquid-audio + name: "cuda13-liquid-audio-development" + uri: "quay.io/go-skynet/local-ai-backends:master-gpu-nvidia-cuda-13-liquid-audio" + mirrors: + - localai/localai-backends:master-gpu-nvidia-cuda-13-liquid-audio +- !!merge <<: *liquid-audio + name: "intel-liquid-audio" + uri: "quay.io/go-skynet/local-ai-backends:latest-gpu-intel-liquid-audio" + mirrors: + - localai/localai-backends:latest-gpu-intel-liquid-audio +- !!merge <<: *liquid-audio + name: "intel-liquid-audio-development" + uri: "quay.io/go-skynet/local-ai-backends:master-gpu-intel-liquid-audio" + mirrors: + - localai/localai-backends:master-gpu-intel-liquid-audio +- !!merge <<: *liquid-audio + name: "rocm-liquid-audio" + uri: "quay.io/go-skynet/local-ai-backends:latest-gpu-rocm-hipblas-liquid-audio" + mirrors: + - localai/localai-backends:latest-gpu-rocm-hipblas-liquid-audio +- !!merge <<: *liquid-audio + name: "rocm-liquid-audio-development" + uri: "quay.io/go-skynet/local-ai-backends:master-gpu-rocm-hipblas-liquid-audio" + mirrors: + - localai/localai-backends:master-gpu-rocm-hipblas-liquid-audio +- !!merge <<: *liquid-audio + name: "cuda13-nvidia-l4t-arm64-liquid-audio" + uri: "quay.io/go-skynet/local-ai-backends:latest-nvidia-l4t-cuda-13-arm64-liquid-audio" + mirrors: + - localai/localai-backends:latest-nvidia-l4t-cuda-13-arm64-liquid-audio +- !!merge <<: *liquid-audio + name: "cuda13-nvidia-l4t-arm64-liquid-audio-development" + uri: "quay.io/go-skynet/local-ai-backends:master-nvidia-l4t-cuda-13-arm64-liquid-audio" + mirrors: + - localai/localai-backends:master-nvidia-l4t-cuda-13-arm64-liquid-audio ## qwen-tts - !!merge <<: *qwen-tts name: "qwen-tts-development" diff --git a/backend/python/liquid-audio/Makefile b/backend/python/liquid-audio/Makefile new file mode 100644 index 000000000..59b71c8c9 --- /dev/null +++ b/backend/python/liquid-audio/Makefile @@ -0,0 +1,23 @@ +.PHONY: liquid-audio +liquid-audio: + bash install.sh + +.PHONY: run +run: liquid-audio + @echo "Running liquid-audio..." + bash run.sh + @echo "liquid-audio run." + +.PHONY: test +test: liquid-audio + @echo "Testing liquid-audio..." + bash test.sh + @echo "liquid-audio tested." + +.PHONY: protogen-clean +protogen-clean: + $(RM) backend_pb2_grpc.py backend_pb2.py + +.PHONY: clean +clean: protogen-clean + rm -rf venv __pycache__ diff --git a/backend/python/liquid-audio/backend.py b/backend/python/liquid-audio/backend.py new file mode 100644 index 000000000..0a64ecb3e --- /dev/null +++ b/backend/python/liquid-audio/backend.py @@ -0,0 +1,871 @@ +#!/usr/bin/env python3 +""" +Liquid Audio backend for LocalAI. + +Wraps LiquidAI's `liquid-audio` Python package (https://github.com/Liquid4All/liquid-audio). +The same model serves four roles, selected by the `mode` option at load time: +chat, asr, tts, s2s. Fine-tuning is exposed via StartFineTune. 
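+
+An illustrative LocalAI model config for this backend (field values are
+assumptions mirroring what LoadModel parses below):
+
+    backend: liquid-audio
+    parameters:
+      model: LiquidAI/LFM2.5-Audio-1.5B
+    options:
+      - mode:s2s        # chat | asr | tts | s2s (finetune for training jobs)
+      - voice:us_female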
+""" +from concurrent import futures +import argparse +import json +import os +import queue +import signal +import sys +import threading +import time +import traceback +import uuid + +import grpc + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'common')) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'common')) +from grpc_auth import get_auth_interceptors # noqa: E402 +from python_utils import parse_options # noqa: E402 + +import backend_pb2 # noqa: E402 +import backend_pb2_grpc # noqa: E402 + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '1')) + +# Voice id → system-prompt suffix. The model only ships these four voices. +VOICE_PROMPTS = { + "us_male": "Perform TTS. Use the US male voice.", + "us_female": "Perform TTS. Use the US female voice.", + "uk_male": "Perform TTS. Use the UK male voice.", + "uk_female": "Perform TTS. Use the UK female voice.", +} +DEFAULT_VOICE = "us_female" + +# Special-token IDs that LFM2-Audio emits to delimit modality boundaries. +# Sourced from liquid_audio/model/lfm2_audio.py (see generate_sequential/_sample_*). +TEXT_END_TOKEN = 130 # <|text_end|> +AUDIO_START_TOKEN = 128 # <|audio_start|> +IM_END_TOKEN = 7 # <|im_end|> +AUDIO_EOS_CODE = 2048 # signals end-of-audio in any codebook position + +_PATCHED_LOCAL_PATHS = False + + +def _patch_liquid_audio_local_paths(): + """Make liquid_audio.utils.get_model_dir() tolerate local directories. + + Upstream always passes its argument to huggingface_hub.snapshot_download, + which only accepts `owner/repo` ids. LocalAI's gallery hands us absolute + paths under //, so we intercept snapshot_download + in the liquid_audio.utils namespace and return the directory as-is when + it already exists on disk. Idempotent. + """ + global _PATCHED_LOCAL_PATHS + if _PATCHED_LOCAL_PATHS: + return + import liquid_audio.utils as _la_utils + _orig_snapshot_download = _la_utils.snapshot_download + + def _local_first_snapshot_download(repo_id, revision=None, **kwargs): + if isinstance(repo_id, (str, os.PathLike)) and os.path.isdir(str(repo_id)): + return str(repo_id) + return _orig_snapshot_download(repo_id, revision=revision, **kwargs) + + _la_utils.snapshot_download = _local_first_snapshot_download + _PATCHED_LOCAL_PATHS = True + + +def _select_device(): + import torch + if torch.cuda.is_available(): + return "cuda" + if hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): + return "mps" + return "cpu" + + +class ActiveJob: + """Tracks an in-flight fine-tune so FineTuneProgress can stream from its queue.""" + + def __init__(self, job_id): + self.job_id = job_id + self.progress_queue = queue.Queue() + self.thread = None + self.stopped = False + self.completed = False + self.error = None + + +class BackendServicer(backend_pb2_grpc.BackendServicer): + def __init__(self): + self.processor = None + self.model = None + self.device = "cpu" + self.dtype = None + self.options = {} + self.model_id = None + self.active_job = None + + @property + def mode(self): + return str(self.options.get("mode", "chat")).lower() + + @property + def voice(self): + v = str(self.options.get("voice", DEFAULT_VOICE)).lower() + return v if v in VOICE_PROMPTS else DEFAULT_VOICE + + + def Free(self, request, context): + # Called by LocalAI when unloading the model. Drop GPU tensors so the + # next load starts from a clean state instead of bumping into OOM. 
+ try: + for attr in ("model", "processor", "tokenizer"): + if hasattr(self, attr): + try: + delattr(self, attr) + except Exception: + pass + import gc + gc.collect() + try: + import torch + if torch.cuda.is_available(): + torch.cuda.empty_cache() + except Exception: + pass + return backend_pb2.Result(success=True, message="OK") + except Exception as exc: + print(f"Free failed: {exc}", file=sys.stderr) + return backend_pb2.Result(success=False, message=str(exc)) + + + def Health(self, request, context): + return backend_pb2.Reply(message=bytes("OK", 'utf-8')) + + + def LoadModel(self, request, context): + try: + import torch + + self.options = parse_options(request.Options) + if self.options.get("voice") and self.options["voice"] not in VOICE_PROMPTS: + print(f"Warning: unknown voice '{self.options['voice']}'; defaulting to '{DEFAULT_VOICE}'", + file=sys.stderr) + + requested_device = self.options.get("device") + self.device = requested_device or _select_device() + if self.device == "cuda" and not torch.cuda.is_available(): + return backend_pb2.Result(success=False, message="CUDA requested but not available") + if self.device == "mps" and not (hasattr(torch.backends, "mps") and + torch.backends.mps.is_available()): + print("MPS not available; falling back to CPU", file=sys.stderr) + self.device = "cpu" + + dtype_name = str(self.options.get("dtype", "bfloat16")).lower() + self.dtype = { + "bfloat16": torch.bfloat16, + "bf16": torch.bfloat16, + "float16": torch.float16, + "fp16": torch.float16, + "half": torch.float16, + "float32": torch.float32, + "fp32": torch.float32, + }.get(dtype_name, torch.bfloat16) + + # request.Model holds the raw `parameters.model` value (an HF + # repo id like "LiquidAI/LFM2.5-Audio-1.5B"); request.ModelFile + # is LocalAI's ModelPath-prefixed local copy that exists only + # when the gallery supplied a `files:` list. Mirror the + # transformers/vibevoice convention: prefer the repo id and + # only switch to the local path if it's been staged on disk. + model_id = request.Model + if not model_id: + model_id = request.ModelFile + if not model_id: + return backend_pb2.Result(success=False, message="No model identifier provided") + if request.ModelFile and os.path.isdir(request.ModelFile): + model_id = request.ModelFile + self.model_id = model_id + + # Pure fine-tune jobs don't need an in-memory inference model — the + # Trainer instantiates its own copy at StartFineTune time. + if self.mode == "finetune": + print(f"Loaded liquid-audio backend in fine-tune mode (model id: {model_id})", + file=sys.stderr) + return backend_pb2.Result(success=True, message="OK") + + from liquid_audio import LFM2AudioModel, LFM2AudioProcessor + + # liquid_audio's from_pretrained unconditionally routes through + # huggingface_hub.snapshot_download, which rejects local paths + # (HFValidationError on `/models/LiquidAI/LFM2.5-Audio-1.5B`). + # When LocalAI's gallery has already staged the weights on disk, + # short-circuit the download to return the local directory. 
+ _patch_liquid_audio_local_paths() + + print(f"Loading liquid-audio model '{model_id}' on {self.device} ({self.dtype})", + file=sys.stderr) + self.processor = LFM2AudioProcessor.from_pretrained(model_id, device=self.device).eval() + self.model = LFM2AudioModel.from_pretrained( + model_id, device=self.device, dtype=self.dtype + ).eval() + + print(f"Liquid-audio mode={self.mode}, voice={self.voice}", file=sys.stderr) + return backend_pb2.Result(success=True, message="OK") + + except Exception as exc: + print(f"LoadModel failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + return backend_pb2.Result(success=False, message=str(exc)) + + + def Predict(self, request, context): + try: + text = "".join(self._generate_text_stream(request)) + return backend_pb2.Reply(message=text.encode("utf-8")) + except Exception as exc: + print(f"Predict failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(exc)) + return backend_pb2.Reply() + + def PredictStream(self, request, context): + try: + for delta in self._generate_text_stream(request): + yield backend_pb2.Reply(message=delta.encode("utf-8")) + except Exception as exc: + print(f"PredictStream failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(exc)) + + + def VAD(self, request, context): + # Stub voice-activity detector: RMS-energy threshold over 30ms frames at + # 16 kHz. Good enough for the realtime endpoint's handleVAD loop, which + # only inspects segment presence + last segment end. The proper signal + # would come from the model's audio encoder, but that ride-along is a + # PR-D scope item — until then this keeps the legacy pipeline path + # working without forcing the operator to install a separate VAD model. + import numpy as np + try: + audio = np.asarray(request.audio, dtype=np.float32) + if audio.size == 0: + return backend_pb2.VADResponse(segments=[]) + + sample_rate = 16000 + frame_size = sample_rate * 30 // 1000 # 30ms → 480 samples + threshold = float(self.options.get("vad_rms_threshold", 0.01)) + min_speech_frames = int(self.options.get("vad_min_speech_frames", 2)) # ≥60ms + # handleVAD ticks every 300 ms and only inspects segment presence + # + last segment end relative to silence_threshold (~500 ms). Cap + # the analysed window to the tail of the buffer so we don't redo + # the entire growing utterance every tick. 
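+            # Worked numbers for the defaults (illustrative): at 16 kHz a 30 ms
+            # frame is 480 samples, the 5 s window below is 80 000 samples
+            # (~166 frames), and min_speech_frames=2 means ~60 ms of consecutive
+            # above-threshold frames are needed before a segment is emitted.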
+ window_s = float(self.options.get("vad_window_s", 5.0)) + window_samples = int(window_s * sample_rate) + time_offset_s = 0.0 + if audio.size > window_samples: + time_offset_s = (audio.size - window_samples) / sample_rate + audio = audio[-window_samples:] + + n_frames = audio.size // frame_size + if n_frames == 0: + return backend_pb2.VADResponse(segments=[]) + frames = audio[: n_frames * frame_size].reshape(n_frames, frame_size) + rms = np.sqrt(np.mean(frames ** 2, axis=1)) + speech = rms > threshold + + def _emit(start_idx, end_idx, out): + if end_idx - start_idx >= min_speech_frames: + out.append(backend_pb2.VADSegment( + start=time_offset_s + start_idx * frame_size / sample_rate, + end=time_offset_s + end_idx * frame_size / sample_rate, + )) + + segments = [] + start_idx = None + for i, is_speech in enumerate(speech): + if is_speech and start_idx is None: + start_idx = i + elif not is_speech and start_idx is not None: + _emit(start_idx, i, segments) + start_idx = None + if start_idx is not None: + _emit(start_idx, n_frames, segments) + return backend_pb2.VADResponse(segments=segments) + except Exception as exc: + print(f"VAD failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(exc)) + return backend_pb2.VADResponse(segments=[]) + + + def TTS(self, request, context): + try: + if self.model is None or self.processor is None: + return backend_pb2.Result(success=False, message="Model not loaded") + + import torch + import torchaudio + from liquid_audio import ChatState + + voice = request.voice.lower() if request.voice else self.voice + voice = voice.removeprefix("lfm2:").removeprefix("lfm:") + if voice not in VOICE_PROMPTS: + voice = self.voice + system_prompt = VOICE_PROMPTS[voice] + + chat = ChatState(self.processor) + chat.new_turn("system") + chat.add_text(system_prompt) + chat.end_turn() + chat.new_turn("user") + chat.add_text(request.text or "") + chat.end_turn() + chat.new_turn("assistant") + + audio_top_k = int(self.options.get("audio_top_k", 64)) + audio_temp = float(self.options.get("audio_temperature", 0.8)) + max_new = int(self.options.get("max_new_tokens", 2048)) + + audio_out = [] + for tok in self.model.generate_sequential( + **chat, + max_new_tokens=max_new, + audio_temperature=audio_temp, + audio_top_k=audio_top_k, + ): + if tok.numel() > 1: + audio_out.append(tok) + + if len(audio_out) <= 1: + return backend_pb2.Result(success=False, message="No audio frames generated") + + # Drop the trailing end-of-audio frame, matching the package's examples. + audio_codes = torch.stack(audio_out[:-1], 1).unsqueeze(0) + waveform = self.processor.decode(audio_codes) + + out_path = request.dst + if not out_path: + return backend_pb2.Result(success=False, message="dst path is required") + os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) + # soundfile in preference to torchaudio.save — the latter routes + # through torchcodec, whose native libs need NVIDIA NPP that we + # don't bundle in the cuda13 image. + import soundfile as _sf + _sf.write(out_path, waveform.cpu().numpy().squeeze(0).T, 24_000) + + return backend_pb2.Result(success=True) + except Exception as exc: + print(f"TTS failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + return backend_pb2.Result(success=False, message=str(exc)) + + + def AudioToAudioStream(self, request_iterator, context): + """Bidirectional any-to-any speech-to-speech stream. 
+ + See `backend.proto` AudioToAudioStream for the wire protocol. Audio + is decoded once per turn here; chunked detokenization for sub-second + TTFB is left to a future iteration once the LFM2AudioDetokenizer + gains a streaming entry point. + """ + try: + yield from self._audio_to_audio_stream(request_iterator, context) + except Exception as exc: + print(f"AudioToAudioStream failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + yield backend_pb2.AudioToAudioResponse( + event="error", + meta=json.dumps({"message": str(exc)}).encode("utf-8"), + ) + + def _audio_to_audio_stream(self, request_iterator, context): + if self.model is None or self.processor is None: + raise RuntimeError("Model not loaded") + + import torch + import torchaudio + from liquid_audio import ChatState + + cfg = None + chat = None + input_sample_rate = 16000 + output_sample_rate = 24000 + sequence = 0 + + def _new_event(event, **kwargs): + nonlocal sequence + sequence += 1 + kwargs.setdefault("sequence", sequence) + return backend_pb2.AudioToAudioResponse(event=event, **kwargs) + + def _ensure_chat(): + """Build a fresh ChatState seeded with the system prompt.""" + nonlocal chat + chat = ChatState(self.processor) + system_prompt = (cfg.system_prompt if cfg and cfg.system_prompt + else "Respond with interleaved text and audio.") + chat.new_turn("system") + chat.add_text(system_prompt) + chat.end_turn() + + # Buffers for the in-flight user turn + pcm_buffer = bytearray() + + def _consume_user_turn(): + nonlocal pcm_buffer + if not pcm_buffer: + return + # Avoid the bytes(pcm_buffer) copy and let the float widen happen + # in-place: numpy view → torch view → in-place divide. + import numpy as np + arr = np.frombuffer(memoryview(pcm_buffer), dtype=np.int16) + wav = torch.from_numpy(arr).to(torch.float32).div_(32768.0).unsqueeze(0) + chat.new_turn("user") + chat.add_audio(wav, input_sample_rate) + chat.end_turn() + pcm_buffer = bytearray() + + def _run_generation(): + """Run generate_interleaved; yield response events as we go.""" + chat.new_turn("assistant") + audio_top_k = int(self.options.get("audio_top_k", 4)) + audio_temp = float(self.options.get("audio_temperature", 1.0)) + text_top_k = int(self.options.get("text_top_k", 0)) or None + text_temp = float(self.options.get("text_temperature", 0)) or None + max_new = int(self.options.get("max_new_tokens", 512)) + + audio_tokens = [] + for tok in self.model.generate_interleaved( + **chat, + max_new_tokens=max_new, + text_temperature=text_temp, + text_top_k=text_top_k, + audio_temperature=audio_temp, + audio_top_k=audio_top_k, + ): + if tok.numel() == 1: + if tok.item() == IM_END_TOKEN: + break + text = self.processor.text.decode(tok) + if not text: + continue + yield _new_event( + "response.audio_transcript.delta", + meta=json.dumps({"delta": text}).encode("utf-8"), + ) + else: + audio_tokens.append(tok) + + # Detokenize the accumulated audio at end-of-turn — the + # LFM2AudioDetokenizer is non-streaming today. 
+ if len(audio_tokens) > 1: + audio_codes = torch.stack(audio_tokens[:-1], 1).unsqueeze(0) + waveform = self.processor.decode(audio_codes) + # Convert to s16le PCM bytes at output_sample_rate + if output_sample_rate != 24000: + waveform = torchaudio.functional.resample( + waveform.cpu(), 24000, output_sample_rate + ) + pcm = (waveform.cpu().squeeze(0).clamp(-1, 1) * 32767.0).to( + torch.int16 + ).numpy().tobytes() + yield _new_event( + "response.audio.delta", + pcm=pcm, + sample_rate=output_sample_rate, + ) + + yield _new_event("response.done", meta=b"{}") + + for req in request_iterator: + if not context.is_active(): + return + payload = req.WhichOneof("payload") + if payload == "config": + cfg = req.config + if cfg.input_sample_rate > 0: + input_sample_rate = cfg.input_sample_rate + if cfg.output_sample_rate > 0: + output_sample_rate = cfg.output_sample_rate + # The first config implicitly resets state. + _ensure_chat() + pcm_buffer = bytearray() + elif payload == "frame": + if chat is None: + _ensure_chat() + if req.frame.pcm: + pcm_buffer.extend(req.frame.pcm) + if req.frame.end_of_input: + _consume_user_turn() + yield from _run_generation() + elif payload == "control": + event = req.control.event + if event == "input_audio_buffer.commit": + _consume_user_turn() + yield from _run_generation() + elif event == "response.cancel": + # Synchronous generation here means cancel can only + # take effect between turns; we ack so the client unblocks. + yield _new_event("response.done", meta=b'{"cancelled":true}') + elif event == "session.update": + # Free-form session re-config; treat as a soft reset. + _ensure_chat() + pcm_buffer = bytearray() + # Unknown events are ignored — forward-compatible. + + + def AudioTranscription(self, request, context): + try: + if self.model is None or self.processor is None: + return backend_pb2.TranscriptResult(segments=[], text="") + + import torchaudio + from liquid_audio import ChatState + + audio_path = request.dst + if not audio_path: + return backend_pb2.TranscriptResult(segments=[], text="") + + chat = ChatState(self.processor) + chat.new_turn("system") + chat.add_text("Perform ASR.") + chat.end_turn() + chat.new_turn("user") + # soundfile in preference to torchaudio.load — the latter routes + # through torchcodec which needs NVIDIA NPP libs we don't bundle. 
+ import soundfile as _sf + import torch + audio_np, sr = _sf.read(audio_path, dtype="float32", always_2d=True) + wav = torch.from_numpy(audio_np.T) # (channels, samples) + if wav.shape[0] > 1: + # Down-mix to mono — the processor expects a single channel + wav = wav.mean(dim=0, keepdim=True) + chat.add_audio(wav, sr) + chat.end_turn() + chat.new_turn("assistant") + + max_new = int(self.options.get("max_new_tokens", 1024)) + + pieces = [] + for tok in self.model.generate_sequential(**chat, max_new_tokens=max_new): + if tok.numel() == 1: + if tok.item() == IM_END_TOKEN: + break + pieces.append(self.processor.text.decode(tok)) + + text = "".join(pieces).strip() + duration_ms = int((wav.shape[1] / sr) * 1000) + segment = backend_pb2.TranscriptSegment( + id=0, start=0, end=duration_ms, text=text, tokens=[], + ) + return backend_pb2.TranscriptResult(segments=[segment], text=text) + except Exception as exc: + print(f"AudioTranscription failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + return backend_pb2.TranscriptResult(segments=[], text="") + + + def StartFineTune(self, request, context): + if self.active_job is not None and not self.active_job.completed: + return backend_pb2.FineTuneJobResult( + job_id="", success=False, + message="A fine-tuning job is already running", + ) + + job_id = request.job_id or str(uuid.uuid4()) + job = ActiveJob(job_id) + self.active_job = job + + thread = threading.Thread(target=self._run_training, args=(request, job), daemon=True) + job.thread = thread + thread.start() + + return backend_pb2.FineTuneJobResult( + job_id=job_id, success=True, message="Training started", + ) + + def FineTuneProgress(self, request, context): + if self.active_job is None or self.active_job.job_id != request.job_id: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(f"Job {request.job_id} not found") + return + + job = self.active_job + while True: + try: + update = job.progress_queue.get(timeout=1.0) + except queue.Empty: + if job.completed or job.stopped: + break + if not context.is_active(): + break + continue + if update is None: + break + yield update + if update.status in ("completed", "failed", "stopped"): + break + + def StopFineTune(self, request, context): + # We can't kill the Accelerate training loop mid-step cleanly from here; + # LocalAI's job manager kills the backend process on stop. The flag below + # at least lets the progress stream terminate quickly. 
+ if self.active_job is not None and self.active_job.job_id == request.job_id: + self.active_job.stopped = True + self.active_job.progress_queue.put(None) + return backend_pb2.Result(success=True, message="OK") + + def _run_training(self, request, job): + try: + self._do_train(request, job) + job.completed = True + job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, status="completed", message="Training completed", + progress_percent=100.0, + )) + except Exception as exc: + job.error = str(exc) + job.completed = True + print(f"Training failed: {exc}", file=sys.stderr) + print(traceback.format_exc(), file=sys.stderr) + job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, status="failed", message=str(exc), + )) + finally: + job.progress_queue.put(None) + + def _do_train(self, request, job): + from liquid_audio import LFM2AudioModel # noqa: F401 (sanity import) + from liquid_audio.data.dataloader import LFM2DataLoader + from liquid_audio.trainer import Trainer + + model_id = request.model or self.model_id or "LiquidAI/LFM2.5-Audio-1.5B" + + dataset_path = request.dataset_source + if not dataset_path: + raise ValueError("dataset_source is required (path to a preprocessed dataset)") + + extras = dict(request.extra_options) if request.extra_options else {} + val_path = extras.get("val_dataset") + + # Map FineTuneRequest hyperparameters to liquid_audio.Trainer constructor args + lr = request.learning_rate or 3e-5 + max_steps = request.max_steps or 1000 + warmup_steps = request.warmup_steps or min(100, max_steps // 10) + batch_size = request.batch_size or 16 + save_interval = request.save_steps or max(1, max_steps // 4) + + output_dir = request.output_dir or os.path.join( + os.environ.get("LIQUID_AUDIO_OUTPUT_DIR", "/tmp"), + f"liquid-audio-{job.job_id}", + ) + os.makedirs(output_dir, exist_ok=True) + + job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, status="loading_dataset", + message=f"Loading preprocessed dataset from {dataset_path}", + )) + train_data = LFM2DataLoader(dataset_path) + val_data = LFM2DataLoader(val_path) if val_path else None + + job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, status="loading_model", + message=f"Loading base model {model_id}", + )) + + # The Liquid Trainer logs via self.accelerator.print; we subclass it to + # also push progress events onto the queue every logging_interval steps. 
+ progress_q = job.progress_queue + + class QueuedTrainer(Trainer): + def log(self_, model_output): + if self_.step > 0 and self_.step % self_.logging_interval == 0: + try: + loss = self_.accelerator.reduce( + model_output.loss.detach(), reduction="mean" + ).item() + except Exception: + loss = float("nan") + lr_now = self_.optimizer.param_groups[0]["lr"] + pct = (self_.step / self_.max_steps * 100.0) if self_.max_steps else 0.0 + progress_q.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, + current_step=int(self_.step), + total_steps=int(self_.max_steps), + current_epoch=float(self_.epoch), + loss=float(loss), + learning_rate=float(lr_now), + progress_percent=float(pct), + status="training", + )) + # Honour stop requests: raising here terminates the loop cleanly + if job.stopped: + raise KeyboardInterrupt("stop requested") + return super().log(model_output) + + def validate(self_): + progress_q.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, current_step=int(self_.step), + total_steps=int(self_.max_steps), status="training", + message=f"Running validation at step {self_.step}", + )) + return super().validate() + + trainer = QueuedTrainer( + model_id=model_id, + train_data=train_data, + val_data=val_data, + lr=lr, + max_steps=max_steps, + warmup_steps=warmup_steps, + batch_size=batch_size, + save_interval=save_interval, + output_dir=output_dir, + weight_decay=request.weight_decay or 0.1, + ) + + job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, status="training", message="Training started", + total_steps=int(max_steps), + )) + trainer.train() + + job.progress_queue.put(backend_pb2.FineTuneProgressUpdate( + job_id=job.job_id, status="saving", + message=f"Saved final model to {output_dir}", + checkpoint_path=os.path.join(output_dir, "final"), + )) + + + def _build_chat_state(self, messages, user_prompt, tools_prelude=None): + """Build a ChatState from a list of (role, content) tuples plus an optional final user turn. + + tools_prelude, when non-empty, is prepended as an extra system turn carrying + the LFM2 tool-list block — mirrors gallery/lfm.yaml's `function:` template + so the model sees the same prompt shape whether served via llama-cpp or here. + """ + from liquid_audio import ChatState + chat = ChatState(self.processor) + if tools_prelude: + chat.new_turn("system") + chat.add_text(tools_prelude) + chat.end_turn() + for role, content in messages: + chat.new_turn(role) + chat.add_text(content) + chat.end_turn() + if user_prompt: + chat.new_turn("user") + chat.add_text(user_prompt) + chat.end_turn() + chat.new_turn("assistant") + return chat + + def _collect_messages(self, request): + """Translate PredictOptions.Messages into (role, content) tuples.""" + out = [] + for m in request.Messages: + role = (m.role or "user").lower() + if role not in ("system", "user", "assistant"): + role = "user" + out.append((role, m.content or "")) + return out + + def _render_tools_prelude(self, request): + """Build the LFM2 `<|tool_list_start|>…<|tool_list_end|>` system prelude + from request.Tools (OpenAI Chat-Completions tool JSON). Returns "" when + no tools are attached. 
Output mirrors gallery/lfm.yaml's `function:` + template so the model sees the same prompt whether routed via llama-cpp + or this backend.""" + tools_raw = getattr(request, "Tools", "") or "" + if not tools_raw: + return "" + try: + tools = json.loads(tools_raw) + except json.JSONDecodeError: + print(f"liquid-audio: ignoring malformed Tools JSON: {tools_raw[:200]!r}", + file=sys.stderr) + return "" + if not isinstance(tools, list) or not tools: + return "" + # The LFM2 chat template uses single-quoted Python-dict-ish syntax in + # examples, but the tokenizer treats this whole block as opaque text; + # JSON works fine and is what other backends emit. + return ( + "You are a function calling AI model. You are provided with functions to " + "execute. You may call one or more functions to assist with the user query. " + "Don't make assumptions about what values to plug into functions.\n" + "List of tools: <|tool_list_start|>" + + json.dumps(tools, separators=(",", ":")) + + "<|tool_list_end|>" + ) + + def _generate_text_stream(self, request): + """Yield text-only deltas from generate_sequential. Caller joins for unary Predict.""" + if self.model is None or self.processor is None: + raise RuntimeError("Model not loaded") + messages = self._collect_messages(request) + user_prompt = request.Prompt or None + tools_prelude = self._render_tools_prelude(request) + # If the request already carries Messages, Prompt is the templated form + # of the same content — don't append a duplicate user turn. + chat = self._build_chat_state( + messages, + user_prompt if not messages else None, + tools_prelude=tools_prelude, + ) + + max_new = request.Tokens if request.Tokens > 0 else int(self.options.get("max_new_tokens", 512)) + temperature = request.Temperature if request.Temperature > 0 else None + top_k = request.TopK if request.TopK > 0 else None + + for tok in self.model.generate_sequential( + **chat, + max_new_tokens=max_new, + text_temperature=temperature, + text_top_k=top_k, + ): + if tok.numel() == 1: + if tok.item() == IM_END_TOKEN: + break + yield self.processor.text.decode(tok) + + +def serve(address): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=MAX_WORKERS), + options=[ + ('grpc.max_message_length', 50 * 1024 * 1024), + ('grpc.max_send_message_length', 50 * 1024 * 1024), + ('grpc.max_receive_message_length', 50 * 1024 * 1024), + ], + interceptors=get_auth_interceptors(), + ) + backend_pb2_grpc.add_BackendServicer_to_server(BackendServicer(), server) + server.add_insecure_port(address) + server.start() + print(f"Liquid-audio backend listening on {address}", file=sys.stderr, flush=True) + + def stop(_signum, _frame): + server.stop(0) + sys.exit(0) + + signal.signal(signal.SIGTERM, stop) + signal.signal(signal.SIGINT, stop) + + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Liquid Audio gRPC backend") + parser.add_argument("--addr", default="localhost:50051", help="gRPC server address") + args = parser.parse_args() + serve(args.addr) diff --git a/backend/python/liquid-audio/install.sh b/backend/python/liquid-audio/install.sh new file mode 100755 index 000000000..c7ed8eaa8 --- /dev/null +++ b/backend/python/liquid-audio/install.sh @@ -0,0 +1,18 @@ +#!/bin/bash +set -e + +# liquid-audio requires Python ≥ 3.12 (per its pyproject.toml); the default +# portable Python in libbackend.sh is 3.10. Override before sourcing. 
+export PYTHON_VERSION="${PYTHON_VERSION:-3.12}" +export PYTHON_PATCH="${PYTHON_PATCH:-11}" + +backend_dir=$(dirname $0) +if [ -d $backend_dir/common ]; then + source $backend_dir/common/libbackend.sh +else + source $backend_dir/../common/libbackend.sh +fi + +# liquid-audio's torch wheels are large; allow upgrades to satisfy transitive pins +EXTRA_PIP_INSTALL_FLAGS+=" --upgrade --index-strategy=unsafe-first-match" +installRequirements diff --git a/backend/python/liquid-audio/protogen.sh b/backend/python/liquid-audio/protogen.sh new file mode 100755 index 000000000..df3325c6f --- /dev/null +++ b/backend/python/liquid-audio/protogen.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e + +backend_dir=$(dirname $0) +if [ -d $backend_dir/common ]; then + source $backend_dir/common/libbackend.sh +else + source $backend_dir/../common/libbackend.sh +fi + +runProtogen diff --git a/backend/python/liquid-audio/requirements-cpu.txt b/backend/python/liquid-audio/requirements-cpu.txt new file mode 100644 index 000000000..c2fee25e3 --- /dev/null +++ b/backend/python/liquid-audio/requirements-cpu.txt @@ -0,0 +1,13 @@ +--extra-index-url https://download.pytorch.org/whl/cpu +torch>=2.8.0 +torchaudio>=2.8.0 +torchcodec>=0.9.1 +transformers>=4.55.4 +accelerate>=1.10.1 +datasets>=4.8.4 +einops>=0.8.1 +librosa>=0.11.0 +soundfile>=0.12.1 +sentencepiece>=0.2.1 +huggingface-hub>=1.3.0 +liquid-audio>=1.2.0 diff --git a/backend/python/liquid-audio/requirements-cublas12.txt b/backend/python/liquid-audio/requirements-cublas12.txt new file mode 100644 index 000000000..8f1a965bb --- /dev/null +++ b/backend/python/liquid-audio/requirements-cublas12.txt @@ -0,0 +1,13 @@ +--extra-index-url https://download.pytorch.org/whl/cu121 +torch>=2.8.0 +torchaudio>=2.8.0 +torchcodec>=0.9.1 +transformers>=4.55.4 +accelerate>=1.10.1 +datasets>=4.8.4 +einops>=0.8.1 +librosa>=0.11.0 +soundfile>=0.12.1 +sentencepiece>=0.2.1 +huggingface-hub>=1.3.0 +liquid-audio>=1.2.0 diff --git a/backend/python/liquid-audio/requirements-cublas13.txt b/backend/python/liquid-audio/requirements-cublas13.txt new file mode 100644 index 000000000..4ab768158 --- /dev/null +++ b/backend/python/liquid-audio/requirements-cublas13.txt @@ -0,0 +1,13 @@ +--extra-index-url https://download.pytorch.org/whl/cu130 +torch>=2.8.0 +torchaudio>=2.8.0 +torchcodec>=0.9.1 +transformers>=4.55.4 +accelerate>=1.10.1 +datasets>=4.8.4 +einops>=0.8.1 +librosa>=0.11.0 +soundfile>=0.12.1 +sentencepiece>=0.2.1 +huggingface-hub>=1.3.0 +liquid-audio>=1.2.0 diff --git a/backend/python/liquid-audio/requirements-hipblas.txt b/backend/python/liquid-audio/requirements-hipblas.txt new file mode 100644 index 000000000..304f934b1 --- /dev/null +++ b/backend/python/liquid-audio/requirements-hipblas.txt @@ -0,0 +1,13 @@ +--extra-index-url https://download.pytorch.org/whl/rocm7.0 +torch>=2.8.0 +torchaudio>=2.8.0 +torchcodec>=0.9.1 +transformers>=4.55.4 +accelerate>=1.10.1 +datasets>=4.8.4 +einops>=0.8.1 +librosa>=0.11.0 +soundfile>=0.12.1 +sentencepiece>=0.2.1 +huggingface-hub>=1.3.0 +liquid-audio>=1.2.0 diff --git a/backend/python/liquid-audio/requirements-l4t13.txt b/backend/python/liquid-audio/requirements-l4t13.txt new file mode 100644 index 000000000..7ecc7203d --- /dev/null +++ b/backend/python/liquid-audio/requirements-l4t13.txt @@ -0,0 +1,13 @@ +--extra-index-url https://pypi.jetson-ai-lab.io/jp7/cu130 +torch>=2.8.0 +torchaudio>=2.8.0 +torchcodec>=0.9.1 +transformers>=4.55.4 +accelerate>=1.10.1 +datasets>=4.8.4 +einops>=0.8.1 +librosa>=0.11.0 +soundfile>=0.12.1 +sentencepiece>=0.2.1 
+huggingface-hub>=1.3.0 +liquid-audio>=1.2.0 diff --git a/backend/python/liquid-audio/requirements-mps.txt b/backend/python/liquid-audio/requirements-mps.txt new file mode 100644 index 000000000..f57687f29 --- /dev/null +++ b/backend/python/liquid-audio/requirements-mps.txt @@ -0,0 +1,12 @@ +torch>=2.8.0 +torchaudio>=2.8.0 +torchcodec>=0.9.1 +transformers>=4.55.4 +accelerate>=1.10.1 +datasets>=4.8.4 +einops>=0.8.1 +librosa>=0.11.0 +soundfile>=0.12.1 +sentencepiece>=0.2.1 +huggingface-hub>=1.3.0 +liquid-audio>=1.2.0 diff --git a/backend/python/liquid-audio/requirements.txt b/backend/python/liquid-audio/requirements.txt new file mode 100644 index 000000000..0834a8fcd --- /dev/null +++ b/backend/python/liquid-audio/requirements.txt @@ -0,0 +1,3 @@ +grpcio==1.78.1 +protobuf +certifi diff --git a/backend/python/liquid-audio/run.sh b/backend/python/liquid-audio/run.sh new file mode 100755 index 000000000..bd17c6e1d --- /dev/null +++ b/backend/python/liquid-audio/run.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +backend_dir=$(dirname $0) +if [ -d $backend_dir/common ]; then + source $backend_dir/common/libbackend.sh +else + source $backend_dir/../common/libbackend.sh +fi + +startBackend $@ diff --git a/backend/python/liquid-audio/test.py b/backend/python/liquid-audio/test.py new file mode 100644 index 000000000..450ce3156 --- /dev/null +++ b/backend/python/liquid-audio/test.py @@ -0,0 +1,89 @@ +"""Smoke tests for the liquid-audio backend. + +These run without contacting HuggingFace or loading model weights: +they only verify that the gRPC service starts and Health() responds. + +To run an end-to-end inference test, set LIQUID_AUDIO_MODEL_ID +(e.g. "LiquidAI/LFM2.5-Audio-1.5B") in the environment — see test_inference(). +""" +import os +import subprocess +import sys +import time +import unittest + +import grpc + +# Ensure generated protobuf stubs are importable +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import backend_pb2 +import backend_pb2_grpc + + +class TestBackend(unittest.TestCase): + @classmethod + def setUpClass(cls): + addr = os.environ.get("LIQUID_AUDIO_TEST_ADDR", "localhost:50053") + cls.addr = addr + cls.server = subprocess.Popen( + [sys.executable, os.path.join(os.path.dirname(__file__), "backend.py"), "--addr", addr], + ) + time.sleep(2) # Give the server a moment to bind + + @classmethod + def tearDownClass(cls): + cls.server.terminate() + try: + cls.server.wait(timeout=5) + except subprocess.TimeoutExpired: + cls.server.kill() + + def _stub(self): + channel = grpc.insecure_channel(self.addr) + return backend_pb2_grpc.BackendStub(channel) + + def test_health(self): + stub = self._stub() + reply = stub.Health(backend_pb2.HealthMessage(), timeout=5) + self.assertEqual(reply.message, b"OK") + + def test_load_finetune_mode_without_weights(self): + """Loading in fine-tune mode should succeed without pulling model weights.""" + stub = self._stub() + result = stub.LoadModel( + backend_pb2.ModelOptions( + Model="LiquidAI/LFM2.5-Audio-1.5B", + Options=["mode:finetune"], + ), + timeout=10, + ) + self.assertTrue(result.success, msg=result.message) + + @unittest.skipUnless(os.environ.get("LIQUID_AUDIO_MODEL_ID"), + "Set LIQUID_AUDIO_MODEL_ID to run an end-to-end inference smoke test") + def test_inference(self): + """End-to-end: load a real LFM2-Audio model and run one short prediction.""" + stub = self._stub() + model_id = os.environ["LIQUID_AUDIO_MODEL_ID"] + result = stub.LoadModel( + backend_pb2.ModelOptions( + Model=model_id, + Options=["mode:chat"], + ), + timeout=600, + ) 
+ self.assertTrue(result.success, msg=result.message) + reply = stub.Predict( + backend_pb2.PredictOptions( + Prompt="Hello!", + Tokens=8, + Temperature=0.0, + ), + timeout=120, + ) + self.assertGreater(len(reply.message), 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/backend/python/liquid-audio/test.sh b/backend/python/liquid-audio/test.sh new file mode 100755 index 000000000..eb59f2aaf --- /dev/null +++ b/backend/python/liquid-audio/test.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e + +backend_dir=$(dirname $0) +if [ -d $backend_dir/common ]; then + source $backend_dir/common/libbackend.sh +else + source $backend_dir/../common/libbackend.sh +fi + +runUnittests diff --git a/core/config/backend_capabilities.go b/core/config/backend_capabilities.go index e9c1920f0..4d66dc107 100644 --- a/core/config/backend_capabilities.go +++ b/core/config/backend_capabilities.go @@ -24,6 +24,7 @@ const ( UsecaseVAD = "vad" UsecaseAudioTransform = "audio_transform" UsecaseDiarization = "diarization" + UsecaseRealtimeAudio = "realtime_audio" ) // GRPCMethod identifies a Backend service RPC from backend.proto. @@ -45,6 +46,7 @@ const ( MethodVAD GRPCMethod = "VAD" MethodAudioTransform GRPCMethod = "AudioTransform" MethodDiarize GRPCMethod = "Diarize" + MethodAudioToAudioStream GRPCMethod = "AudioToAudioStream" ) // UsecaseInfo describes a single known_usecase value and how it maps @@ -147,6 +149,11 @@ var UsecaseInfoMap = map[string]UsecaseInfo{ GRPCMethod: MethodDiarize, Description: "Speaker diarization (who-spoke-when, per-speaker segments) via the Diarize RPC.", }, + UsecaseRealtimeAudio: { + Flag: FLAG_REALTIME_AUDIO, + GRPCMethod: MethodAudioToAudioStream, + Description: "Self-contained any-to-any audio model for the Realtime API — accepts microphone audio and emits speech + transcript (+ optional function calls) from a single backend via the AudioToAudioStream RPC.", + }, } // BackendCapability describes which gRPC methods and usecases a backend supports. 
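The new `realtime_audio` usecase string, the `FLAG_REALTIME_AUDIO` bit and the `AudioToAudioStream` RPC only pay off once they are tied together through `UsecaseInfoMap`. A minimal sketch of that resolution — the import path is `github.com/mudler/LocalAI/core/config`, as used by the Go files in this diff; the lookup itself just follows the map entry above:

```go
package main

import (
	"fmt"

	"github.com/mudler/LocalAI/core/config"
)

func main() {
	// Resolve the known_usecases string the way a loader or gallery filter
	// would: the map entry carries both the bitmask flag and the
	// backend.proto RPC that serves it.
	info := config.UsecaseInfoMap[config.UsecaseRealtimeAudio]

	fmt.Println(info.Flag == config.FLAG_REALTIME_AUDIO) // true
	fmt.Println(info.GRPCMethod)                         // AudioToAudioStream
}
```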
@@ -397,6 +404,15 @@ var BackendCapabilities = map[string]BackendCapability{ Description: "Meta MusicGen via transformers — music generation from text", }, + // --- Any-to-any audio backends --- + "liquid-audio": { + GRPCMethods: []GRPCMethod{MethodPredict, MethodPredictStream, MethodAudioTranscription, MethodTTS, MethodAudioToAudioStream, MethodVAD}, + PossibleUsecases: []string{UsecaseChat, UsecaseCompletion, UsecaseTranscript, UsecaseTTS, UsecaseRealtimeAudio, UsecaseVAD}, + DefaultUsecases: []string{UsecaseRealtimeAudio, UsecaseChat, UsecaseTranscript, UsecaseTTS, UsecaseVAD}, + AcceptsAudios: true, + Description: "LFM2 / LFM2.5-Audio — self-contained any-to-any audio model for the Realtime API; also exposes chat, transcription, TTS and a stub energy-based VAD endpoint", + }, + // --- Audio transform backends --- "localvqe": { GRPCMethods: []GRPCMethod{MethodAudioTransform}, diff --git a/core/config/model_config.go b/core/config/model_config.go index 5a7c74c41..f14bc4a4e 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -636,6 +636,7 @@ const ( FLAG_SPEAKER_RECOGNITION ModelConfigUsecase = 0b1000000000000000 FLAG_AUDIO_TRANSFORM ModelConfigUsecase = 0b10000000000000000 FLAG_DIARIZATION ModelConfigUsecase = 0b100000000000000000 + FLAG_REALTIME_AUDIO ModelConfigUsecase = 0b1000000000000000000 // Common Subsets FLAG_LLM ModelConfigUsecase = FLAG_CHAT | FLAG_COMPLETION | FLAG_EDIT @@ -645,12 +646,12 @@ const ( // Flags within the same group are NOT orthogonal (e.g., chat and completion are // both text/language). A model is multimodal when its usecases span 2+ groups. var ModalityGroups = []ModelConfigUsecase{ - FLAG_CHAT | FLAG_COMPLETION | FLAG_EDIT, // text/language - FLAG_VISION | FLAG_DETECTION, // visual understanding - FLAG_TRANSCRIPT, // speech input - FLAG_TTS | FLAG_SOUND_GENERATION, // audio output - FLAG_AUDIO_TRANSFORM, // audio in/out transforms - FLAG_IMAGE | FLAG_VIDEO, // visual generation + FLAG_CHAT | FLAG_COMPLETION | FLAG_EDIT, // text/language + FLAG_VISION | FLAG_DETECTION, // visual understanding + FLAG_TRANSCRIPT | FLAG_REALTIME_AUDIO, // speech input — realtime_audio is any-to-any, so it counts here too + FLAG_TTS | FLAG_SOUND_GENERATION | FLAG_REALTIME_AUDIO, // audio output — and here, so a lone realtime_audio flag still reads as multimodal + FLAG_AUDIO_TRANSFORM, // audio in/out transforms + FLAG_IMAGE | FLAG_VIDEO, // visual generation } // IsMultimodal returns true if the given usecases span two or more orthogonal @@ -692,6 +693,7 @@ func GetAllModelConfigUsecases() map[string]ModelConfigUsecase { "FLAG_SPEAKER_RECOGNITION": FLAG_SPEAKER_RECOGNITION, "FLAG_AUDIO_TRANSFORM": FLAG_AUDIO_TRANSFORM, "FLAG_DIARIZATION": FLAG_DIARIZATION, + "FLAG_REALTIME_AUDIO": FLAG_REALTIME_AUDIO, } } @@ -866,6 +868,16 @@ func (c *ModelConfig) GuessUsecases(u ModelConfigUsecase) bool { } } + if (u & FLAG_REALTIME_AUDIO) == FLAG_REALTIME_AUDIO { + // Backends that own a single any-to-any loop and implement + // AudioToAudioStream — listed here so models without an explicit + // known_usecases still surface on the Talk page. 
+ realtimeAudioBackends := []string{"liquid-audio"} + if !slices.Contains(realtimeAudioBackends, c.Backend) { + return false + } + } + return true } diff --git a/core/gallery/importers/importers.go b/core/gallery/importers/importers.go index 02606f099..88a4314a7 100644 --- a/core/gallery/importers/importers.go +++ b/core/gallery/importers/importers.go @@ -130,6 +130,8 @@ var defaultImporters = []Importer{ // and would otherwise swallow the C++ port's GGUF bundles. &VibeVoiceCppImporter{}, &VibeVoiceImporter{}, + // LiquidAudio (Python) — keep before LlamaCPP so non-GGUF LFM2-Audio repos route here. + &LiquidAudioImporter{}, &CoquiImporter{}, // Image/Video (Batch 3) &StableDiffusionGGMLImporter{}, diff --git a/core/gallery/importers/liquid-audio.go b/core/gallery/importers/liquid-audio.go new file mode 100644 index 000000000..dbea16020 --- /dev/null +++ b/core/gallery/importers/liquid-audio.go @@ -0,0 +1,145 @@ +package importers + +import ( + "encoding/json" + "path/filepath" + "strings" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/schema" + "go.yaml.in/yaml/v2" +) + +var _ Importer = &LiquidAudioImporter{} + +// LiquidAudioImporter recognises LiquidAI's LFM2-Audio family (LFM2-Audio-1.5B, +// LFM2.5-Audio-1.5B, community finetunes) and routes them to the Python +// `liquid-audio` backend. Detection is by repo-name substring so third-party +// mirrors still match. preferences.backend="liquid-audio" overrides detection. +// +// Once upstream llama.cpp PR #18641 lands and the GGUF gallery entries are +// added, GGUF mirrors of these models should route to llama-cpp; that's +// handled by ordering LlamaCPPImporter after this one and by the explicit +// "-gguf" exclusion below. +type LiquidAudioImporter struct{} + +func (i *LiquidAudioImporter) Name() string { return "liquid-audio" } +func (i *LiquidAudioImporter) Modality() string { return "tts" } +func (i *LiquidAudioImporter) AutoDetects() bool { return true } + +func (i *LiquidAudioImporter) Match(details Details) bool { + preferences, err := details.Preferences.MarshalJSON() + if err != nil { + return false + } + preferencesMap := make(map[string]any) + if len(preferences) > 0 { + if err := json.Unmarshal(preferences, &preferencesMap); err != nil { + return false + } + } + + if b, ok := preferencesMap["backend"].(string); ok && b == "liquid-audio" { + return true + } + + matchRepo := func(repo string) bool { + r := strings.ToLower(repo) + // Cede GGUF mirrors to the (later-ordered) llama-cpp importer. 
+ if strings.HasSuffix(r, "-gguf") { + return false + } + return strings.Contains(r, "lfm2-audio") || strings.Contains(r, "lfm2.5-audio") + } + + if details.HuggingFace != nil { + repoName := details.HuggingFace.ModelID + if idx := strings.Index(repoName, "/"); idx >= 0 { + repoName = repoName[idx+1:] + } + if matchRepo(repoName) { + return true + } + } + + if _, repo, ok := HFOwnerRepoFromURI(details.URI); ok { + return matchRepo(repo) + } + return false +} + +func (i *LiquidAudioImporter) Import(details Details) (gallery.ModelConfig, error) { + preferences, err := details.Preferences.MarshalJSON() + if err != nil { + return gallery.ModelConfig{}, err + } + preferencesMap := make(map[string]any) + if len(preferences) > 0 { + if err := json.Unmarshal(preferences, &preferencesMap); err != nil { + return gallery.ModelConfig{}, err + } + } + + name, ok := preferencesMap["name"].(string) + if !ok { + name = filepath.Base(details.URI) + } + + description, ok := preferencesMap["description"].(string) + if !ok { + description = "Imported from " + details.URI + } + + model := details.URI + if details.HuggingFace != nil && details.HuggingFace.ModelID != "" { + model = details.HuggingFace.ModelID + } + + // Preferences may pin the mode (chat / asr / tts / s2s / finetune). + // Default to s2s — the headline any-to-any use case. + mode, _ := preferencesMap["mode"].(string) + if mode == "" { + mode = "s2s" + } + + options := []string{"mode:" + mode} + if voice, ok := preferencesMap["voice"].(string); ok && voice != "" { + options = append(options, "voice:"+voice) + } + + usecases := []string{"chat"} + switch mode { + case "asr": + usecases = []string{"transcript"} + case "tts": + usecases = []string{"tts"} + case "s2s": + // realtime_audio surfaces the model on the Talk page; chat/tts/ + // transcript/vad keep the standalone OpenAI-compatible endpoints + // working since liquid-audio implements all of them. + usecases = []string{"realtime_audio", "chat", "tts", "transcript", "vad"} + } + + modelConfig := config.ModelConfig{ + Name: name, + Description: description, + Backend: "liquid-audio", + KnownUsecaseStrings: usecases, + Options: options, + PredictionOptions: schema.PredictionOptions{ + BasicModelRequest: schema.BasicModelRequest{Model: model}, + }, + } + + data, err := yaml.Marshal(modelConfig) + if err != nil { + return gallery.ModelConfig{}, err + } + + return gallery.ModelConfig{ + Name: name, + Description: description, + ConfigFile: string(data), + }, nil +} diff --git a/core/gallery/importers/liquid-audio_test.go b/core/gallery/importers/liquid-audio_test.go new file mode 100644 index 000000000..f7dc5e046 --- /dev/null +++ b/core/gallery/importers/liquid-audio_test.go @@ -0,0 +1,91 @@ +package importers_test + +import ( + "encoding/json" + "fmt" + + "github.com/mudler/LocalAI/core/gallery/importers" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("LiquidAudioImporter", func() { + Context("detection from HuggingFace", func() { + It("matches LiquidAI/LFM2.5-Audio-1.5B", func() { + uri := "https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B" + preferences := json.RawMessage(`{}`) + + modelConfig, err := importers.DiscoverModelConfig(uri, preferences) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error: %v", err)) + Expect(modelConfig.ConfigFile).To(ContainSubstring("backend: liquid-audio")) + Expect(modelConfig.ConfigFile).To(ContainSubstring("LiquidAI/LFM2.5-Audio-1.5B")) + }) + + It("matches LiquidAI/LFM2-Audio-1.5B (older variant)", func() { + uri := "https://huggingface.co/LiquidAI/LFM2-Audio-1.5B" + preferences := json.RawMessage(`{}`) + + modelConfig, err := importers.DiscoverModelConfig(uri, preferences) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error: %v", err)) + Expect(modelConfig.ConfigFile).To(ContainSubstring("backend: liquid-audio")) + }) + + It("cedes -GGUF mirrors to the llama-cpp importer", func() { + // LiquidAI/LFM2.5-Audio-1.5B-GGUF should NOT route to liquid-audio. + // Once upstream PR #18641 lands and the GGUF gallery entry exists, + // this is the path that lets users opt into the C++ runtime. + uri := "https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B-GGUF" + preferences := json.RawMessage(`{}`) + + modelConfig, err := importers.DiscoverModelConfig(uri, preferences) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error: %v", err)) + Expect(modelConfig.ConfigFile).ToNot(ContainSubstring("backend: liquid-audio"), + fmt.Sprintf("GGUF repo should not match Python importer; got: %s", modelConfig.ConfigFile)) + }) + }) + + Context("preference override", func() { + It("honours preferences.backend=liquid-audio for arbitrary URIs", func() { + uri := "https://example.com/some-unrelated-model" + preferences := json.RawMessage(`{"backend": "liquid-audio"}`) + + modelConfig, err := importers.DiscoverModelConfig(uri, preferences) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error: %v", err)) + Expect(modelConfig.ConfigFile).To(ContainSubstring("backend: liquid-audio")) + }) + + It("picks up the mode preference", func() { + uri := "https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B" + preferences := json.RawMessage(`{"mode": "asr"}`) + + modelConfig, err := importers.DiscoverModelConfig(uri, preferences) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error: %v", err)) + Expect(modelConfig.ConfigFile).To(ContainSubstring("mode:asr")) + Expect(modelConfig.ConfigFile).To(ContainSubstring("transcript")) + }) + + It("picks up the voice preference", func() { + uri := "https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B" + preferences := json.RawMessage(`{"mode": "tts", "voice": "uk_male"}`) + + modelConfig, err := importers.DiscoverModelConfig(uri, preferences) + + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Error: %v", err)) + Expect(modelConfig.ConfigFile).To(ContainSubstring("voice:uk_male")) + }) + }) + + Context("Importer interface metadata", func() { + It("exposes name/modality/autodetect", func() { + imp := &importers.LiquidAudioImporter{} + Expect(imp.Name()).To(Equal("liquid-audio")) + Expect(imp.Modality()).To(Equal("tts")) + Expect(imp.AutoDetects()).To(BeTrue()) + }) + }) +}) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index f02aa7fe0..b7a0be6ac 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "os" + "strconv" 
"sync" "time" @@ -20,6 +21,8 @@ import ( "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/auth" + mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/core/templates" @@ -79,6 +82,26 @@ type Session struct { InputSampleRate int OutputSampleRate int MaxOutputTokens types.IntOrInf + // MaxHistoryItems caps the number of MessageItems passed to the LLM each + // turn (0 = unlimited). Small models — especially the LFM2.5-Audio 1.5B + // served via the liquid-audio backend — degrade quickly past a handful + // of turns. Counted from the tail; FunctionCall + FunctionCallOutput + // pairs are kept together so we never feed an orphaned tool result. + MaxHistoryItems int + + // AssistantExecutor is non-nil when the session opted into the in-process + // LocalAI Assistant tool surface. Tool calls whose name matches this + // executor's catalog are run inproc and their output is fed back to the + // model server-side; the client never sees a function_call_arguments + // event for those. Mirrors the chat handler's metadata.localai_assistant + // path. + AssistantExecutor mcpTools.ToolExecutor + + // AssistantTools is the cached ToolUnion slice we injected at session + // creation. Re-applied after every client session.update so a + // client-driven tool refresh (e.g. toggling a client MCP server) doesn't + // silently strip Manage Mode's tools. + AssistantTools []types.ToolUnion // Response cancellation: protects activeResponseCancel/activeResponseDone responseMu sync.Mutex @@ -205,6 +228,19 @@ func RealtimeTranscriptionSession(application *application.Application) echo.Han } } +// RealtimeSessionOptions bundles per-session knobs decoded from the WS query +// string (or the WebRTC handshake body). Mirrors what chat.go pulls off +// `metadata.localai_assistant` — admin-only opt-in to the in-process +// management tool surface. +type RealtimeSessionOptions struct { + LocalAIAssistant bool + // AuthEnabled mirrors chat.go's requireAssistantAccess gate. We resolve + // admin role at handshake time (where the echo.Context has the auth + // cookie/Bearer) and drop the result here so runRealtimeSession can + // decide without holding onto the request. + IsAdmin bool +} + func Realtime(application *application.Application) echo.HandlerFunc { return func(c echo.Context) error { ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) @@ -218,25 +254,105 @@ func Realtime(application *application.Application) echo.HandlerFunc { // Extract query parameters from Echo context before passing to websocket handler model := c.QueryParam("model") + assistantFlag, _ := strconv.ParseBool(c.QueryParam("localai_assistant")) + opts := RealtimeSessionOptions{ + LocalAIAssistant: assistantFlag, + IsAdmin: isCurrentUserAdmin(c, application), + } - registerRealtime(application, model)(ws) + registerRealtime(application, model, opts)(ws) return nil } } -func registerRealtime(application *application.Application, model string) func(c *websocket.Conn) { +// isCurrentUserAdmin replicates the chat-side admin check at the realtime +// handshake. When auth is disabled, every caller is treated as admin (same +// as chat's requireAssistantAccess). 
+func isCurrentUserAdmin(c echo.Context, application *application.Application) bool { + if application == nil || application.ApplicationConfig() == nil || !application.ApplicationConfig().Auth.Enabled { + return true + } + user := auth.GetUser(c) + return user != nil && user.Role == auth.RoleAdmin +} + +func registerRealtime(application *application.Application, model string, opts RealtimeSessionOptions) func(c *websocket.Conn) { return func(conn *websocket.Conn) { t := NewWebSocketTransport(conn) evaluator := application.TemplatesEvaluator() xlog.Debug("Realtime WebSocket connection established", "address", conn.RemoteAddr().String(), "model", model) - runRealtimeSession(application, t, model, evaluator) + runRealtimeSession(application, t, model, evaluator, opts) } } +// defaultMaxHistoryItems picks a sensible default cap for the session. +// Small any-to-any audio models degrade quickly past a handful of turns; +// legacy pipelines composing larger LLMs keep the historical "unlimited" +// default and rely on the LLM's own context window. +func defaultMaxHistoryItems(cfg *config.ModelConfig) int { + if cfg != nil && cfg.HasUsecases(config.FLAG_REALTIME_AUDIO) { + return 6 + } + return 0 +} + +// trimRealtimeItems returns the tail of items capped at maxItems (0 = no cap). +// Walks backwards keeping function_call + function_call_output pairs together +// so we never feed the LLM an orphaned tool result that references a call it +// can't see. +func trimRealtimeItems(items []*types.MessageItemUnion, maxItems int) []*types.MessageItemUnion { + if maxItems <= 0 || len(items) <= maxItems { + return items + } + // Find the cut point starting from len-maxItems and pull it left until + // we're not in the middle of a tool-call pair. + cut := len(items) - maxItems + for cut > 0 && items[cut] != nil && items[cut].FunctionCallOutput != nil { + cut-- + } + return items[cut:] +} + +// prepareRealtimeConfig validates a model config for use in a realtime session +// and fills in pipeline slots for self-contained any-to-any models. It returns +// an error code + message pair suitable for sendError; the bool indicates +// whether the caller should proceed. Extracted from runRealtimeSession so the +// gate logic can be exercised in unit tests without a full Application. +func prepareRealtimeConfig(cfg *config.ModelConfig) (errCode, errMsg string, ok bool) { + if cfg == nil { + return "invalid_model", "Model is not a pipeline model", false + } + + // Self-contained any-to-any models (e.g. liquid-audio) own the whole + // loop in one engine — surface them by populating empty pipeline slots + // with the model's own name so newModel can resolve a config for each + // role. The user can still pin individual slots (e.g. Pipeline.VAD = + // silero-vad) and those wins. + if cfg.HasUsecases(config.FLAG_REALTIME_AUDIO) { + if cfg.Pipeline.VAD == "" { + cfg.Pipeline.VAD = cfg.Name + } + if cfg.Pipeline.Transcription == "" { + cfg.Pipeline.Transcription = cfg.Name + } + if cfg.Pipeline.LLM == "" { + cfg.Pipeline.LLM = cfg.Name + } + if cfg.Pipeline.TTS == "" { + cfg.Pipeline.TTS = cfg.Name + } + return "", "", true + } + + if cfg.Pipeline.VAD == "" && cfg.Pipeline.Transcription == "" && cfg.Pipeline.TTS == "" && cfg.Pipeline.LLM == "" { + return "invalid_model", "Model is not a pipeline model", false + } + return "", "", true +} + // runRealtimeSession runs the main event loop for a realtime session. // It is transport-agnostic and works with both WebSocket and WebRTC. 
-func runRealtimeSession(application *application.Application, t Transport, model string, evaluator *templates.Evaluator) { - // TODO: Allow any-to-any model to be specified +func runRealtimeSession(application *application.Application, t Transport, model string, evaluator *templates.Evaluator, opts RealtimeSessionOptions) { cl := application.ModelConfigLoader() cfg, err := cl.LoadModelConfigFileByNameDefaultOptions(model, application.ApplicationConfig()) if err != nil { @@ -245,22 +361,79 @@ func runRealtimeSession(application *application.Application, t Transport, model return } - if cfg == nil || (cfg.Pipeline.VAD == "" && cfg.Pipeline.Transcription == "" && cfg.Pipeline.TTS == "" && cfg.Pipeline.LLM == "") { + if code, msg, ok := prepareRealtimeConfig(cfg); !ok { xlog.Error("model is not a pipeline", "model", model) - sendError(t, "invalid_model", "Model is not a pipeline model", "", "") + sendError(t, code, msg, "", "") return } + // LocalAI Assistant opt-in: gate on admin (same rule as chat.go's + // requireAssistantAccess) and grab the process-wide holder's executor. + // We collect tools + system prompt here and merge them into the session + // below so they're live from the first response.create. + var assistantTools []types.ToolUnion + var assistantSystemPrompt string + var assistantExecutor mcpTools.ToolExecutor + if opts.LocalAIAssistant { + if !opts.IsAdmin { + sendError(t, "forbidden", "localai_assistant requires admin", "", "") + return + } + appCfg := application.ApplicationConfig() + if appCfg != nil && appCfg.DisableLocalAIAssistant { + sendError(t, "unavailable", "LocalAI Assistant is disabled on this server", "", "") + return + } + holder := application.LocalAIAssistant() + if holder == nil || !holder.HasTools() { + sendError(t, "unavailable", "LocalAI Assistant is not available on this server", "", "") + return + } + exec := holder.Executor() + fns, discErr := exec.DiscoverTools(context.Background()) + if discErr != nil { + xlog.Error("realtime: failed to discover LocalAI Assistant tools", "error", discErr) + sendError(t, "tool_discovery_failed", "failed to discover assistant tools: "+discErr.Error(), "", "") + return + } + assistantExecutor = exec + assistantSystemPrompt = holder.SystemPrompt() + assistantTools = make([]types.ToolUnion, 0, len(fns)) + for _, fn := range fns { + fnCopy := fn + assistantTools = append(assistantTools, types.ToolUnion{ + Function: &types.ToolFunction{ + Name: fnCopy.Name, + Description: fnCopy.Description, + Parameters: fnCopy.Parameters, + }, + }) + } + xlog.Debug("realtime: LocalAI Assistant tools injected", "count", len(fns)) + } + sttModel := cfg.Pipeline.Transcription + // Compose the system prompt: prepend the assistant prompt when we have + // one (it teaches the model the safety rules and tool recipes), then the + // session's default voice instructions. Order matches chat.go's + // hasSystemMessage check — assistant prompt comes first. 
+ instructions := defaultInstructions + if assistantSystemPrompt != "" { + instructions = assistantSystemPrompt + "\n\n" + defaultInstructions + } + sessionID := generateSessionID() session := &Session{ ID: sessionID, TranscriptionOnly: false, Model: model, Voice: cfg.TTSConfig.Voice, - Instructions: defaultInstructions, + Instructions: instructions, ModelConfig: cfg, + Tools: assistantTools, + AssistantTools: assistantTools, + AssistantExecutor: assistantExecutor, TurnDetection: &types.TurnDetectionUnion{ ServerVad: &types.ServerVad{ Threshold: 0.5, @@ -275,6 +448,7 @@ func runRealtimeSession(application *application.Application, t Transport, model Conversations: make(map[string]*Conversation), InputSampleRate: defaultRemoteSampleRate, OutputSampleRate: defaultRemoteSampleRate, + MaxHistoryItems: defaultMaxHistoryItems(cfg), } // Create a default conversation @@ -810,7 +984,28 @@ func updateSession(session *Session, update *types.SessionUnion, cl *config.Mode } if rt.Tools != nil { - session.Tools = rt.Tools + // Manage Mode tools survive a client-driven session.update — the + // alternative is silently dropping them whenever the user toggles + // a client MCP server, which would break the modality mid-session. + // Names from rt.Tools win on collision (the client is explicit; + // we preserve, we don't override). + merged := append([]types.ToolUnion(nil), rt.Tools...) + seen := make(map[string]struct{}, len(merged)) + for _, t := range merged { + if t.Function != nil { + seen[t.Function.Name] = struct{}{} + } + } + for _, t := range session.AssistantTools { + if t.Function == nil { + continue + } + if _, ok := seen[t.Function.Name]; ok { + continue + } + merged = append(merged, t) + } + session.Tools = merged } if rt.ToolChoice != nil { session.ToolChoice = rt.ToolChoice @@ -1104,7 +1299,17 @@ func generateResponse(ctx context.Context, session *Session, utt []byte, transcr triggerResponse(ctx, session, conv, t, nil) } +// maxAssistantToolTurns caps the server-side agentic loop. Mirrors the +// chat-page maxToolTurns:10 from useChat.js — the model gets up to this +// many consecutive tool round-trips before we return control to the user +// without another response cycle. +const maxAssistantToolTurns = 10 + func triggerResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams) { + triggerResponseAtTurn(ctx, session, conv, t, overrides, 0) +} + +func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams, toolTurn int) { config := session.ModelInterface.PredictConfig() // Default values @@ -1155,7 +1360,8 @@ func triggerResponse(ctx context.Context, session *Session, conv *Conversation, imgIndex := 0 conv.Lock.Lock() - for _, item := range conv.Items { + items := trimRealtimeItems(conv.Items, session.MaxHistoryItems) + for _, item := range items { if item.User != nil { msg := schema.Message{ Role: string(types.MessageRoleUser), @@ -1575,8 +1781,16 @@ func triggerResponse(ctx context.Context, session *Session, conv *Conversation, }) } - // Handle Tool Calls + // Handle Tool Calls. Two paths: + // - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run + // server-side; we append both the call and its output to conv.Items + // and re-trigger a follow-up response so the model can speak the + // result. The client only sees observability events. 
+ // - All other tools follow the standard OpenAI flow: emit + // function_call_arguments.done and wait for the client to send + // conversation.item.create back. xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls)) + executedAssistantTool := false for i, tc := range finalToolCalls { toolCallID := generateItemID() callID := "call_" + generateUniqueID() // OpenAI uses call_xyz @@ -1608,6 +1822,51 @@ func triggerResponse(ctx context.Context, session *Session, conv *Conversation, Item: fcItem, }) + serverSide := session.AssistantExecutor != nil && session.AssistantExecutor.IsTool(tc.Name) + if serverSide { + output, execErr := session.AssistantExecutor.ExecuteTool(ctx, tc.Name, tc.Arguments) + if execErr != nil { + output = "Error: " + execErr.Error() + xlog.Error("realtime: assistant tool execution failed", "tool", tc.Name, "error", execErr) + } + foItem := types.MessageItemUnion{ + FunctionCallOutput: &types.MessageItemFunctionCallOutput{ + ID: generateItemID(), + CallID: callID, + Output: output, + Status: types.ItemStatusCompleted, + }, + } + conv.Lock.Lock() + conv.Items = append(conv.Items, &foItem) + conv.Lock.Unlock() + // Close the call out and emit the output as its own paired + // added/done — the OpenAI spec pairs every item-done with a + // preceding item-added, so we re-pair here for the output. + // The UI renders the transcript entry on item.done for both + // shapes (FunctionCall + FunctionCallOutput). + sendEvent(t, types.ResponseOutputItemDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: outputIndex, + Item: fcItem, + }) + sendEvent(t, types.ResponseOutputItemAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: outputIndex, + Item: foItem, + }) + sendEvent(t, types.ResponseOutputItemDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: outputIndex, + Item: foItem, + }) + executedAssistantTool = true + continue + } + sendEvent(t, types.ResponseFunctionCallArgumentsDeltaEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, @@ -1643,6 +1902,19 @@ func triggerResponse(ctx context.Context, session *Session, conv *Conversation, Status: types.ResponseStatusCompleted, }, }) + + // If we executed any assistant tools inproc, run another response cycle + // so the model can speak the result. Mirrors the chat-side agentic loop + // but driven server-side rather than by client round-trip. Bounded so a + // degenerate "model keeps calling tools" doesn't blow the stack. + if executedAssistantTool { + if toolTurn+1 >= maxAssistantToolTurns { + xlog.Warn("realtime: assistant tool-turn limit reached, stopping the agentic loop", + "limit", maxAssistantToolTurns, "model", session.Model) + return + } + triggerResponseAtTurn(ctx, session, conv, t, nil, toolTurn+1) + } } // Helper functions to generate unique IDs diff --git a/core/http/endpoints/openai/realtime_gate_test.go b/core/http/endpoints/openai/realtime_gate_test.go new file mode 100644 index 000000000..e49eb71eb --- /dev/null +++ b/core/http/endpoints/openai/realtime_gate_test.go @@ -0,0 +1,153 @@ +package openai + +import ( + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// withUsecases returns a *ModelConfigUsecase pointing at the OR of the given flags. +// Helper so each spec keeps its intent obvious. 
+func withUsecases(flags ...config.ModelConfigUsecase) *config.ModelConfigUsecase { + var u config.ModelConfigUsecase + for _, f := range flags { + u |= f + } + return &u +} + +var _ = Describe("prepareRealtimeConfig", func() { + It("rejects a nil config", func() { + code, msg, ok := prepareRealtimeConfig(nil) + Expect(ok).To(BeFalse()) + Expect(code).To(Equal("invalid_model")) + Expect(msg).To(ContainSubstring("not a pipeline model")) + }) + + It("rejects a model with no pipeline slots and no realtime_audio usecase", func() { + cfg := &config.ModelConfig{Name: "plain-chat"} + code, msg, ok := prepareRealtimeConfig(cfg) + Expect(ok).To(BeFalse()) + Expect(code).To(Equal("invalid_model")) + Expect(msg).To(ContainSubstring("not a pipeline model")) + }) + + It("accepts a model with a fully populated legacy pipeline", func() { + cfg := &config.ModelConfig{ + Name: "legacy", + Pipeline: config.Pipeline{ + VAD: "silero", + Transcription: "whisper", + LLM: "llama", + TTS: "piper", + }, + } + _, _, ok := prepareRealtimeConfig(cfg) + Expect(ok).To(BeTrue()) + Expect(cfg.Pipeline.LLM).To(Equal("llama"), "user-supplied pipeline slot must not be overwritten") + }) + + It("accepts a self-contained realtime_audio model and self-pipelines empty slots", func() { + cfg := &config.ModelConfig{ + Name: "lfm2.5-audio-realtime", + KnownUsecases: withUsecases(config.FLAG_REALTIME_AUDIO), + } + _, _, ok := prepareRealtimeConfig(cfg) + Expect(ok).To(BeTrue()) + Expect(cfg.Pipeline.VAD).To(Equal("lfm2.5-audio-realtime")) + Expect(cfg.Pipeline.Transcription).To(Equal("lfm2.5-audio-realtime")) + Expect(cfg.Pipeline.LLM).To(Equal("lfm2.5-audio-realtime")) + Expect(cfg.Pipeline.TTS).To(Equal("lfm2.5-audio-realtime")) + }) + + It("preserves user-pinned pipeline slots on a realtime_audio model", func() { + // A user might want a dedicated silero-vad and let the realtime_audio + // model own only STT/LLM/TTS. + cfg := &config.ModelConfig{ + Name: "lfm-with-external-vad", + KnownUsecases: withUsecases(config.FLAG_REALTIME_AUDIO), + Pipeline: config.Pipeline{ + VAD: "silero-vad", + }, + } + _, _, ok := prepareRealtimeConfig(cfg) + Expect(ok).To(BeTrue()) + Expect(cfg.Pipeline.VAD).To(Equal("silero-vad")) + Expect(cfg.Pipeline.Transcription).To(Equal("lfm-with-external-vad")) + Expect(cfg.Pipeline.LLM).To(Equal("lfm-with-external-vad")) + Expect(cfg.Pipeline.TTS).To(Equal("lfm-with-external-vad")) + }) + + It("accepts a model with at least one legacy pipeline slot set", func() { + // Pre-existing behaviour: the gate only rejected when ALL four slots + // were empty. Lock that in so the change doesn't tighten the gate. 
+ cfg := &config.ModelConfig{ + Name: "partial", + Pipeline: config.Pipeline{ + LLM: "llama", + }, + } + _, _, ok := prepareRealtimeConfig(cfg) + Expect(ok).To(BeTrue()) + }) +}) + +var _ = Describe("defaultMaxHistoryItems", func() { + It("caps realtime_audio sessions at 6", func() { + cfg := &config.ModelConfig{KnownUsecases: withUsecases(config.FLAG_REALTIME_AUDIO)} + Expect(defaultMaxHistoryItems(cfg)).To(Equal(6)) + }) + It("leaves legacy pipelines unlimited", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{LLM: "llama"}} + Expect(defaultMaxHistoryItems(cfg)).To(Equal(0)) + }) + It("tolerates nil", func() { + Expect(defaultMaxHistoryItems(nil)).To(Equal(0)) + }) +}) + +var _ = Describe("trimRealtimeItems", func() { + user := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + assistant := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{Assistant: &types.MessageItemAssistant{ID: id}} + } + fnCall := func(id, callID string) *types.MessageItemUnion { + return &types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: id, CallID: callID}} + } + fnOut := func(id, callID string) *types.MessageItemUnion { + return &types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: id, CallID: callID}} + } + + It("returns the input unchanged when cap is zero", func() { + in := []*types.MessageItemUnion{user("u1"), assistant("a1")} + Expect(trimRealtimeItems(in, 0)).To(Equal(in)) + }) + + It("returns the input unchanged when under the cap", func() { + in := []*types.MessageItemUnion{user("u1"), assistant("a1")} + Expect(trimRealtimeItems(in, 4)).To(Equal(in)) + }) + + It("keeps the tail when over the cap", func() { + in := []*types.MessageItemUnion{user("u1"), assistant("a1"), user("u2"), assistant("a2"), user("u3")} + out := trimRealtimeItems(in, 3) + Expect(out).To(HaveLen(3)) + Expect(out[0].User.ID).To(Equal("u2")) + Expect(out[2].User.ID).To(Equal("u3")) + }) + + It("pulls the cut left to keep a function_call paired with its output", func() { + // 0:user 1:fc 2:fc_out 3:assistant — cap=2 would otherwise start at + // index 2 (orphan fc_out). Helper must roll back to include 1. + in := []*types.MessageItemUnion{user("u1"), fnCall("fc1", "c1"), fnOut("fo1", "c1"), assistant("a1")} + out := trimRealtimeItems(in, 2) + // Expect at least the fc + fc_out + assistant (3 items, cap was 2) + // — the rollback prefers correctness over the cap. + Expect(len(out)).To(BeNumerically(">=", 3)) + Expect(out[0].FunctionCall).NotTo(BeNil()) + Expect(out[1].FunctionCallOutput).NotTo(BeNil()) + }) +}) diff --git a/core/http/endpoints/openai/realtime_webrtc.go b/core/http/endpoints/openai/realtime_webrtc.go index 864c67b19..fd305799c 100644 --- a/core/http/endpoints/openai/realtime_webrtc.go +++ b/core/http/endpoints/openai/realtime_webrtc.go @@ -15,6 +15,10 @@ import ( type RealtimeCallRequest struct { SDP string `json:"sdp"` Model string `json:"model"` + // LocalAIAssistant opts the session into the in-process admin tool + // surface (same modality as the chat page's "Manage Mode"). Admin-only; + // the realtime entry point gates it the same way the chat handler does. + LocalAIAssistant bool `json:"localai_assistant,omitempty"` } // RealtimeCallResponse is the JSON response for POST /v1/realtime/calls. 
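For the WebRTC entry point the opt-in travels in the call body rather than the WebSocket query string. A minimal sketch of the handshake request — the base URL, model name and placeholder SDP are illustrative assumptions; the `/v1/realtime/calls` path, the field names and the admin-only gate on `localai_assistant` come from `RealtimeCallRequest` and the handler changes in this diff:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// The SDP offer normally comes from a local RTCPeerConnection; elided here.
	reqBody, _ := json.Marshal(map[string]any{
		"sdp":               "<local SDP offer>",
		"model":             "lfm2.5-audio-1.5b-realtime", // gallery entry name; any realtime-capable model works
		"localai_assistant": true,                         // admin-only "Manage Mode" opt-in
	})

	resp, err := http.Post("http://localhost:8080/v1/realtime/calls", "application/json", bytes.NewReader(reqBody))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The 201 Created response carries the answer SDP to hand back to the peer connection.
	var answer struct {
		SDP string `json:"sdp"`
	}
	_ = json.NewDecoder(resp.Body).Decode(&answer)
	fmt.Println(resp.Status, len(answer.SDP) > 0)
}
```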
@@ -165,9 +169,13 @@ func RealtimeCalls(application *application.Application) echo.HandlerFunc { // Start the realtime session in a goroutine evaluator := application.TemplatesEvaluator() + opts := RealtimeSessionOptions{ + LocalAIAssistant: req.LocalAIAssistant, + IsAdmin: isCurrentUserAdmin(c, application), + } go func() { defer transport.Close() - runRealtimeSession(application, transport, req.Model, evaluator) + runRealtimeSession(application, transport, req.Model, evaluator, opts) }() return c.JSON(http.StatusCreated, RealtimeCallResponse{ diff --git a/core/http/react-ui/public/locales/en/models.json b/core/http/react-ui/public/locales/en/models.json index 441b2fc09..b503dd187 100644 --- a/core/http/react-ui/public/locales/en/models.json +++ b/core/http/react-ui/public/locales/en/models.json @@ -24,6 +24,7 @@ "diarization": "Diarization", "soundGen": "Sound", "audioTransform": "Audio FX", + "realtimeAudio": "Realtime Audio", "embedding": "Embeddings", "rerank": "Rerank", "detection": "Detection", diff --git a/core/http/react-ui/src/pages/FineTune.jsx b/core/http/react-ui/src/pages/FineTune.jsx index 1b593f8cd..61a9be8fe 100644 --- a/core/http/react-ui/src/pages/FineTune.jsx +++ b/core/http/react-ui/src/pages/FineTune.jsx @@ -732,6 +732,9 @@ export default function FineTune() { const [seed, setSeed] = useState(0) const [mixedPrecision, setMixedPrecision] = useState('') const [extraOptions, setExtraOptions] = useState([]) + // liquid-audio specific knobs (folded into extra_options on submit) + const [liquidAudioVoice, setLiquidAudioVoice] = useState('') + const [liquidAudioValDataset, setLiquidAudioValDataset] = useState('') const [hfToken, setHfToken] = useState('') const [showAdvanced, setShowAdvanced] = useState(false) const [resumeFromCheckpoint, setResumeFromCheckpoint] = useState('') @@ -801,6 +804,12 @@ export default function FineTune() { for (const { key, value } of extraOptions) { if (key.trim()) extra[key.trim()] = value } + // Fold liquid-audio specific fields into extra_options. The Python + // backend reads `voice` and `val_dataset` directly from there. + if (backend === 'liquid-audio') { + if (liquidAudioVoice) extra.voice = liquidAudioVoice + if (liquidAudioValDataset.trim()) extra.val_dataset = liquidAudioValDataset.trim() + } const isAdapter = ['lora', 'loha', 'lokr'].includes(trainingType) @@ -872,6 +881,10 @@ export default function FineTune() { for (const { key, value } of extraOptions) { if (key.trim()) extra[key.trim()] = value } + if (backend === 'liquid-audio') { + if (liquidAudioVoice) extra.voice = liquidAudioVoice + if (liquidAudioValDataset.trim()) extra.val_dataset = liquidAudioValDataset.trim() + } return { model, backend, @@ -965,10 +978,15 @@ export default function FineTune() { setSaveTotalLimit(Number(config.extra_options.save_total_limit)) } + // Restore liquid-audio specific extras (also filtered out of the + // freeform list below). 
+ if (config.extra_options?.voice != null) setLiquidAudioVoice(String(config.extra_options.voice)) + if (config.extra_options?.val_dataset != null) setLiquidAudioValDataset(String(config.extra_options.val_dataset)) + // Convert extra_options object to [{key, value}] entries, filtering out handled keys if (config.extra_options && typeof config.extra_options === 'object') { const entries = Object.entries(config.extra_options) - .filter(([k]) => !['max_seq_length', 'save_total_limit', 'hf_token', 'eval_strategy', 'eval_steps', 'eval_split', 'eval_dataset_source', 'eval_split_ratio'].includes(k)) + .filter(([k]) => !['max_seq_length', 'save_total_limit', 'hf_token', 'eval_strategy', 'eval_steps', 'eval_split', 'eval_dataset_source', 'eval_split_ratio', 'voice', 'val_dataset'].includes(k)) .map(([key, value]) => ({ key, value: String(value) })) setExtraOptions(entries) } @@ -1458,6 +1476,31 @@ export default function FineTune() { )} + {backend === 'liquid-audio' && ( +
+ +
+ Dataset must be preprocessed by LFM2AudioChatMapper (a directory of LFM2DataLoader-ready arrow files). See liquid_audio/examples/preprocess_jenny_tts.py for the conversion recipe. +
+
+
+ + +
+
+
+ <input value={liquidAudioValDataset} onChange={(e) => setLiquidAudioValDataset(e.target.value)} placeholder="e.g. /data/jenny_tts/val" className="input" />
+
+
+
+ )} +
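The two liquid-audio fields are folded into `extra_options` on submit rather than becoming separate request parameters; per the comment above, the Python backend reads `voice` and `val_dataset` straight out of that map. A rough sketch of the resulting payload fragment — the values are illustrative, and anything beyond the `model`, `backend` and `extra_options` keys visible in the form code is left unspecified here:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative values only; the form folds these two knobs into
	// extra_options when backend == "liquid-audio".
	payload := map[string]any{
		"model":   "LiquidAI/LFM2.5-Audio-1.5B",
		"backend": "liquid-audio",
		"extra_options": map[string]any{
			"voice":       "uk_male",
			"val_dataset": "/data/jenny_tts/val",
		},
	}
	out, _ := json.MarshalIndent(payload, "", "  ")
	fmt.Println(string(out))
}
```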
diff --git a/core/http/react-ui/src/pages/Models.jsx b/core/http/react-ui/src/pages/Models.jsx index bf930fab7..20c64b3ac 100644 --- a/core/http/react-ui/src/pages/Models.jsx +++ b/core/http/react-ui/src/pages/Models.jsx @@ -28,6 +28,7 @@ const FILTERS = [ { key: 'diarization', labelKey: 'filters.diarization', icon: 'fa-users' }, { key: 'sound_generation', labelKey: 'filters.soundGen', icon: 'fa-music' }, { key: 'audio_transform', labelKey: 'filters.audioTransform', icon: 'fa-sliders' }, + { key: 'realtime_audio', labelKey: 'filters.realtimeAudio', icon: 'fa-tower-broadcast' }, { key: 'embeddings', labelKey: 'filters.embedding', icon: 'fa-vector-square' }, { key: 'rerank', labelKey: 'filters.rerank', icon: 'fa-sort' }, { key: 'detection', labelKey: 'filters.detection', icon: 'fa-bullseye' }, diff --git a/core/http/react-ui/src/pages/Talk.jsx b/core/http/react-ui/src/pages/Talk.jsx index b72342fad..cf92102ac 100644 --- a/core/http/react-ui/src/pages/Talk.jsx +++ b/core/http/react-ui/src/pages/Talk.jsx @@ -2,6 +2,10 @@ import { useState, useRef, useEffect, useCallback, useMemo } from 'react' import { useOutletContext, useNavigate } from 'react-router-dom' import { realtimeApi } from '../utils/api' import ModelSelector from '../components/ModelSelector' +import ClientMCPDropdown from '../components/ClientMCPDropdown' +import { useMCPClient } from '../hooks/useMCPClient' +import { loadClientMCPServers } from '../utils/mcpClientStorage' +import { useAuth } from '../context/AuthContext' const STATUS_STYLES = { disconnected: { icon: 'fa-solid fa-circle', color: 'var(--color-text-secondary)', bg: 'transparent' }, @@ -40,6 +44,27 @@ export default function Talk() { const [voiceEdited, setVoiceEdited] = useState(false) const [language, setLanguage] = useState('') + // Client MCP — mirrors the chat page's wiring (useMCPClient + ClientMCPDropdown). + // Talk has a single ephemeral session, so the active server set lives in component + // state rather than per-chat config. + const [clientMCPServers, setClientMCPServers] = useState(() => loadClientMCPServers()) + const [activeMCPIds, setActiveMCPIds] = useState([]) + const { + connect: mcpConnect, + disconnect: mcpDisconnect, + getToolsForLLM, + isClientTool, + executeTool, + connectionStatuses, + getConnectedTools, + } = useMCPClient() + + // LocalAI Assistant ("Manage Mode") — mirrors the chat-page toggle. + // Admin-only; the realtime endpoint enforces the gate too. When on, the + // backend mounts the in-process MCP admin tool surface for this session. + const { isAdmin } = useAuth() + const [manageMode, setManageMode] = useState(false) + // Diagnostics const [diagVisible, setDiagVisible] = useState(false) @@ -75,7 +100,7 @@ export default function Talk() { if (!voiceEdited) setVoice(models[0].voice || '') } }) - .catch(err => addToast(`Failed to load pipeline models: ${err.message}`, 'error', 5000, { link: { href: '/app/traces?tab=backend', text: 'View traces' } })) + .catch(err => addToast(`Failed to load realtime models: ${err.message}`, 'error', 5000, { link: { href: '/app/traces?tab=backend', text: 'View traces' } })) .finally(() => setModelsLoading(false)) }, []) @@ -84,6 +109,32 @@ export default function Talk() { transcriptEndRef.current?.scrollIntoView({ behavior: 'smooth' }) }, [transcript]) + // Mirror Chat.jsx: connect / disconnect client MCP servers as the user toggles them. 
+ useEffect(() => { + const activeSet = new Set(activeMCPIds) + for (const server of clientMCPServers) { + const status = connectionStatuses[server.id]?.status + if (activeSet.has(server.id) && status !== 'connected' && status !== 'connecting') { + mcpConnect(server) + } else if (!activeSet.has(server.id) && (status === 'connected' || status === 'connecting')) { + mcpDisconnect(server.id) + } + } + }, [activeMCPIds.join(','), clientMCPServers, connectionStatuses, mcpConnect, mcpDisconnect]) + + const handleClientMCPToggle = useCallback((serverId) => { + setActiveMCPIds(prev => prev.includes(serverId) ? prev.filter(s => s !== serverId) : [...prev, serverId]) + }, []) + const handleClientMCPServerAdded = useCallback((server) => { + setClientMCPServers(loadClientMCPServers()) + setActiveMCPIds(prev => prev.includes(server.id) ? prev : [...prev, server.id]) + }, []) + const handleClientMCPServerRemoved = useCallback(async (id) => { + await mcpDisconnect(id) + setClientMCPServers(loadClientMCPServers()) + setActiveMCPIds(prev => prev.filter(s => s !== id)) + }, [mcpDisconnect]) + const selectedModelInfo = pipelineModels.find(m => m.name === selectedModel) // ── Status helper ── @@ -96,7 +147,9 @@ export default function Talk() { const sendSessionUpdate = useCallback(() => { const dc = dcRef.current if (!dc || dc.readyState !== 'open') return - if (!instructions.trim() && !voice.trim() && !language.trim()) return + + const tools = getToolsForLLM() + if (!instructions.trim() && !voice.trim() && !language.trim() && tools.length === 0) return const session = {} if (instructions.trim()) session.instructions = instructions.trim() @@ -105,9 +158,57 @@ export default function Talk() { if (voice.trim()) session.audio.output = { voice: voice.trim() } if (language.trim()) session.audio.input = { transcription: { language: language.trim() } } } + // Pass MCP-server-advertised tools straight through. Server-side they + // get rendered into the model's prompt via the function:/argument_regex + // pair on the model config (gallery/lfm.yaml for LFM2.5-Audio). + if (tools.length > 0) session.tools = tools dc.send(JSON.stringify({ type: 'session.update', session })) - }, [instructions, voice, language]) + }, [instructions, voice, language, getToolsForLLM]) + + // Re-send session.update whenever the tool set changes mid-session so the + // model sees newly-toggled MCP servers without a reconnect. + useEffect(() => { + if (isConnected) sendSessionUpdate() + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [activeMCPIds.join(',')]) + + // ── Function-call dispatcher ── + // Mirrors the chat-page agentic loop: collect args from the model's + // function_call_arguments.done event, hand them to the MCP client's + // executeTool, then echo the result back via conversation.item.create + + // response.create so the model can complete its turn with the tool output. + const handleFunctionCall = useCallback(async (event) => { + const dc = dcRef.current + if (!dc || dc.readyState !== 'open') return + const { call_id: callId, name, arguments: argsJson } = event + if (!callId || !name) return + if (!isClientTool(name)) { + // No MCP server advertises this tool — let the model know so it can + // recover instead of hanging. 
+ dc.send(JSON.stringify({ + type: 'conversation.item.create', + item: { type: 'function_call_output', call_id: callId, output: `Error: unknown tool "${name}"` }, + })) + dc.send(JSON.stringify({ type: 'response.create' })) + return + } + updateStatus('thinking', `Running tool ${name}...`) + try { + const result = await executeTool(name, argsJson) + dc.send(JSON.stringify({ + type: 'conversation.item.create', + item: { type: 'function_call_output', call_id: callId, output: typeof result === 'string' ? result : JSON.stringify(result) }, + })) + dc.send(JSON.stringify({ type: 'response.create' })) + } catch (err) { + dc.send(JSON.stringify({ + type: 'conversation.item.create', + item: { type: 'function_call_output', call_id: callId, output: `Error: ${err?.message || err}` }, + })) + dc.send(JSON.stringify({ type: 'response.create' })) + } + }, [executeTool, isClientTool, updateStatus]) // ── Server event handler ── const handleServerEvent = useCallback((event) => { @@ -163,6 +264,32 @@ export default function Talk() { case 'response.output_audio.delta': updateStatus('speaking', 'Speaking...') break + case 'response.output_item.done': { + // Server-executed tools (Manage Mode) surface as output items — + // FunctionCall when the model invokes a tool, FunctionCallOutput + // once the server has run it. Render both on `done` so we get + // each transcript entry exactly once. + const item = event.item + if (!item) break + if (item.FunctionCall) { + setTranscript(prev => [...prev, { + role: 'tool_call', + text: `${item.FunctionCall.name}(${item.FunctionCall.arguments || ''})`, + }]) + } else if (item.FunctionCallOutput) { + let preview = item.FunctionCallOutput.output || '' + // Pretty-print JSON for readability; fall back to raw string. + try { preview = JSON.stringify(JSON.parse(preview), null, 2) } catch (_) { /* keep raw */ } + setTranscript(prev => [...prev, { role: 'tool_result', text: preview }]) + streamingRef.current = null // tool result ends the current assistant text run + } + break + } + case 'response.function_call_arguments.done': + // Don't await — keep the event loop free; handleFunctionCall sends + // conversation.item.create + response.create when it's done. + handleFunctionCall(event) + break case 'response.done': updateStatus('listening', 'Listening...') break @@ -171,12 +298,12 @@ export default function Talk() { updateStatus('error', 'Error: ' + (event.error?.message || 'Unknown error')) break } - }, [sendSessionUpdate, updateStatus]) + }, [sendSessionUpdate, updateStatus, handleFunctionCall]) // ── Connect ── const connect = useCallback(async () => { if (!selectedModel) { - addToast('Please select a pipeline model first.', 'warning') + addToast('Please select a realtime model first.', 'warning') return } if (!navigator.mediaDevices?.getUserMedia) { @@ -237,6 +364,7 @@ export default function Talk() { const data = await realtimeApi.call({ sdp: pc.localDescription.sdp, model: selectedModel, + localai_assistant: manageMode, }) await pc.setRemoteDescription({ type: 'answer', sdp: data.sdp }) @@ -245,7 +373,7 @@ export default function Talk() { updateStatus('error', 'Connection failed: ' + err.message) disconnect() } - }, [selectedModel, diagVisible, handleServerEvent, updateStatus, addToast]) + }, [selectedModel, manageMode, diagVisible, handleServerEvent, updateStatus, addToast]) // ── Disconnect ── const disconnect = useCallback(() => { @@ -508,8 +636,58 @@ export default function Talk() {
+ {/* Tools (client-side MCP servers, mirroring the chat page) */} +
+ + + {isAdmin && ( + + )} +
+ {/* Pipeline details */} - {selectedModelInfo && ( + {selectedModelInfo && selectedModelInfo.self_contained && ( +
+ + Self-contained any-to-any — + + {selectedModelInfo.name} + + handles VAD · STT · LLM · TTS +
+ )} + {selectedModelInfo && !selectedModelInfo.self_contained && (
)} @@ -600,16 +779,28 @@ export default function Talk() { Conversation will appear here...

)} - {transcript.map((entry, i) => ( -
- -

{entry.text}

-
- ))} + {transcript.map((entry, i) => { + const isToolCall = entry.role === 'tool_call' + const isToolResult = entry.role === 'tool_result' + const isUser = entry.role === 'user' + const iconClass = isToolCall ? 'fa-solid fa-screwdriver-wrench' + : isToolResult ? 'fa-solid fa-clipboard-list' + : isUser ? 'fa-solid fa-user' : 'fa-solid fa-robot' + const iconColor = isToolCall || isToolResult ? 'var(--color-text-secondary)' + : isUser ? 'var(--color-primary)' : 'var(--color-accent)' + return ( +
+ +

{entry.text}

+
+ ) + })}
diff --git a/core/http/react-ui/src/utils/capabilities.js b/core/http/react-ui/src/utils/capabilities.js index 2b43b2092..5d2956e77 100644 --- a/core/http/react-ui/src/utils/capabilities.js +++ b/core/http/react-ui/src/utils/capabilities.js @@ -20,3 +20,4 @@ export const CAP_DETECTION = 'FLAG_DETECTION' export const CAP_FACE_RECOGNITION = 'FLAG_FACE_RECOGNITION' export const CAP_SPEAKER_RECOGNITION = 'FLAG_SPEAKER_RECOGNITION' export const CAP_AUDIO_TRANSFORM = 'FLAG_AUDIO_TRANSFORM' +export const CAP_REALTIME_AUDIO = 'FLAG_REALTIME_AUDIO' diff --git a/core/http/routes/ui.go b/core/http/routes/ui.go index 911fed695..32c223eed 100644 --- a/core/http/routes/ui.go +++ b/core/http/routes/ui.go @@ -18,7 +18,11 @@ func RegisterUIRoutes(app *echo.Echo, // SPA routes are handled by the 404 fallback in app.go which serves // index.html for any unmatched HTML request, enabling client-side routing. - // Pipeline models API (for the Talk page WebRTC interface) + // Pipeline models API (for the Talk page WebRTC interface). + // A model qualifies when it either declares an explicit VAD+STT+LLM+TTS + // pipeline (legacy/composed) or carries the realtime_audio usecase (a + // self-contained any-to-any audio backend like liquid-audio that owns the + // full loop in a single AudioToAudioStream RPC). app.GET("/api/pipeline-models", func(c echo.Context) error { type pipelineModelInfo struct { Name string `json:"name"` @@ -27,9 +31,17 @@ func RegisterUIRoutes(app *echo.Echo, LLM string `json:"llm"` TTS string `json:"tts"` Voice string `json:"voice"` + // SelfContained is true for any-to-any audio models — the four + // pipeline slots are populated with the model's own name so the + // UI can render them, but the Realtime API routes the session + // directly to the backend's AudioToAudioStream RPC. 
+ SelfContained bool `json:"self_contained,omitempty"` } pipelineModels := cl.GetModelConfigsByFilter(func(_ string, cfg *config.ModelConfig) bool { + if cfg.HasUsecases(config.FLAG_REALTIME_AUDIO) { + return true + } p := cfg.Pipeline return p.VAD != "" && p.Transcription != "" && p.LLM != "" && p.TTS != "" }) @@ -38,8 +50,20 @@ func RegisterUIRoutes(app *echo.Echo, return cmp.Compare(a.Name, b.Name) }) - var models []pipelineModelInfo + models := make([]pipelineModelInfo, 0, len(pipelineModels)) for _, cfg := range pipelineModels { + if cfg.HasUsecases(config.FLAG_REALTIME_AUDIO) { + models = append(models, pipelineModelInfo{ + Name: cfg.Name, + VAD: cfg.Name, + Transcription: cfg.Name, + LLM: cfg.Name, + TTS: cfg.Name, + Voice: cfg.TTSConfig.Voice, + SelfContained: true, + }) + continue + } models = append(models, pipelineModelInfo{ Name: cfg.Name, VAD: cfg.Pipeline.VAD, diff --git a/core/http/routes/ui_api.go b/core/http/routes/ui_api.go index fbd5247eb..d75f13947 100644 --- a/core/http/routes/ui_api.go +++ b/core/http/routes/ui_api.go @@ -54,6 +54,7 @@ var usecaseFilters = map[string]config.ModelConfigUsecase{ config.UsecaseVAD: config.FLAG_VAD, config.UsecaseAudioTransform: config.FLAG_AUDIO_TRANSFORM, config.UsecaseDiarization: config.FLAG_DIARIZATION, + config.UsecaseRealtimeAudio: config.FLAG_REALTIME_AUDIO, } diff --git a/core/http/routes/ui_pipeline_models_test.go b/core/http/routes/ui_pipeline_models_test.go new file mode 100644 index 000000000..24f4c3099 --- /dev/null +++ b/core/http/routes/ui_pipeline_models_test.go @@ -0,0 +1,153 @@ +package routes_test + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + + "github.com/labstack/echo/v4" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/routes" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Pipeline models API", func() { + var ( + app *echo.Echo + tempDir string + configLoader *config.ModelConfigLoader + ) + + BeforeEach(func() { + var err error + tempDir, err = os.MkdirTemp("", "pipeline-models-test-*") + Expect(err).NotTo(HaveOccurred()) + + configLoader = config.NewModelConfigLoader(tempDir) + }) + + AfterEach(func() { + Expect(os.RemoveAll(tempDir)).To(Succeed()) + }) + + writeConfig := func(name, body string) { + path := filepath.Join(tempDir, name+".yaml") + Expect(os.WriteFile(path, []byte(body), 0o644)).To(Succeed()) + } + + queryPipelineModels := func() []map[string]any { + Expect(configLoader.LoadModelConfigsFromPath(tempDir)).To(Succeed()) + + app = echo.New() + routes.RegisterUIRoutes(app, configLoader, nil, nil, func(next echo.HandlerFunc) echo.HandlerFunc { return next }) + + req := httptest.NewRequest(http.MethodGet, "/api/pipeline-models", nil) + rec := httptest.NewRecorder() + app.ServeHTTP(rec, req) + Expect(rec.Code).To(Equal(http.StatusOK)) + body, err := io.ReadAll(rec.Body) + Expect(err).NotTo(HaveOccurred()) + + var got []map[string]any + Expect(json.Unmarshal(body, &got)).To(Succeed()) + return got + } + + It("returns models with an explicit VAD/STT/LLM/TTS pipeline", func() { + writeConfig("legacy-pipeline", ` +name: legacy-pipeline +backend: llama-cpp +pipeline: + vad: silero + transcription: whisper + llm: llama + tts: piper +tts: + voice: en-amy +`) + // A model with a partial pipeline must not appear. 
+ writeConfig("half-pipeline", ` +name: half-pipeline +backend: llama-cpp +pipeline: + vad: silero + transcription: whisper +`) + + models := queryPipelineModels() + Expect(models).To(HaveLen(1)) + Expect(models[0]["name"]).To(Equal("legacy-pipeline")) + Expect(models[0]["vad"]).To(Equal("silero")) + Expect(models[0]["llm"]).To(Equal("llama")) + Expect(models[0]["voice"]).To(Equal("en-amy")) + // self_contained is omitempty — absent for legacy pipelines. + _, hasFlag := models[0]["self_contained"] + Expect(hasFlag).To(BeFalse()) + }) + + It("surfaces self-contained any-to-any models tagged with realtime_audio", func() { + writeConfig("lfm-realtime", ` +name: lfm-realtime +backend: liquid-audio +known_usecases: + - realtime_audio + - chat + - tts + - transcript +tts: + voice: us_female +`) + + models := queryPipelineModels() + Expect(models).To(HaveLen(1)) + Expect(models[0]["name"]).To(Equal("lfm-realtime")) + // All four pipeline slots are populated with the model's own name so + // the Talk page UI has something to render. + Expect(models[0]["vad"]).To(Equal("lfm-realtime")) + Expect(models[0]["transcription"]).To(Equal("lfm-realtime")) + Expect(models[0]["llm"]).To(Equal("lfm-realtime")) + Expect(models[0]["tts"]).To(Equal("lfm-realtime")) + Expect(models[0]["voice"]).To(Equal("us_female")) + Expect(models[0]["self_contained"]).To(BeTrue()) + }) + + It("includes both legacy and self-contained models in the same response", func() { + writeConfig("legacy", ` +name: legacy +backend: llama-cpp +pipeline: + vad: silero + transcription: whisper + llm: llama + tts: piper +`) + writeConfig("realtime", ` +name: realtime +backend: liquid-audio +known_usecases: + - realtime_audio +`) + + models := queryPipelineModels() + Expect(models).To(HaveLen(2)) + // Sorted by name → legacy, realtime. 
+ Expect(models[0]["name"]).To(Equal("legacy")) + Expect(models[1]["name"]).To(Equal("realtime")) + Expect(models[1]["self_contained"]).To(BeTrue()) + }) + + It("excludes models that have neither a pipeline nor realtime_audio", func() { + writeConfig("plain-chat", ` +name: plain-chat +backend: llama-cpp +known_usecases: + - chat +`) + + Expect(queryPipelineModels()).To(BeEmpty()) + }) +}) diff --git a/core/services/nodes/health_mock_test.go b/core/services/nodes/health_mock_test.go index f4a706261..c5ca0e26f 100644 --- a/core/services/nodes/health_mock_test.go +++ b/core/services/nodes/health_mock_test.go @@ -232,6 +232,9 @@ func (c *fakeBackendClient) AudioTransform(_ context.Context, _ *pb.AudioTransfo func (c *fakeBackendClient) AudioTransformStream(_ context.Context, _ ...ggrpc.CallOption) (grpc.AudioTransformStreamClient, error) { return nil, nil } +func (c *fakeBackendClient) AudioToAudioStream(_ context.Context, _ ...ggrpc.CallOption) (grpc.AudioToAudioStreamClient, error) { + return nil, nil +} func (c *fakeBackendClient) ModelMetadata(_ context.Context, _ *pb.ModelOptions, _ ...ggrpc.CallOption) (*pb.ModelMetadataResponse, error) { return nil, nil } diff --git a/core/services/nodes/inflight_test.go b/core/services/nodes/inflight_test.go index 3ea4c017d..edb04b6f8 100644 --- a/core/services/nodes/inflight_test.go +++ b/core/services/nodes/inflight_test.go @@ -176,6 +176,10 @@ func (f *fakeGRPCBackend) AudioTransformStream(_ context.Context, _ ...ggrpc.Cal return nil, nil } +func (f *fakeGRPCBackend) AudioToAudioStream(_ context.Context, _ ...ggrpc.CallOption) (grpc.AudioToAudioStreamClient, error) { + return nil, nil +} + func (f *fakeGRPCBackend) ModelMetadata(_ context.Context, _ *pb.ModelOptions, _ ...ggrpc.CallOption) (*pb.ModelMetadataResponse, error) { return &pb.ModelMetadataResponse{}, nil } diff --git a/gallery/index.yaml b/gallery/index.yaml index 5cfe31b47..f00dfd628 100644 --- a/gallery/index.yaml +++ b/gallery/index.yaml @@ -3197,6 +3197,110 @@ - filename: llama-cpp/models/LFM2.5-1.2B-Nova-Function-Calling.Q4_K_M.gguf sha256: 5d039ad4195447cf4b6dbee8f7fe11f985c01d671a18153084c869077e431fbf uri: https://huggingface.co/NovachronoAI/LFM2.5-1.2B-Nova-Function-Calling-GGUF/resolve/main/LFM2.5-1.2B-Nova-Function-Calling.Q4_K_M.gguf +- name: lfm2.5-audio-1.5b-realtime + url: github:mudler/LocalAI/gallery/liquid-audio.yaml@master + urls: + - https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B + description: | + LFM2.5-Audio-1.5B is LiquidAI's any-to-any audio foundation model. The + 1.2B LFM2.5 backbone plus a FastConformer audio encoder and an LFM2-based + audio detokenizer give real-time speech-to-speech with text + audio output + interleaved at 12.5 Hz / 24 kHz. This entry runs in S2S (speech-to-speech) + mode and is the model the LocalAI realtime API any-to-any path consumes. + Switch to ASR, TTS, or chat by picking the sibling gallery entries. + license: LFM-Open-License-v1.0 + icon: https://cdn-avatars.huggingface.co/v1/production/uploads/61b8e2ba285851687028d395/7_6D7rWrLxp2hb6OHSV1p.png + tags: + - lfm2 + - liquid + - audio + - speech-to-speech + - any-to-any + - realtime + - 1.5b + last_checked: "2026-05-11" + overrides: + backend: liquid-audio + # realtime_audio drives the Talk-page filter; the rest let the model + # also surface on the chat / transcribe / speech endpoints when called + # directly (the backend implements all three RPCs). 
+ known_usecases: + - realtime_audio + - chat + - transcript + - tts + - vad + options: + - mode:s2s +- name: lfm2.5-audio-1.5b-chat + url: github:mudler/LocalAI/gallery/liquid-audio.yaml@master + urls: + - https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B + description: | + LFM2.5-Audio-1.5B in text-only chat mode. The model runs `generate_sequential` + with no audio modality, behaving like a small LFM2 chat model. Pick this + entry for tool-calling experiments without the audio overhead. + license: LFM-Open-License-v1.0 + tags: + - lfm2 + - liquid + - audio + - chat + - 1.5b + last_checked: "2026-05-11" + overrides: + backend: liquid-audio + known_usecases: + - chat + options: + - mode:chat +- name: lfm2.5-audio-1.5b-asr + url: github:mudler/LocalAI/gallery/liquid-audio.yaml@master + urls: + - https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B + description: | + LFM2.5-Audio-1.5B in ASR mode. System prompt `Perform ASR.` is prepended; + output is capitalised and punctuated. Wire this entry as a transcription + model on the /v1/audio/transcriptions endpoint. + license: LFM-Open-License-v1.0 + tags: + - lfm2 + - liquid + - audio + - asr + - speech-to-text + - 1.5b + last_checked: "2026-05-11" + overrides: + backend: liquid-audio + known_usecases: + - transcript + options: + - mode:asr +- name: lfm2.5-audio-1.5b-tts + url: github:mudler/LocalAI/gallery/liquid-audio.yaml@master + urls: + - https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B + description: | + LFM2.5-Audio-1.5B in TTS mode. Four baked voices: us_male, us_female, + uk_male, uk_female — pick the default at load time via `voice:` option, + or override per-request via the OpenAI `/v1/audio/speech` `voice` field. + license: LFM-Open-License-v1.0 + tags: + - lfm2 + - liquid + - audio + - tts + - text-to-speech + - 1.5b + last_checked: "2026-05-11" + overrides: + backend: liquid-audio + known_usecases: + - tts + options: + - mode:tts + - voice:us_female - name: mistral-nemo-instruct-2407-12b-thinking-m-claude-opus-high-reasoning-i1 url: github:mudler/LocalAI/gallery/virtual.yaml@master urls: diff --git a/gallery/lfm.yaml b/gallery/lfm.yaml index 145230822..0cd0d313f 100644 --- a/gallery/lfm.yaml +++ b/gallery/lfm.yaml @@ -10,6 +10,16 @@ config_file: | - - - <|endoftext|> + function: + # LFM2 Pythonic tool-call syntax: <|tool_call_start|>[name(k="v", ...)]<|tool_call_end|> + # Mirrors common_chat_params_init_lfm2 in llama.cpp/common/chat.cpp. + response_regex: + - '<\|tool_call_start\|>\[(?P\w+)\((?P.*?)\)\]<\|tool_call_end\|>' + argument_regex: + - '(?P\w+)\s*=\s*"(?P[^"]*)"' + - '(?P\w+)\s*=\s*(?P-?\d+(?:\.\d+)?|true|false|null)' + argument_regex_key_name: key + argument_regex_value_name: value template: chat: | {{.Input -}} diff --git a/gallery/liquid-audio.yaml b/gallery/liquid-audio.yaml new file mode 100644 index 000000000..cd9ead91b --- /dev/null +++ b/gallery/liquid-audio.yaml @@ -0,0 +1,40 @@ +--- +name: "liquid-audio" + +description: | + LiquidAI LFM2 / LFM2.5 Audio models served by the Python `liquid-audio` backend. + Supports four roles via the `mode:` option: + - chat text-only chat completion (generate_sequential, no audio) + - asr speech-to-text (Perform ASR. 
system prompt) + - tts text-to-speech in 4 baked voices (us_male/us_female/uk_male/uk_female) + - s2s interleaved speech-to-speech (the realtime any-to-any path) + +license: "LFM Open License v1.0" + +urls: + - https://huggingface.co/LiquidAI/LFM2.5-Audio-1.5B + - https://github.com/Liquid4All/liquid-audio + +config_file: | + backend: liquid-audio + context_size: 32768 + f16: true + mmap: true + # realtime_audio surfaces the model on the Talk page; chat/tts/transcript + # let it also serve the standalone /v1/chat/completions, /v1/audio/speech, + # and /v1/audio/transcriptions endpoints (backend implements all three). + known_usecases: + - realtime_audio + - chat + - tts + - transcript + - vad + parameters: + model: LiquidAI/LFM2.5-Audio-1.5B + # Special tokens emitted in the text track during interleaved generation. + # Included so a future client-side parser can spot them; the LFM2 tool-call + # format itself is auto-detected by the upstream llama.cpp parser when the + # model loads under that backend. + stopwords: + - <|im_end|> + - <|endoftext|> diff --git a/pkg/functions/parse_lfm2_test.go b/pkg/functions/parse_lfm2_test.go new file mode 100644 index 000000000..f8d364a39 --- /dev/null +++ b/pkg/functions/parse_lfm2_test.go @@ -0,0 +1,106 @@ +package functions_test + +import ( + . "github.com/mudler/LocalAI/pkg/functions" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// LFM2 / LFM2.5 emit tool calls in a Pythonic syntax wrapped in special tokens: +// +// <|tool_call_start|>[func_name(arg1="value1", arg2="value2")]<|tool_call_end|> +// +// See backend/cpp/llama-cpp/llama.cpp/common/chat.cpp:1277 (common_chat_params_init_lfm2) +// and https://docs.liquid.ai/lfm/key-concepts/tool-use. The format is auto-detected +// by upstream llama.cpp when the chat template contains <|tool_list_start|>/<|tool_list_end|>. +// +// The tests below pin the LocalAI-side parser config (response_regex + argument_regex) +// that the lfm gallery template ships, so configurations relying on the gRPC backend +// returning raw text (rather than pre-parsed tool_calls via use_jinja) still work. +var _ = Describe("LFM2 Pythonic tool-call parsing", func() { + // Matches the markers exactly; non-greedy `arguments` so the closing `)]` of one + // call doesn't swallow trailing content that happens to share characters. + const lfm2ResponseRegex = `<\|tool_call_start\|>\[(?P\w+)\((?P.*?)\)\]<\|tool_call_end\|>` + + // Two argument extractors: quoted strings and bare scalars (numbers / true / false / null). + // ParseFunctionCallArgs runs every regex in order, so later matches with the same key + // would overwrite earlier ones — which is fine here because the patterns are disjoint. 
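To see the two-stage parse (response regex to find the call, argument regexes to split its key/value pairs) outside the Ginkgo suite, here is a standalone sketch using the standard `regexp` package directly rather than `ParseFunctionCall`. The capture-group names are written out explicitly (`name`/`arguments` on the call, `key`/`value` on the arguments, matching the `ArgumentRegexKey`/`ArgumentRegexValue` settings used by the tests below).

```go
// Standalone illustration of LFM2 Pythonic tool-call parsing with regexp.
package main

import (
	"fmt"
	"regexp"
)

var (
	callRe = regexp.MustCompile(
		`<\|tool_call_start\|>\[(?P<name>\w+)\((?P<arguments>.*?)\)\]<\|tool_call_end\|>`)
	// Same two extractors as the gallery config: quoted strings first,
	// then bare scalars (numbers / true / false / null).
	argRes = []*regexp.Regexp{
		regexp.MustCompile(`(?P<key>\w+)\s*=\s*"(?P<value>[^"]*)"`),
		regexp.MustCompile(`(?P<key>\w+)\s*=\s*(?P<value>-?\d+(?:\.\d+)?|true|false|null)`),
	}
)

func main() {
	out := `Sure. <|tool_call_start|>[set_volume(level=42, mute=false, room="living room")]<|tool_call_end|>`

	call := callRe.FindStringSubmatch(out)
	if call == nil {
		fmt.Println("no tool call")
		return
	}
	name := call[callRe.SubexpIndex("name")]
	raw := call[callRe.SubexpIndex("arguments")]

	// Apply every argument regex in order; the two patterns are disjoint, so
	// each key is produced by exactly one of them.
	args := map[string]string{}
	for _, re := range argRes {
		for _, m := range re.FindAllStringSubmatch(raw, -1) {
			args[m[re.SubexpIndex("key")]] = m[re.SubexpIndex("value")]
		}
	}
	fmt.Println(name, args) // set_volume map[level:42 mute:false room:living room]
}
```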
+ var lfm2ArgRegex = []string{ + `(?P\w+)\s*=\s*"(?P[^"]*)"`, + `(?P\w+)\s*=\s*(?P-?\d+(?:\.\d+)?|true|false|null)`, + } + + cfg := func() FunctionsConfig { + return FunctionsConfig{ + ResponseRegex: []string{lfm2ResponseRegex}, + ArgumentRegex: lfm2ArgRegex, + ArgumentRegexKey: "key", + ArgumentRegexValue: "value", + } + } + + It("parses a single string-arg call", func() { + input := `<|tool_call_start|>[get_weather(city="Berlin")]<|tool_call_end|>` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(HaveLen(1)) + Expect(results[0].Name).To(Equal("get_weather")) + Expect(results[0].Arguments).To(Equal(`{"city":"Berlin"}`)) + }) + + It("parses multiple string args", func() { + input := `<|tool_call_start|>[search(query="hello world", source="web")]<|tool_call_end|>` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(HaveLen(1)) + Expect(results[0].Name).To(Equal("search")) + // argument map ordering is not stable; check content as JSON + Expect(results[0].Arguments).To(SatisfyAny( + Equal(`{"query":"hello world","source":"web"}`), + Equal(`{"source":"web","query":"hello world"}`), + )) + }) + + It("parses numeric and boolean args", func() { + input := `<|tool_call_start|>[set_volume(level=42, mute=false)]<|tool_call_end|>` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(HaveLen(1)) + Expect(results[0].Name).To(Equal("set_volume")) + // ArgumentRegex always emits string values; the JSON we produce represents + // them as strings. A typed parser is a future enhancement (PEG parser). + Expect(results[0].Arguments).To(SatisfyAny( + Equal(`{"level":"42","mute":"false"}`), + Equal(`{"mute":"false","level":"42"}`), + )) + }) + + It("parses a no-args call", func() { + input := `<|tool_call_start|>[get_time()]<|tool_call_end|>` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(HaveLen(1)) + Expect(results[0].Name).To(Equal("get_time")) + Expect(results[0].Arguments).To(Equal(`{}`)) + }) + + It("ignores surrounding text", func() { + input := `Sure, let me check. 
+<|tool_call_start|>[get_weather(city="Paris")]<|tool_call_end|> +Standby.` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(HaveLen(1)) + Expect(results[0].Name).To(Equal("get_weather")) + Expect(results[0].Arguments).To(Equal(`{"city":"Paris"}`)) + }) + + It("returns no results when the markers are absent", func() { + input := `Plain text response with no tool call.` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(BeEmpty()) + }) + + It("preserves quoted argument values that contain spaces and equals signs", func() { + input := `<|tool_call_start|>[search(query="x = y + 1")]<|tool_call_end|>` + results := ParseFunctionCall(input, cfg()) + Expect(results).To(HaveLen(1)) + Expect(results[0].Name).To(Equal("search")) + Expect(results[0].Arguments).To(Equal(`{"query":"x = y + 1"}`)) + }) +}) diff --git a/pkg/grpc/backend.go b/pkg/grpc/backend.go index 2f3b2a192..eaabea8ef 100644 --- a/pkg/grpc/backend.go +++ b/pkg/grpc/backend.go @@ -82,6 +82,7 @@ type Backend interface { AudioTransform(ctx context.Context, in *pb.AudioTransformRequest, opts ...grpc.CallOption) (*pb.AudioTransformResult, error) AudioTransformStream(ctx context.Context, opts ...grpc.CallOption) (AudioTransformStreamClient, error) + AudioToAudioStream(ctx context.Context, opts ...grpc.CallOption) (AudioToAudioStreamClient, error) ModelMetadata(ctx context.Context, in *pb.ModelOptions, opts ...grpc.CallOption) (*pb.ModelMetadataResponse, error) diff --git a/pkg/grpc/base/base.go b/pkg/grpc/base/base.go index c5a0fc62f..66f7a9a18 100644 --- a/pkg/grpc/base/base.go +++ b/pkg/grpc/base/base.go @@ -158,6 +158,11 @@ func (llm *Base) AudioTransformStream(in <-chan *pb.AudioTransformFrameRequest, return fmt.Errorf("unimplemented") } +func (llm *Base) AudioToAudioStream(in <-chan *pb.AudioToAudioRequest, out chan<- *pb.AudioToAudioResponse) error { + close(out) + return fmt.Errorf("unimplemented") +} + func (llm *Base) StartFineTune(*pb.FineTuneRequest) (*pb.FineTuneJobResult, error) { return nil, fmt.Errorf("unimplemented") } diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index d7277ee6b..8360d2645 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -805,6 +805,67 @@ func (c *Client) AudioTransformStream(ctx context.Context, opts ...grpc.CallOpti }, nil } +// AudioToAudioStreamClient is the duplex interface returned by +// (*Client).AudioToAudioStream. Mirrors AudioTransformStreamClient's +// shape so realtime-API callers can plug in interchangeable backends. +type AudioToAudioStreamClient interface { + Send(*pb.AudioToAudioRequest) error + Recv() (*pb.AudioToAudioResponse, error) + CloseSend() error + Context() context.Context +} + +type audioToAudioStreamClient struct { + pb.Backend_AudioToAudioStreamClient + conn *grpc.ClientConn + closer func() +} + +func (s *audioToAudioStreamClient) CloseSend() error { + err := s.Backend_AudioToAudioStreamClient.CloseSend() + if s.closer != nil { + s.closer() + } + return err +} + +func (c *Client) AudioToAudioStream(ctx context.Context, opts ...grpc.CallOption) (AudioToAudioStreamClient, error) { + if !c.parallel { + c.opMutex.Lock() + } + c.setBusy(true) + c.wdMark() + + cleanup := func() { + c.wdUnMark() + c.setBusy(false) + if !c.parallel { + c.opMutex.Unlock() + } + } + + conn, err := c.dial() + if err != nil { + cleanup() + return nil, err + } + client := pb.NewBackendClient(conn) + stream, err := client.AudioToAudioStream(ctx, opts...) 
+ if err != nil { + _ = conn.Close() + cleanup() + return nil, err + } + return &audioToAudioStreamClient{ + Backend_AudioToAudioStreamClient: stream, + conn: conn, + closer: func() { + _ = conn.Close() + cleanup() + }, + }, nil +} + func (c *Client) StartFineTune(ctx context.Context, in *pb.FineTuneRequest, opts ...grpc.CallOption) (*pb.FineTuneJobResult, error) { if !c.parallel { c.opMutex.Lock() diff --git a/pkg/grpc/embed.go b/pkg/grpc/embed.go index c9fd307bf..15d9615c8 100644 --- a/pkg/grpc/embed.go +++ b/pkg/grpc/embed.go @@ -181,6 +181,31 @@ func (e *embedBackend) AudioTransformStream(ctx context.Context, opts ...grpc.Ca }, nil } +func (e *embedBackend) AudioToAudioStream(ctx context.Context, opts ...grpc.CallOption) (AudioToAudioStreamClient, error) { + reqs := make(chan *pb.AudioToAudioRequest, 8) + resps := make(chan *pb.AudioToAudioResponse, 8) + srvDone := make(chan error, 1) + + server := &embedBackendAudioToAudioStream{ + ctx: ctx, + reqs: reqs, + resps: resps, + } + + go func() { + err := e.s.AudioToAudioStream(server) + close(resps) + srvDone <- err + }() + + return &embedBackendAudioToAudioStreamClient{ + ctx: ctx, + reqs: reqs, + resps: resps, + srvDone: srvDone, + }, nil +} + func (e *embedBackend) ModelMetadata(ctx context.Context, in *pb.ModelOptions, opts ...grpc.CallOption) (*pb.ModelMetadataResponse, error) { return e.s.ModelMetadata(ctx, in) } @@ -236,6 +261,8 @@ func (e *embedBackend) Free(ctx context.Context) error { var _ pb.Backend_AudioTransformStreamServer = new(embedBackendAudioTransformStream) var _ AudioTransformStreamClient = new(embedBackendAudioTransformStreamClient) +var _ pb.Backend_AudioToAudioStreamServer = new(embedBackendAudioToAudioStream) +var _ AudioToAudioStreamClient = new(embedBackendAudioToAudioStreamClient) // embedBackendAudioTransformStream is the server side of an in-process bidi // stream. The hosted server reads requests from `reqs` (closed by client when @@ -332,6 +359,99 @@ func (e *embedBackendAudioTransformStreamClient) CloseSend() error { func (e *embedBackendAudioTransformStreamClient) Context() context.Context { return e.ctx } +// embedBackendAudioToAudioStream is the in-process server-side handle for +// the bidirectional any-to-any audio RPC. Mirrors embedBackendAudioTransform +// Stream — the hosted server reads requests from `reqs` (closed by client +// when done sending) and writes responses to `resps`. 
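A sketch of how a caller might drive the duplex client follows. The request/response payload fields are not part of this diff, so only the stream mechanics are shown; the import paths and the surrounding function are assumptions. Note that this client's `CloseSend` also closes the underlying connection, so the sketch treats it as final teardown rather than a mid-session half-close.

```go
// Sketch only: pump frames into the duplex stream and fan responses out.
package realtime

import (
	"context"

	localaigrpc "github.com/mudler/LocalAI/pkg/grpc"
	pb "github.com/mudler/LocalAI/pkg/grpc/proto"
)

func pump(ctx context.Context, client *localaigrpc.Client,
	frames <-chan *pb.AudioToAudioRequest, responses chan<- *pb.AudioToAudioResponse) error {

	stream, err := client.AudioToAudioStream(ctx)
	if err != nil {
		return err
	}
	// CloseSend on this client also tears down the connection, so run it last.
	defer func() { _ = stream.CloseSend() }()

	go func() {
		defer close(responses)
		for {
			resp, err := stream.Recv()
			if err != nil { // io.EOF, context cancellation, or teardown
				return
			}
			responses <- resp
		}
	}()

	for {
		select {
		case f, ok := <-frames:
			if !ok {
				return nil // session ended upstream; deferred CloseSend runs
			}
			if err := stream.Send(f); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```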
+type embedBackendAudioToAudioStream struct { + ctx context.Context + reqs <-chan *pb.AudioToAudioRequest + resps chan<- *pb.AudioToAudioResponse +} + +func (e *embedBackendAudioToAudioStream) Send(resp *pb.AudioToAudioResponse) error { + select { + case e.resps <- resp: + return nil + case <-e.ctx.Done(): + return e.ctx.Err() + } +} + +func (e *embedBackendAudioToAudioStream) Recv() (*pb.AudioToAudioRequest, error) { + select { + case req, ok := <-e.reqs: + if !ok { + return nil, io.EOF + } + return req, nil + case <-e.ctx.Done(): + return nil, e.ctx.Err() + } +} + +func (e *embedBackendAudioToAudioStream) SetHeader(md metadata.MD) error { return nil } +func (e *embedBackendAudioToAudioStream) SendHeader(md metadata.MD) error { return nil } +func (e *embedBackendAudioToAudioStream) SetTrailer(md metadata.MD) {} +func (e *embedBackendAudioToAudioStream) Context() context.Context { return e.ctx } +func (e *embedBackendAudioToAudioStream) SendMsg(m any) error { + if x, ok := m.(*pb.AudioToAudioResponse); ok { + return e.Send(x) + } + return nil +} +func (e *embedBackendAudioToAudioStream) RecvMsg(m any) error { return nil } + +type embedBackendAudioToAudioStreamClient struct { + ctx context.Context + reqs chan<- *pb.AudioToAudioRequest + resps <-chan *pb.AudioToAudioResponse + srvDone <-chan error + closeOnce bool +} + +func (e *embedBackendAudioToAudioStreamClient) Send(req *pb.AudioToAudioRequest) error { + select { + case e.reqs <- req: + return nil + case <-e.ctx.Done(): + return e.ctx.Err() + } +} + +func (e *embedBackendAudioToAudioStreamClient) Recv() (*pb.AudioToAudioResponse, error) { + select { + case resp, ok := <-e.resps: + if !ok { + // Server goroutine writes to srvDone immediately after closing + // resps; block (cap with ctx) so we don't race past a real error. + select { + case err := <-e.srvDone: + if err != nil { + return nil, err + } + case <-e.ctx.Done(): + return nil, e.ctx.Err() + } + return nil, io.EOF + } + return resp, nil + case <-e.ctx.Done(): + return nil, e.ctx.Err() + } +} + +func (e *embedBackendAudioToAudioStreamClient) CloseSend() error { + if e.closeOnce { + return nil + } + e.closeOnce = true + close(e.reqs) + return nil +} + +func (e *embedBackendAudioToAudioStreamClient) Context() context.Context { return e.ctx } + var _ pb.Backend_AudioTranscriptionStreamServer = new(embedBackendAudioTranscriptionStream) type embedBackendAudioTranscriptionStream struct { diff --git a/pkg/grpc/interface.go b/pkg/grpc/interface.go index 82f8af23b..bce3f689c 100644 --- a/pkg/grpc/interface.go +++ b/pkg/grpc/interface.go @@ -45,6 +45,7 @@ type AIModel interface { AudioTransform(*pb.AudioTransformRequest) (*pb.AudioTransformResult, error) AudioTransformStream(in <-chan *pb.AudioTransformFrameRequest, out chan<- *pb.AudioTransformFrameResponse) error + AudioToAudioStream(in <-chan *pb.AudioToAudioRequest, out chan<- *pb.AudioToAudioResponse) error ModelMetadata(*pb.ModelOptions) (*pb.ModelMetadataResponse, error) diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index 396547ca9..4eaa71297 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -487,6 +487,66 @@ func (s *server) AudioTransformStream(stream pb.Backend_AudioTransformStreamServ return recvErr } +// AudioToAudioStream is the bidirectional any-to-any S2S handler. The +// shape mirrors AudioTransformStream exactly (recv → in chan, out chan → +// send) so backends can implement either via the same goroutine idiom. 
+func (s *server) AudioToAudioStream(stream pb.Backend_AudioToAudioStreamServer) error { + if s.llm.Locking() { + s.llm.Lock() + defer s.llm.Unlock() + } + + in := make(chan *pb.AudioToAudioRequest, 8) + out := make(chan *pb.AudioToAudioResponse, 8) + + recvErrCh := make(chan error, 1) + go func() { + defer close(in) + for { + req, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + recvErrCh <- nil + return + } + recvErrCh <- err + return + } + select { + case in <- req: + case <-stream.Context().Done(): + recvErrCh <- stream.Context().Err() + return + } + } + }() + + sendDone := make(chan error, 1) + go func() { + for resp := range out { + if err := stream.Send(resp); err != nil { + sendDone <- err + for range out { + } + return + } + } + sendDone <- nil + }() + + backendErr := s.llm.AudioToAudioStream(in, out) + sendErr := <-sendDone + recvErr := <-recvErrCh + + if backendErr != nil { + return backendErr + } + if sendErr != nil { + return sendErr + } + return recvErr +} + func (s *server) StartFineTune(ctx context.Context, in *pb.FineTuneRequest) (*pb.FineTuneJobResult, error) { if s.llm.Locking() { s.llm.Lock()
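Finally, a sketch of what a Go-side `AIModel` implementation of the new RPC might look like under the channel contract established by the handler above: `in` is closed once the peer stops sending, and the implementation is responsible for closing `out` when done, as the `base.go` stub does. The embedded `base.Base`, the `buildResponse` helper, and the import paths are assumptions.

```go
// Sketch only: an AIModel implementing AudioToAudioStream over the in/out channels.
package backend

import (
	"github.com/mudler/LocalAI/pkg/grpc/base"
	pb "github.com/mudler/LocalAI/pkg/grpc/proto"
)

type anyToAnyModel struct {
	base.Base // supplies the remaining (unimplemented) AIModel methods
}

// buildResponse is a hypothetical per-frame inference step.
func (m *anyToAnyModel) buildResponse(req *pb.AudioToAudioRequest) (*pb.AudioToAudioResponse, error) {
	return &pb.AudioToAudioResponse{}, nil
}

func (m *anyToAnyModel) AudioToAudioStream(in <-chan *pb.AudioToAudioRequest, out chan<- *pb.AudioToAudioResponse) error {
	// The server's send goroutine ranges over out, so close it when finished.
	defer close(out)

	// in is closed when the remote peer half-closes or the stream errors out.
	for req := range in {
		resp, err := m.buildResponse(req)
		if err != nil {
			return err
		}
		out <- resp
	}
	return nil
}
```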