mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-17 04:56:52 -04:00
* feat(liquid-audio): add LFM2.5-Audio any-to-any backend + realtime_audio usecase
Wires LiquidAI's LFM2.5-Audio-1.5B as a self-contained Realtime API model:
single engine handles VAD, transcription, LLM, and TTS in one bidirectional
stream — drop-in alternative to a VAD+STT+LLM+TTS pipeline.
Backend
- backend/python/liquid-audio/ — new Python gRPC backend wrapping the
`liquid-audio` package. Modes: chat / asr / tts / s2s, voice presets,
Load/Predict/PredictStream/AudioTranscription/TTS/VAD/AudioToAudioStream/
Free and StartFineTune/FineTuneProgress/StopFineTune. Runtime monkey-patch
on `liquid_audio.utils.snapshot_download` so absolute local paths from
LocalAI's gallery resolve without a HF round-trip. soundfile in place of
torchaudio.load/save (torchcodec drags NVIDIA NPP we don't bundle).
- backend/backend.proto + pkg/grpc/{backend,client,server,base,embed,
interface}.go — new AudioToAudioStream RPC mirroring AudioTransformStream
(config/frame/control oneof in; typed event+pcm+meta out).
- core/services/nodes/{health_mock,inflight}_test.go — add stubs for the
new RPC to the test fakes.
Config + capabilities
- core/config/backend_capabilities.go — UsecaseRealtimeAudio, MethodAudio
ToAudioStream, UsecaseInfoMap entry, liquid-audio BackendCapability row.
- core/config/model_config.go — FLAG_REALTIME_AUDIO bitmask, ModalityGroups
membership in both speech-input and audio-output groups so a lone flag
still reads as multimodal, GetAllModelConfigUsecases entry, GuessUsecases
branch.
Realtime endpoint
- core/http/endpoints/openai/realtime.go — extract prepareRealtimeConfig()
so the gate is unit-testable; accept realtime_audio models and self-fill
empty pipeline slots with the model's own name (user-pinned slots win).
- core/http/endpoints/openai/realtime_gate_test.go — six specs covering nil
cfg, empty pipeline, legacy pipeline, self-contained realtime_audio,
user-pinned VAD slot, and partial legacy pipeline.
UI + endpoints
- core/http/routes/ui.go — /api/pipeline-models accepts either a legacy
VAD+STT+LLM+TTS pipeline or a realtime_audio model; surfaces a
self_contained flag so the Talk page can collapse the four cards.
- core/http/routes/ui_api.go — realtime_audio in usecaseFilters.
- core/http/routes/ui_pipeline_models_test.go — covers both code paths.
- core/http/react-ui/src/pages/Talk.jsx — self-contained badge instead of
the four-slot grid; rename Edit Pipeline → Edit Model Config; less
pipeline-specific wording.
- core/http/react-ui/src/pages/Models.jsx + locales/en/models.json — new
realtime_audio filter button + i18n.
- core/http/react-ui/src/utils/capabilities.js — CAP_REALTIME_AUDIO.
- core/http/react-ui/src/pages/FineTune.jsx — voice + validation-dataset
fields, surfaced when backend === liquid-audio, plumbed via
extra_options on submit/export/import.
Gallery + importer
- gallery/liquid-audio.yaml — config template with known_usecases:
[realtime_audio, chat, tts, transcript, vad].
- gallery/index.yaml — four model entries (realtime/chat/asr/tts) keyed by
mode option. Fixed pre-existing `transcribe` typo on the asr entry
(loader silently dropped the unknown string → entry never surfaced as a
transcript model).
- gallery/lfm.yaml — function block for the LFM2 Pythonic tool-call format
`<|tool_call_start|>[name(k="v")]<|tool_call_end|>` matching
common_chat_params_init_lfm2 in vendored llama.cpp.
- core/gallery/importers/{liquid-audio,liquid-audio_test}.go — detector
matches LFM2-Audio HF repos (excludes -gguf mirrors); mode/voice
preferences plumbed through to options.
- core/gallery/importers/importers.go — register LiquidAudioImporter
before LlamaCPPImporter.
- pkg/functions/parse_lfm2_test.go — seven specs for the response/argument
regex pair on the LFM2 pythonic format.
Build matrix
- .github/backend-matrix.yml — seven liquid-audio targets (cuda12, cuda13,
l4t-cuda-13, hipblas, intel, cpu amd64, cpu arm64). Jetpack r36 cuda-12
is skipped (Ubuntu 22.04 / Python 3.10 incompatible with liquid-audio's
3.12 floor).
- backend/index.yaml — anchor + 13 image entries.
- Makefile — .NOTPARALLEL, prepare-test-extra, test-extra,
docker-build-liquid-audio.
Docs
- .agents/plans/liquid-audio-integration.md — phased plan; PR-D (real
any-to-any wiring via AudioToAudioStream), PR-E (mid-audio tool-call
detector), PR-G (GGUF entries once upstream llama.cpp PR #18641 lands)
remain.
- .agents/api-endpoints-and-auth.md — expand the capability-surface
checklist with every place a new FLAG_* needs to be registered.
Assisted-by: claude-code:claude-opus-4-7-1m [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>
* feat(realtime): function calling + history cap for any-to-any models
Three pieces, all on the realtime_audio path that just landed:
1. liquid-audio backend (backend/python/liquid-audio/backend.py):
- _build_chat_state grows a `tools_prelude` arg.
- new _render_tools_prelude parses request.Tools (the OpenAI Chat
Completions function array realtime.go already serialises) and
emits an LFM2 `<|tool_list_start|>…<|tool_list_end|>` system turn
ahead of the user history. Mirrors gallery/lfm.yaml's `function:`
template so the model sees the same prompt shape whether served
via llama-cpp or here. Without this the backend silently dropped
tools — function calling was wired end-to-end on the Go side but
the model never saw a tool list.
2. Realtime history cap (core/http/endpoints/openai/realtime.go):
- Session grows MaxHistoryItems int; default picked by new
defaultMaxHistoryItems(cfg) — 6 for realtime_audio models (LFM2.5
1.5B degrades quickly past a handful of turns), 0/unlimited for
legacy pipelines composing larger LLMs.
- triggerResponse runs conv.Items through trimRealtimeItems before
building conversationHistory. Helper walks the cut left if it
would orphan a function_call_output, so tool result + call pairs
stay intact.
- realtime_gate_test.go: specs for defaultMaxHistoryItems and
trimRealtimeItems (zero cap, under cap, over cap, tool-call pair
preservation).
3. Talk page (core/http/react-ui/src/pages/Talk.jsx):
- Reuses the chat page's MCP plumbing — useMCPClient hook,
ClientMCPDropdown component, same auto-connect/disconnect effect
pattern. No bespoke tool registry, no new REST endpoints; tools
come from whichever MCP servers the user toggles on, exactly as
on the chat page.
- sendSessionUpdate now passes session.tools=getToolsForLLM(); the
update re-fires when the active server set changes mid-session.
- New response.function_call_arguments.done handler executes via
the hook's executeTool (which round-trips through the MCP client
SDK), then replies with conversation.item.create
{type:function_call_output} + response.create so the model
completes its turn with the tool output. Mirrors chat's
client-side agentic loop, translated to the realtime wire shape.
UI changes require a LocalAI image rebuild (Dockerfile:308-313 bakes
react-ui/dist into the runtime image). Backend.py changes can be
swapped live in /backends/<id>/backend.py + /backend/shutdown.
Assisted-by: claude-code:claude-opus-4-7-1m [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>
* feat(realtime): LocalAI Assistant ("Manage Mode") for the Talk page
Mirrors the chat-page metadata.localai_assistant flow so users can ask the
realtime model what's loaded / installed / configured. Tools are run
server-side via the same in-process MCP holder that powers the chat
modality — no transport switch, no proxy, no new wire protocol.
Wire:
- core/http/endpoints/openai/realtime.go:
- RealtimeSessionOptions{LocalAIAssistant,IsAdmin}; isCurrentUserAdmin
helper mirrors chat.go's requireAssistantAccess (no-op when auth
disabled, else requires auth.RoleAdmin).
- Session grows AssistantExecutor mcpTools.ToolExecutor.
- runRealtimeSession, when opts.LocalAIAssistant is set: gate on admin,
fail closed if DisableLocalAIAssistant or the holder has no tools,
DiscoverTools and inject into session.Tools, prepend
holder.SystemPrompt() to instructions.
- Tool-call dispatch loop: when AssistantExecutor.IsTool(name), run
ExecuteTool inproc, append a FunctionCallOutput to conv.Items, skip
the function_call_arguments client emit (the client can't execute
these — it doesn't know about them). After the loop, if any
assistant tool ran, trigger another response so the model speaks the
result. Mirrors chat's agentic loop, driven server-side rather than
via client round-trip.
- core/http/endpoints/openai/realtime_webrtc.go: RealtimeCallRequest
gains `localai_assistant` (JSON omitempty). Handshake calls
isCurrentUserAdmin and builds RealtimeSessionOptions.
- core/http/react-ui/src/pages/Talk.jsx: admin-only "Manage Mode"
checkbox under the Tools dropdown; passes localai_assistant: true to
realtimeApi.call's body, captured in the connect callback's deps.
Mirroring chat's pattern means the in-process MCP tools surface "just
works" for the Talk page without exposing a Streamable-HTTP MCP endpoint
(which was the alternative). Clients with their own MCP servers can
still use the existing ClientMCPDropdown path in parallel; the realtime
handler distinguishes them by AssistantExecutor.IsTool() at dispatch
time.
Assisted-by: claude-code:claude-opus-4-7-1m [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>
* feat(realtime): render Manage Mode tool calls in the Talk transcript
Previously the realtime endpoint only emitted response.output_item.added
for the FunctionCall item, and Talk.jsx's switch ignored the event — so
server-side tool runs were invisible in the UI. The model would speak
the result but the user had no way to see what tool was actually
called.
realtime.go: after executing an assistant tool inproc, emit a second
output_item.added/.done pair for the FunctionCallOutput item. Mirrors
the way the chat page displays tool_call + tool_result blocks.
Talk.jsx: handle both response.output_item.added and .done. Render
FunctionCall (with arguments) and FunctionCallOutput (pretty-printed
JSON when possible) as two transcript entries — `tool_call` with the
wrench icon, `tool_result` with the clipboard icon, both in mono-space
secondary-colour. Resets streamingRef after the result so the next
assistant text delta starts a fresh transcript entry instead of
appending to the previous turn.
Assisted-by: claude-code:claude-opus-4-7-1m [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>
* refactor(realtime): bound the Manage Mode tool-loop + preserve assistant tools
Fallout from a review pass on the Manage Mode patches:
- Bound the server-side agentic loop. triggerResponse used to recurse on
executedAssistantTool with no cap — a model that kept calling tools
would blow the goroutine stack. New maxAssistantToolTurns = 10 (mirrors
useChat.js's maxToolTurns). Public triggerResponse is now a thin shim
over triggerResponseAtTurn(toolTurn int); recursion increments the
counter and stops at the cap with an xlog.Warn.
- Preserve Manage Mode tools across client session.update. The handler
used to blindly overwrite session.Tools, so toggling a client MCP
server mid-session silently wiped the in-process admin tools. Session
now caches the original AssistantTools slice at session creation and
the session.update handler merges them back in (client names win on
collision — the client is explicit).
- strconv.ParseBool for the localai_assistant query param instead of
hand-rolled "1" || "true". Mirrors LocalAIAssistantFromMetadata.
- Talk.jsx: render both tool_call and tool_result on
response.output_item.done instead of splitting them across .added and
.done. The server's event pairing (added → done) stays correct; the
UI just doesn't need to inspect both phases of the same item. One
switch case instead of two, no behavioural change.
Out of scope (noted for follow-ups): extract a shared assistant-tools
helper between chat.go and realtime.go (duplication is small enough
that two parallel implementations stay readable for now), and an i18n
key for the Manage Mode helper text (Talk.jsx doesn't use i18n
anywhere else yet).
Assisted-by: claude-code:claude-opus-4-7-1m [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>
* ci(test-extra): wire liquid-audio backend smoke test
The backend ships test.py + a `make test` target and is listed in
backend-matrix.yml, so scripts/changed-backends.js already writes a
`liquid-audio=true|false` output when files under backend/python/liquid-audio/
change. The workflow just wasn't reading it.
- Expose the `liquid-audio` output on the detect-changes job
- Add a tests-liquid-audio job that runs `make` + `make test` in
backend/python/liquid-audio, gated on the per-backend detect flag
The smoke covers Health() and LoadModel(mode:finetune); fine-tune mode
short-circuits before any HuggingFace download (backend.py:192), so the
job needs neither weights nor a GPU. The full-inference path remains
gated on LIQUID_AUDIO_MODEL_ID, which CI doesn't set.
The four new Go test files (core/gallery/importers/liquid-audio_test.go,
core/http/endpoints/openai/realtime_gate_test.go,
core/http/routes/ui_pipeline_models_test.go, pkg/functions/parse_lfm2_test.go)
are already picked up by the existing test.yml workflow via `make test` →
`ginkgo -r ./pkg/... ./core/...`; their packages all carry RunSpecs entries.
Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Richard Palethorpe <io@richiejp.com>
---------
Signed-off-by: Richard Palethorpe <io@richiejp.com>
872 lines
36 KiB
Python
872 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Liquid Audio backend for LocalAI.
|
|
|
|
Wraps LiquidAI's `liquid-audio` Python package (https://github.com/Liquid4All/liquid-audio).
|
|
The same model serves four roles, selected by the `mode` option at load time:
|
|
chat, asr, tts, s2s. Fine-tuning is exposed via StartFineTune.
|
|
"""
|
|
from concurrent import futures
|
|
import argparse
|
|
import json
|
|
import os
|
|
import queue
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
import uuid
|
|
|
|
import grpc
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'common'))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'common'))
|
|
from grpc_auth import get_auth_interceptors # noqa: E402
|
|
from python_utils import parse_options # noqa: E402
|
|
|
|
import backend_pb2 # noqa: E402
|
|
import backend_pb2_grpc # noqa: E402
|
|
|
|
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
|
|
MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '1'))
|
|
|
|
# Voice id → system-prompt suffix. The model only ships these four voices.
|
|
VOICE_PROMPTS = {
|
|
"us_male": "Perform TTS. Use the US male voice.",
|
|
"us_female": "Perform TTS. Use the US female voice.",
|
|
"uk_male": "Perform TTS. Use the UK male voice.",
|
|
"uk_female": "Perform TTS. Use the UK female voice.",
|
|
}
|
|
DEFAULT_VOICE = "us_female"
|
|
|
|
# Special-token IDs that LFM2-Audio emits to delimit modality boundaries.
|
|
# Sourced from liquid_audio/model/lfm2_audio.py (see generate_sequential/_sample_*).
|
|
TEXT_END_TOKEN = 130 # <|text_end|>
|
|
AUDIO_START_TOKEN = 128 # <|audio_start|>
|
|
IM_END_TOKEN = 7 # <|im_end|>
|
|
AUDIO_EOS_CODE = 2048 # signals end-of-audio in any codebook position
|
|
|
|
_PATCHED_LOCAL_PATHS = False
|
|
|
|
|
|
def _patch_liquid_audio_local_paths():
|
|
"""Make liquid_audio.utils.get_model_dir() tolerate local directories.
|
|
|
|
Upstream always passes its argument to huggingface_hub.snapshot_download,
|
|
which only accepts `owner/repo` ids. LocalAI's gallery hands us absolute
|
|
paths under <ModelPath>/<owner>/<repo>, so we intercept snapshot_download
|
|
in the liquid_audio.utils namespace and return the directory as-is when
|
|
it already exists on disk. Idempotent.
|
|
"""
|
|
global _PATCHED_LOCAL_PATHS
|
|
if _PATCHED_LOCAL_PATHS:
|
|
return
|
|
import liquid_audio.utils as _la_utils
|
|
_orig_snapshot_download = _la_utils.snapshot_download
|
|
|
|
def _local_first_snapshot_download(repo_id, revision=None, **kwargs):
|
|
if isinstance(repo_id, (str, os.PathLike)) and os.path.isdir(str(repo_id)):
|
|
return str(repo_id)
|
|
return _orig_snapshot_download(repo_id, revision=revision, **kwargs)
|
|
|
|
_la_utils.snapshot_download = _local_first_snapshot_download
|
|
_PATCHED_LOCAL_PATHS = True
|
|
|
|
|
|
def _select_device():
|
|
import torch
|
|
if torch.cuda.is_available():
|
|
return "cuda"
|
|
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
|
|
return "mps"
|
|
return "cpu"
|
|
|
|
|
|
class ActiveJob:
|
|
"""Tracks an in-flight fine-tune so FineTuneProgress can stream from its queue."""
|
|
|
|
def __init__(self, job_id):
|
|
self.job_id = job_id
|
|
self.progress_queue = queue.Queue()
|
|
self.thread = None
|
|
self.stopped = False
|
|
self.completed = False
|
|
self.error = None
|
|
|
|
|
|
class BackendServicer(backend_pb2_grpc.BackendServicer):
|
|
def __init__(self):
|
|
self.processor = None
|
|
self.model = None
|
|
self.device = "cpu"
|
|
self.dtype = None
|
|
self.options = {}
|
|
self.model_id = None
|
|
self.active_job = None
|
|
|
|
@property
|
|
def mode(self):
|
|
return str(self.options.get("mode", "chat")).lower()
|
|
|
|
@property
|
|
def voice(self):
|
|
v = str(self.options.get("voice", DEFAULT_VOICE)).lower()
|
|
return v if v in VOICE_PROMPTS else DEFAULT_VOICE
|
|
|
|
|
|
def Free(self, request, context):
|
|
# Called by LocalAI when unloading the model. Drop GPU tensors so the
|
|
# next load starts from a clean state instead of bumping into OOM.
|
|
try:
|
|
for attr in ("model", "processor", "tokenizer"):
|
|
if hasattr(self, attr):
|
|
try:
|
|
delattr(self, attr)
|
|
except Exception:
|
|
pass
|
|
import gc
|
|
gc.collect()
|
|
try:
|
|
import torch
|
|
if torch.cuda.is_available():
|
|
torch.cuda.empty_cache()
|
|
except Exception:
|
|
pass
|
|
return backend_pb2.Result(success=True, message="OK")
|
|
except Exception as exc:
|
|
print(f"Free failed: {exc}", file=sys.stderr)
|
|
return backend_pb2.Result(success=False, message=str(exc))
|
|
|
|
|
|
def Health(self, request, context):
|
|
return backend_pb2.Reply(message=bytes("OK", 'utf-8'))
|
|
|
|
|
|
def LoadModel(self, request, context):
|
|
try:
|
|
import torch
|
|
|
|
self.options = parse_options(request.Options)
|
|
if self.options.get("voice") and self.options["voice"] not in VOICE_PROMPTS:
|
|
print(f"Warning: unknown voice '{self.options['voice']}'; defaulting to '{DEFAULT_VOICE}'",
|
|
file=sys.stderr)
|
|
|
|
requested_device = self.options.get("device")
|
|
self.device = requested_device or _select_device()
|
|
if self.device == "cuda" and not torch.cuda.is_available():
|
|
return backend_pb2.Result(success=False, message="CUDA requested but not available")
|
|
if self.device == "mps" and not (hasattr(torch.backends, "mps") and
|
|
torch.backends.mps.is_available()):
|
|
print("MPS not available; falling back to CPU", file=sys.stderr)
|
|
self.device = "cpu"
|
|
|
|
dtype_name = str(self.options.get("dtype", "bfloat16")).lower()
|
|
self.dtype = {
|
|
"bfloat16": torch.bfloat16,
|
|
"bf16": torch.bfloat16,
|
|
"float16": torch.float16,
|
|
"fp16": torch.float16,
|
|
"half": torch.float16,
|
|
"float32": torch.float32,
|
|
"fp32": torch.float32,
|
|
}.get(dtype_name, torch.bfloat16)
|
|
|
|
# request.Model holds the raw `parameters.model` value (an HF
|
|
# repo id like "LiquidAI/LFM2.5-Audio-1.5B"); request.ModelFile
|
|
# is LocalAI's ModelPath-prefixed local copy that exists only
|
|
# when the gallery supplied a `files:` list. Mirror the
|
|
# transformers/vibevoice convention: prefer the repo id and
|
|
# only switch to the local path if it's been staged on disk.
|
|
model_id = request.Model
|
|
if not model_id:
|
|
model_id = request.ModelFile
|
|
if not model_id:
|
|
return backend_pb2.Result(success=False, message="No model identifier provided")
|
|
if request.ModelFile and os.path.isdir(request.ModelFile):
|
|
model_id = request.ModelFile
|
|
self.model_id = model_id
|
|
|
|
# Pure fine-tune jobs don't need an in-memory inference model — the
|
|
# Trainer instantiates its own copy at StartFineTune time.
|
|
if self.mode == "finetune":
|
|
print(f"Loaded liquid-audio backend in fine-tune mode (model id: {model_id})",
|
|
file=sys.stderr)
|
|
return backend_pb2.Result(success=True, message="OK")
|
|
|
|
from liquid_audio import LFM2AudioModel, LFM2AudioProcessor
|
|
|
|
# liquid_audio's from_pretrained unconditionally routes through
|
|
# huggingface_hub.snapshot_download, which rejects local paths
|
|
# (HFValidationError on `/models/LiquidAI/LFM2.5-Audio-1.5B`).
|
|
# When LocalAI's gallery has already staged the weights on disk,
|
|
# short-circuit the download to return the local directory.
|
|
_patch_liquid_audio_local_paths()
|
|
|
|
print(f"Loading liquid-audio model '{model_id}' on {self.device} ({self.dtype})",
|
|
file=sys.stderr)
|
|
self.processor = LFM2AudioProcessor.from_pretrained(model_id, device=self.device).eval()
|
|
self.model = LFM2AudioModel.from_pretrained(
|
|
model_id, device=self.device, dtype=self.dtype
|
|
).eval()
|
|
|
|
print(f"Liquid-audio mode={self.mode}, voice={self.voice}", file=sys.stderr)
|
|
return backend_pb2.Result(success=True, message="OK")
|
|
|
|
except Exception as exc:
|
|
print(f"LoadModel failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
return backend_pb2.Result(success=False, message=str(exc))
|
|
|
|
|
|
def Predict(self, request, context):
|
|
try:
|
|
text = "".join(self._generate_text_stream(request))
|
|
return backend_pb2.Reply(message=text.encode("utf-8"))
|
|
except Exception as exc:
|
|
print(f"Predict failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
context.set_code(grpc.StatusCode.INTERNAL)
|
|
context.set_details(str(exc))
|
|
return backend_pb2.Reply()
|
|
|
|
def PredictStream(self, request, context):
|
|
try:
|
|
for delta in self._generate_text_stream(request):
|
|
yield backend_pb2.Reply(message=delta.encode("utf-8"))
|
|
except Exception as exc:
|
|
print(f"PredictStream failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
context.set_code(grpc.StatusCode.INTERNAL)
|
|
context.set_details(str(exc))
|
|
|
|
|
|
def VAD(self, request, context):
|
|
# Stub voice-activity detector: RMS-energy threshold over 30ms frames at
|
|
# 16 kHz. Good enough for the realtime endpoint's handleVAD loop, which
|
|
# only inspects segment presence + last segment end. The proper signal
|
|
# would come from the model's audio encoder, but that ride-along is a
|
|
# PR-D scope item — until then this keeps the legacy pipeline path
|
|
# working without forcing the operator to install a separate VAD model.
|
|
import numpy as np
|
|
try:
|
|
audio = np.asarray(request.audio, dtype=np.float32)
|
|
if audio.size == 0:
|
|
return backend_pb2.VADResponse(segments=[])
|
|
|
|
sample_rate = 16000
|
|
frame_size = sample_rate * 30 // 1000 # 30ms → 480 samples
|
|
threshold = float(self.options.get("vad_rms_threshold", 0.01))
|
|
min_speech_frames = int(self.options.get("vad_min_speech_frames", 2)) # ≥60ms
|
|
# handleVAD ticks every 300 ms and only inspects segment presence
|
|
# + last segment end relative to silence_threshold (~500 ms). Cap
|
|
# the analysed window to the tail of the buffer so we don't redo
|
|
# the entire growing utterance every tick.
|
|
window_s = float(self.options.get("vad_window_s", 5.0))
|
|
window_samples = int(window_s * sample_rate)
|
|
time_offset_s = 0.0
|
|
if audio.size > window_samples:
|
|
time_offset_s = (audio.size - window_samples) / sample_rate
|
|
audio = audio[-window_samples:]
|
|
|
|
n_frames = audio.size // frame_size
|
|
if n_frames == 0:
|
|
return backend_pb2.VADResponse(segments=[])
|
|
frames = audio[: n_frames * frame_size].reshape(n_frames, frame_size)
|
|
rms = np.sqrt(np.mean(frames ** 2, axis=1))
|
|
speech = rms > threshold
|
|
|
|
def _emit(start_idx, end_idx, out):
|
|
if end_idx - start_idx >= min_speech_frames:
|
|
out.append(backend_pb2.VADSegment(
|
|
start=time_offset_s + start_idx * frame_size / sample_rate,
|
|
end=time_offset_s + end_idx * frame_size / sample_rate,
|
|
))
|
|
|
|
segments = []
|
|
start_idx = None
|
|
for i, is_speech in enumerate(speech):
|
|
if is_speech and start_idx is None:
|
|
start_idx = i
|
|
elif not is_speech and start_idx is not None:
|
|
_emit(start_idx, i, segments)
|
|
start_idx = None
|
|
if start_idx is not None:
|
|
_emit(start_idx, n_frames, segments)
|
|
return backend_pb2.VADResponse(segments=segments)
|
|
except Exception as exc:
|
|
print(f"VAD failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
context.set_code(grpc.StatusCode.INTERNAL)
|
|
context.set_details(str(exc))
|
|
return backend_pb2.VADResponse(segments=[])
|
|
|
|
|
|
def TTS(self, request, context):
|
|
try:
|
|
if self.model is None or self.processor is None:
|
|
return backend_pb2.Result(success=False, message="Model not loaded")
|
|
|
|
import torch
|
|
import torchaudio
|
|
from liquid_audio import ChatState
|
|
|
|
voice = request.voice.lower() if request.voice else self.voice
|
|
voice = voice.removeprefix("lfm2:").removeprefix("lfm:")
|
|
if voice not in VOICE_PROMPTS:
|
|
voice = self.voice
|
|
system_prompt = VOICE_PROMPTS[voice]
|
|
|
|
chat = ChatState(self.processor)
|
|
chat.new_turn("system")
|
|
chat.add_text(system_prompt)
|
|
chat.end_turn()
|
|
chat.new_turn("user")
|
|
chat.add_text(request.text or "")
|
|
chat.end_turn()
|
|
chat.new_turn("assistant")
|
|
|
|
audio_top_k = int(self.options.get("audio_top_k", 64))
|
|
audio_temp = float(self.options.get("audio_temperature", 0.8))
|
|
max_new = int(self.options.get("max_new_tokens", 2048))
|
|
|
|
audio_out = []
|
|
for tok in self.model.generate_sequential(
|
|
**chat,
|
|
max_new_tokens=max_new,
|
|
audio_temperature=audio_temp,
|
|
audio_top_k=audio_top_k,
|
|
):
|
|
if tok.numel() > 1:
|
|
audio_out.append(tok)
|
|
|
|
if len(audio_out) <= 1:
|
|
return backend_pb2.Result(success=False, message="No audio frames generated")
|
|
|
|
# Drop the trailing end-of-audio frame, matching the package's examples.
|
|
audio_codes = torch.stack(audio_out[:-1], 1).unsqueeze(0)
|
|
waveform = self.processor.decode(audio_codes)
|
|
|
|
out_path = request.dst
|
|
if not out_path:
|
|
return backend_pb2.Result(success=False, message="dst path is required")
|
|
os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
|
|
# soundfile in preference to torchaudio.save — the latter routes
|
|
# through torchcodec, whose native libs need NVIDIA NPP that we
|
|
# don't bundle in the cuda13 image.
|
|
import soundfile as _sf
|
|
_sf.write(out_path, waveform.cpu().numpy().squeeze(0).T, 24_000)
|
|
|
|
return backend_pb2.Result(success=True)
|
|
except Exception as exc:
|
|
print(f"TTS failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
return backend_pb2.Result(success=False, message=str(exc))
|
|
|
|
|
|
def AudioToAudioStream(self, request_iterator, context):
|
|
"""Bidirectional any-to-any speech-to-speech stream.
|
|
|
|
See `backend.proto` AudioToAudioStream for the wire protocol. Audio
|
|
is decoded once per turn here; chunked detokenization for sub-second
|
|
TTFB is left to a future iteration once the LFM2AudioDetokenizer
|
|
gains a streaming entry point.
|
|
"""
|
|
try:
|
|
yield from self._audio_to_audio_stream(request_iterator, context)
|
|
except Exception as exc:
|
|
print(f"AudioToAudioStream failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
yield backend_pb2.AudioToAudioResponse(
|
|
event="error",
|
|
meta=json.dumps({"message": str(exc)}).encode("utf-8"),
|
|
)
|
|
|
|
def _audio_to_audio_stream(self, request_iterator, context):
|
|
if self.model is None or self.processor is None:
|
|
raise RuntimeError("Model not loaded")
|
|
|
|
import torch
|
|
import torchaudio
|
|
from liquid_audio import ChatState
|
|
|
|
cfg = None
|
|
chat = None
|
|
input_sample_rate = 16000
|
|
output_sample_rate = 24000
|
|
sequence = 0
|
|
|
|
def _new_event(event, **kwargs):
|
|
nonlocal sequence
|
|
sequence += 1
|
|
kwargs.setdefault("sequence", sequence)
|
|
return backend_pb2.AudioToAudioResponse(event=event, **kwargs)
|
|
|
|
def _ensure_chat():
|
|
"""Build a fresh ChatState seeded with the system prompt."""
|
|
nonlocal chat
|
|
chat = ChatState(self.processor)
|
|
system_prompt = (cfg.system_prompt if cfg and cfg.system_prompt
|
|
else "Respond with interleaved text and audio.")
|
|
chat.new_turn("system")
|
|
chat.add_text(system_prompt)
|
|
chat.end_turn()
|
|
|
|
# Buffers for the in-flight user turn
|
|
pcm_buffer = bytearray()
|
|
|
|
def _consume_user_turn():
|
|
nonlocal pcm_buffer
|
|
if not pcm_buffer:
|
|
return
|
|
# Avoid the bytes(pcm_buffer) copy and let the float widen happen
|
|
# in-place: numpy view → torch view → in-place divide.
|
|
import numpy as np
|
|
arr = np.frombuffer(memoryview(pcm_buffer), dtype=np.int16)
|
|
wav = torch.from_numpy(arr).to(torch.float32).div_(32768.0).unsqueeze(0)
|
|
chat.new_turn("user")
|
|
chat.add_audio(wav, input_sample_rate)
|
|
chat.end_turn()
|
|
pcm_buffer = bytearray()
|
|
|
|
def _run_generation():
|
|
"""Run generate_interleaved; yield response events as we go."""
|
|
chat.new_turn("assistant")
|
|
audio_top_k = int(self.options.get("audio_top_k", 4))
|
|
audio_temp = float(self.options.get("audio_temperature", 1.0))
|
|
text_top_k = int(self.options.get("text_top_k", 0)) or None
|
|
text_temp = float(self.options.get("text_temperature", 0)) or None
|
|
max_new = int(self.options.get("max_new_tokens", 512))
|
|
|
|
audio_tokens = []
|
|
for tok in self.model.generate_interleaved(
|
|
**chat,
|
|
max_new_tokens=max_new,
|
|
text_temperature=text_temp,
|
|
text_top_k=text_top_k,
|
|
audio_temperature=audio_temp,
|
|
audio_top_k=audio_top_k,
|
|
):
|
|
if tok.numel() == 1:
|
|
if tok.item() == IM_END_TOKEN:
|
|
break
|
|
text = self.processor.text.decode(tok)
|
|
if not text:
|
|
continue
|
|
yield _new_event(
|
|
"response.audio_transcript.delta",
|
|
meta=json.dumps({"delta": text}).encode("utf-8"),
|
|
)
|
|
else:
|
|
audio_tokens.append(tok)
|
|
|
|
# Detokenize the accumulated audio at end-of-turn — the
|
|
# LFM2AudioDetokenizer is non-streaming today.
|
|
if len(audio_tokens) > 1:
|
|
audio_codes = torch.stack(audio_tokens[:-1], 1).unsqueeze(0)
|
|
waveform = self.processor.decode(audio_codes)
|
|
# Convert to s16le PCM bytes at output_sample_rate
|
|
if output_sample_rate != 24000:
|
|
waveform = torchaudio.functional.resample(
|
|
waveform.cpu(), 24000, output_sample_rate
|
|
)
|
|
pcm = (waveform.cpu().squeeze(0).clamp(-1, 1) * 32767.0).to(
|
|
torch.int16
|
|
).numpy().tobytes()
|
|
yield _new_event(
|
|
"response.audio.delta",
|
|
pcm=pcm,
|
|
sample_rate=output_sample_rate,
|
|
)
|
|
|
|
yield _new_event("response.done", meta=b"{}")
|
|
|
|
for req in request_iterator:
|
|
if not context.is_active():
|
|
return
|
|
payload = req.WhichOneof("payload")
|
|
if payload == "config":
|
|
cfg = req.config
|
|
if cfg.input_sample_rate > 0:
|
|
input_sample_rate = cfg.input_sample_rate
|
|
if cfg.output_sample_rate > 0:
|
|
output_sample_rate = cfg.output_sample_rate
|
|
# The first config implicitly resets state.
|
|
_ensure_chat()
|
|
pcm_buffer = bytearray()
|
|
elif payload == "frame":
|
|
if chat is None:
|
|
_ensure_chat()
|
|
if req.frame.pcm:
|
|
pcm_buffer.extend(req.frame.pcm)
|
|
if req.frame.end_of_input:
|
|
_consume_user_turn()
|
|
yield from _run_generation()
|
|
elif payload == "control":
|
|
event = req.control.event
|
|
if event == "input_audio_buffer.commit":
|
|
_consume_user_turn()
|
|
yield from _run_generation()
|
|
elif event == "response.cancel":
|
|
# Synchronous generation here means cancel can only
|
|
# take effect between turns; we ack so the client unblocks.
|
|
yield _new_event("response.done", meta=b'{"cancelled":true}')
|
|
elif event == "session.update":
|
|
# Free-form session re-config; treat as a soft reset.
|
|
_ensure_chat()
|
|
pcm_buffer = bytearray()
|
|
# Unknown events are ignored — forward-compatible.
|
|
|
|
|
|
def AudioTranscription(self, request, context):
|
|
try:
|
|
if self.model is None or self.processor is None:
|
|
return backend_pb2.TranscriptResult(segments=[], text="")
|
|
|
|
import torchaudio
|
|
from liquid_audio import ChatState
|
|
|
|
audio_path = request.dst
|
|
if not audio_path:
|
|
return backend_pb2.TranscriptResult(segments=[], text="")
|
|
|
|
chat = ChatState(self.processor)
|
|
chat.new_turn("system")
|
|
chat.add_text("Perform ASR.")
|
|
chat.end_turn()
|
|
chat.new_turn("user")
|
|
# soundfile in preference to torchaudio.load — the latter routes
|
|
# through torchcodec which needs NVIDIA NPP libs we don't bundle.
|
|
import soundfile as _sf
|
|
import torch
|
|
audio_np, sr = _sf.read(audio_path, dtype="float32", always_2d=True)
|
|
wav = torch.from_numpy(audio_np.T) # (channels, samples)
|
|
if wav.shape[0] > 1:
|
|
# Down-mix to mono — the processor expects a single channel
|
|
wav = wav.mean(dim=0, keepdim=True)
|
|
chat.add_audio(wav, sr)
|
|
chat.end_turn()
|
|
chat.new_turn("assistant")
|
|
|
|
max_new = int(self.options.get("max_new_tokens", 1024))
|
|
|
|
pieces = []
|
|
for tok in self.model.generate_sequential(**chat, max_new_tokens=max_new):
|
|
if tok.numel() == 1:
|
|
if tok.item() == IM_END_TOKEN:
|
|
break
|
|
pieces.append(self.processor.text.decode(tok))
|
|
|
|
text = "".join(pieces).strip()
|
|
duration_ms = int((wav.shape[1] / sr) * 1000)
|
|
segment = backend_pb2.TranscriptSegment(
|
|
id=0, start=0, end=duration_ms, text=text, tokens=[],
|
|
)
|
|
return backend_pb2.TranscriptResult(segments=[segment], text=text)
|
|
except Exception as exc:
|
|
print(f"AudioTranscription failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
return backend_pb2.TranscriptResult(segments=[], text="")
|
|
|
|
|
|
def StartFineTune(self, request, context):
|
|
if self.active_job is not None and not self.active_job.completed:
|
|
return backend_pb2.FineTuneJobResult(
|
|
job_id="", success=False,
|
|
message="A fine-tuning job is already running",
|
|
)
|
|
|
|
job_id = request.job_id or str(uuid.uuid4())
|
|
job = ActiveJob(job_id)
|
|
self.active_job = job
|
|
|
|
thread = threading.Thread(target=self._run_training, args=(request, job), daemon=True)
|
|
job.thread = thread
|
|
thread.start()
|
|
|
|
return backend_pb2.FineTuneJobResult(
|
|
job_id=job_id, success=True, message="Training started",
|
|
)
|
|
|
|
def FineTuneProgress(self, request, context):
|
|
if self.active_job is None or self.active_job.job_id != request.job_id:
|
|
context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
context.set_details(f"Job {request.job_id} not found")
|
|
return
|
|
|
|
job = self.active_job
|
|
while True:
|
|
try:
|
|
update = job.progress_queue.get(timeout=1.0)
|
|
except queue.Empty:
|
|
if job.completed or job.stopped:
|
|
break
|
|
if not context.is_active():
|
|
break
|
|
continue
|
|
if update is None:
|
|
break
|
|
yield update
|
|
if update.status in ("completed", "failed", "stopped"):
|
|
break
|
|
|
|
def StopFineTune(self, request, context):
|
|
# We can't kill the Accelerate training loop mid-step cleanly from here;
|
|
# LocalAI's job manager kills the backend process on stop. The flag below
|
|
# at least lets the progress stream terminate quickly.
|
|
if self.active_job is not None and self.active_job.job_id == request.job_id:
|
|
self.active_job.stopped = True
|
|
self.active_job.progress_queue.put(None)
|
|
return backend_pb2.Result(success=True, message="OK")
|
|
|
|
def _run_training(self, request, job):
|
|
try:
|
|
self._do_train(request, job)
|
|
job.completed = True
|
|
job.progress_queue.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, status="completed", message="Training completed",
|
|
progress_percent=100.0,
|
|
))
|
|
except Exception as exc:
|
|
job.error = str(exc)
|
|
job.completed = True
|
|
print(f"Training failed: {exc}", file=sys.stderr)
|
|
print(traceback.format_exc(), file=sys.stderr)
|
|
job.progress_queue.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, status="failed", message=str(exc),
|
|
))
|
|
finally:
|
|
job.progress_queue.put(None)
|
|
|
|
def _do_train(self, request, job):
|
|
from liquid_audio import LFM2AudioModel # noqa: F401 (sanity import)
|
|
from liquid_audio.data.dataloader import LFM2DataLoader
|
|
from liquid_audio.trainer import Trainer
|
|
|
|
model_id = request.model or self.model_id or "LiquidAI/LFM2.5-Audio-1.5B"
|
|
|
|
dataset_path = request.dataset_source
|
|
if not dataset_path:
|
|
raise ValueError("dataset_source is required (path to a preprocessed dataset)")
|
|
|
|
extras = dict(request.extra_options) if request.extra_options else {}
|
|
val_path = extras.get("val_dataset")
|
|
|
|
# Map FineTuneRequest hyperparameters to liquid_audio.Trainer constructor args
|
|
lr = request.learning_rate or 3e-5
|
|
max_steps = request.max_steps or 1000
|
|
warmup_steps = request.warmup_steps or min(100, max_steps // 10)
|
|
batch_size = request.batch_size or 16
|
|
save_interval = request.save_steps or max(1, max_steps // 4)
|
|
|
|
output_dir = request.output_dir or os.path.join(
|
|
os.environ.get("LIQUID_AUDIO_OUTPUT_DIR", "/tmp"),
|
|
f"liquid-audio-{job.job_id}",
|
|
)
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
job.progress_queue.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, status="loading_dataset",
|
|
message=f"Loading preprocessed dataset from {dataset_path}",
|
|
))
|
|
train_data = LFM2DataLoader(dataset_path)
|
|
val_data = LFM2DataLoader(val_path) if val_path else None
|
|
|
|
job.progress_queue.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, status="loading_model",
|
|
message=f"Loading base model {model_id}",
|
|
))
|
|
|
|
# The Liquid Trainer logs via self.accelerator.print; we subclass it to
|
|
# also push progress events onto the queue every logging_interval steps.
|
|
progress_q = job.progress_queue
|
|
|
|
class QueuedTrainer(Trainer):
|
|
def log(self_, model_output):
|
|
if self_.step > 0 and self_.step % self_.logging_interval == 0:
|
|
try:
|
|
loss = self_.accelerator.reduce(
|
|
model_output.loss.detach(), reduction="mean"
|
|
).item()
|
|
except Exception:
|
|
loss = float("nan")
|
|
lr_now = self_.optimizer.param_groups[0]["lr"]
|
|
pct = (self_.step / self_.max_steps * 100.0) if self_.max_steps else 0.0
|
|
progress_q.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id,
|
|
current_step=int(self_.step),
|
|
total_steps=int(self_.max_steps),
|
|
current_epoch=float(self_.epoch),
|
|
loss=float(loss),
|
|
learning_rate=float(lr_now),
|
|
progress_percent=float(pct),
|
|
status="training",
|
|
))
|
|
# Honour stop requests: raising here terminates the loop cleanly
|
|
if job.stopped:
|
|
raise KeyboardInterrupt("stop requested")
|
|
return super().log(model_output)
|
|
|
|
def validate(self_):
|
|
progress_q.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, current_step=int(self_.step),
|
|
total_steps=int(self_.max_steps), status="training",
|
|
message=f"Running validation at step {self_.step}",
|
|
))
|
|
return super().validate()
|
|
|
|
trainer = QueuedTrainer(
|
|
model_id=model_id,
|
|
train_data=train_data,
|
|
val_data=val_data,
|
|
lr=lr,
|
|
max_steps=max_steps,
|
|
warmup_steps=warmup_steps,
|
|
batch_size=batch_size,
|
|
save_interval=save_interval,
|
|
output_dir=output_dir,
|
|
weight_decay=request.weight_decay or 0.1,
|
|
)
|
|
|
|
job.progress_queue.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, status="training", message="Training started",
|
|
total_steps=int(max_steps),
|
|
))
|
|
trainer.train()
|
|
|
|
job.progress_queue.put(backend_pb2.FineTuneProgressUpdate(
|
|
job_id=job.job_id, status="saving",
|
|
message=f"Saved final model to {output_dir}",
|
|
checkpoint_path=os.path.join(output_dir, "final"),
|
|
))
|
|
|
|
|
|
def _build_chat_state(self, messages, user_prompt, tools_prelude=None):
|
|
"""Build a ChatState from a list of (role, content) tuples plus an optional final user turn.
|
|
|
|
tools_prelude, when non-empty, is prepended as an extra system turn carrying
|
|
the LFM2 tool-list block — mirrors gallery/lfm.yaml's `function:` template
|
|
so the model sees the same prompt shape whether served via llama-cpp or here.
|
|
"""
|
|
from liquid_audio import ChatState
|
|
chat = ChatState(self.processor)
|
|
if tools_prelude:
|
|
chat.new_turn("system")
|
|
chat.add_text(tools_prelude)
|
|
chat.end_turn()
|
|
for role, content in messages:
|
|
chat.new_turn(role)
|
|
chat.add_text(content)
|
|
chat.end_turn()
|
|
if user_prompt:
|
|
chat.new_turn("user")
|
|
chat.add_text(user_prompt)
|
|
chat.end_turn()
|
|
chat.new_turn("assistant")
|
|
return chat
|
|
|
|
def _collect_messages(self, request):
|
|
"""Translate PredictOptions.Messages into (role, content) tuples."""
|
|
out = []
|
|
for m in request.Messages:
|
|
role = (m.role or "user").lower()
|
|
if role not in ("system", "user", "assistant"):
|
|
role = "user"
|
|
out.append((role, m.content or ""))
|
|
return out
|
|
|
|
def _render_tools_prelude(self, request):
|
|
"""Build the LFM2 `<|tool_list_start|>…<|tool_list_end|>` system prelude
|
|
from request.Tools (OpenAI Chat-Completions tool JSON). Returns "" when
|
|
no tools are attached. Output mirrors gallery/lfm.yaml's `function:`
|
|
template so the model sees the same prompt whether routed via llama-cpp
|
|
or this backend."""
|
|
tools_raw = getattr(request, "Tools", "") or ""
|
|
if not tools_raw:
|
|
return ""
|
|
try:
|
|
tools = json.loads(tools_raw)
|
|
except json.JSONDecodeError:
|
|
print(f"liquid-audio: ignoring malformed Tools JSON: {tools_raw[:200]!r}",
|
|
file=sys.stderr)
|
|
return ""
|
|
if not isinstance(tools, list) or not tools:
|
|
return ""
|
|
# The LFM2 chat template uses single-quoted Python-dict-ish syntax in
|
|
# examples, but the tokenizer treats this whole block as opaque text;
|
|
# JSON works fine and is what other backends emit.
|
|
return (
|
|
"You are a function calling AI model. You are provided with functions to "
|
|
"execute. You may call one or more functions to assist with the user query. "
|
|
"Don't make assumptions about what values to plug into functions.\n"
|
|
"List of tools: <|tool_list_start|>"
|
|
+ json.dumps(tools, separators=(",", ":"))
|
|
+ "<|tool_list_end|>"
|
|
)
|
|
|
|
def _generate_text_stream(self, request):
|
|
"""Yield text-only deltas from generate_sequential. Caller joins for unary Predict."""
|
|
if self.model is None or self.processor is None:
|
|
raise RuntimeError("Model not loaded")
|
|
messages = self._collect_messages(request)
|
|
user_prompt = request.Prompt or None
|
|
tools_prelude = self._render_tools_prelude(request)
|
|
# If the request already carries Messages, Prompt is the templated form
|
|
# of the same content — don't append a duplicate user turn.
|
|
chat = self._build_chat_state(
|
|
messages,
|
|
user_prompt if not messages else None,
|
|
tools_prelude=tools_prelude,
|
|
)
|
|
|
|
max_new = request.Tokens if request.Tokens > 0 else int(self.options.get("max_new_tokens", 512))
|
|
temperature = request.Temperature if request.Temperature > 0 else None
|
|
top_k = request.TopK if request.TopK > 0 else None
|
|
|
|
for tok in self.model.generate_sequential(
|
|
**chat,
|
|
max_new_tokens=max_new,
|
|
text_temperature=temperature,
|
|
text_top_k=top_k,
|
|
):
|
|
if tok.numel() == 1:
|
|
if tok.item() == IM_END_TOKEN:
|
|
break
|
|
yield self.processor.text.decode(tok)
|
|
|
|
|
|
def serve(address):
|
|
server = grpc.server(
|
|
futures.ThreadPoolExecutor(max_workers=MAX_WORKERS),
|
|
options=[
|
|
('grpc.max_message_length', 50 * 1024 * 1024),
|
|
('grpc.max_send_message_length', 50 * 1024 * 1024),
|
|
('grpc.max_receive_message_length', 50 * 1024 * 1024),
|
|
],
|
|
interceptors=get_auth_interceptors(),
|
|
)
|
|
backend_pb2_grpc.add_BackendServicer_to_server(BackendServicer(), server)
|
|
server.add_insecure_port(address)
|
|
server.start()
|
|
print(f"Liquid-audio backend listening on {address}", file=sys.stderr, flush=True)
|
|
|
|
def stop(_signum, _frame):
|
|
server.stop(0)
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGTERM, stop)
|
|
signal.signal(signal.SIGINT, stop)
|
|
|
|
try:
|
|
while True:
|
|
time.sleep(_ONE_DAY_IN_SECONDS)
|
|
except KeyboardInterrupt:
|
|
server.stop(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Liquid Audio gRPC backend")
|
|
parser.add_argument("--addr", default="localhost:50051", help="gRPC server address")
|
|
args = parser.parse_args()
|
|
serve(args.addr)
|