Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
44513a7067 chore(deps): bump mxschmitt/action-tmate from 3.23 to 3.24
Bumps [mxschmitt/action-tmate](https://github.com/mxschmitt/action-tmate) from 3.23 to 3.24.
- [Release notes](https://github.com/mxschmitt/action-tmate/releases)
- [Changelog](https://github.com/mxschmitt/action-tmate/blob/master/RELEASE.md)
- [Commits](https://github.com/mxschmitt/action-tmate/compare/v3.23...v3.24)

---
updated-dependencies:
- dependency-name: mxschmitt/action-tmate
  dependency-version: '3.24'
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-03 03:23:19 +00:00
143 changed files with 678 additions and 6064 deletions

View File

@@ -1766,6 +1766,20 @@ include:
dockerfile: "./backend/Dockerfile.llama-cpp"
context: "./"
ubuntu-version: '2404'
- build-type: 'hipblas'
cuda-major-version: ""
cuda-minor-version: ""
platforms: 'linux/amd64'
tag-latest: 'auto'
tag-suffix: '-gpu-rocm-hipblas-turboquant'
builder-base-image: 'quay.io/go-skynet/ci-cache:base-grpc-rocm-amd64'
runs-on: 'ubuntu-latest'
base-image: "rocm/dev-ubuntu-24.04:7.2.1"
skip-drivers: 'false'
backend: "turboquant"
dockerfile: "./backend/Dockerfile.turboquant"
context: "./"
ubuntu-version: '2404'
- build-type: 'hipblas'
cuda-major-version: ""
cuda-minor-version: ""

View File

@@ -3,7 +3,6 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
@@ -114,17 +113,6 @@ func main() {
fmt.Println("Searching for trending models on HuggingFace...")
rawModels, err := client.GetTrending(searchTerm, limit)
if err != nil {
if errors.Is(err, hfapi.ErrRateLimited) {
fmt.Printf("HuggingFace API is rate limited after retries, skipping this run: %v\n", err)
writeSummary(AddedModelSummary{
SearchTerm: searchTerm,
TotalFound: 0,
ModelsAdded: 0,
Quantization: quantization,
ProcessingTime: time.Since(startTime).String(),
})
return
}
fmt.Fprintf(os.Stderr, "Error fetching models: %v\n", err)
os.Exit(1)
}
@@ -289,3 +277,4 @@ func truncateString(s string, maxLen int) string {
}
return s[:maxLen] + "..."
}

View File

@@ -18,7 +18,7 @@ jobs:
if: ${{ github.actor != 'dependabot[bot]' }}
- name: Run Gosec Security Scanner
if: ${{ github.actor != 'dependabot[bot]' }}
uses: securego/gosec@v2.27.1
uses: securego/gosec@v2.22.9
with:
# we let the report trigger content trigger a failure using the GitHub Security features.
args: '-no-fail -fmt sarif -out results.sarif ./...'

View File

@@ -71,7 +71,7 @@ jobs:
if-no-files-found: ignore
- name: Setup tmate session if tests fail
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3.23
uses: mxschmitt/action-tmate@v3.24
with:
detached: true
connect-timeout-seconds: 180
@@ -116,7 +116,7 @@ jobs:
PATH="$PATH:$HOME/go/bin" BUILD_TYPE="GITHUB_CI_HAS_BROKEN_METAL" CMAKE_ARGS="-DGGML_F16C=OFF -DGGML_AVX512=OFF -DGGML_AVX2=OFF -DGGML_FMA=OFF" make --jobs 4 --output-sync=target test
- name: Setup tmate session if tests fail
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3.23
uses: mxschmitt/action-tmate@v3.24
with:
detached: true
connect-timeout-seconds: 180

View File

@@ -79,7 +79,7 @@ jobs:
PATH="$PATH:$HOME/go/bin" make backends/local-store backends/silero-vad backends/llama-cpp backends/whisper backends/piper backends/stablediffusion-ggml docker-build-e2e e2e-aio
- name: Setup tmate session if tests fail
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3.23
uses: mxschmitt/action-tmate@v3.24
with:
detached: true
connect-timeout-seconds: 180

View File

@@ -57,7 +57,7 @@ jobs:
PATH="$PATH:$HOME/go/bin" make build-mock-backend test-e2e
- name: Setup tmate session if tests fail
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3.23
uses: mxschmitt/action-tmate@v3.24
with:
detached: true
connect-timeout-seconds: 180

View File

@@ -75,7 +75,7 @@ jobs:
retention-days: 7
- name: Setup tmate session if tests fail
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3.23
uses: mxschmitt/action-tmate@v3.24
with:
detached: true
connect-timeout-seconds: 180

View File

@@ -309,20 +309,13 @@ run-e2e-aio: protogen-go
@echo 'Running e2e AIO tests'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e-aio
# Distributed architecture e2e (PostgreSQL + NATS via testcontainers).
# Includes NatsJWT specs (JWT-enabled NATS). Requires Docker.
# VLLMMultinode is excluded here; use test-e2e-vllm-multinode for that.
test-e2e-distributed: protogen-go
@echo 'Running distributed e2e tests (label Distributed, incl. NatsJWT)'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --label-filter='Distributed && !VLLMMultinode' --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e/distributed
# vLLM multi-node DP smoke (CPU). Builds local-ai:tests and the
# cpu-vllm backend from the current working tree, then drives a
# head + headless follower via testcontainers-go and asserts a chat
# completion. BuildKit caches both images, so re-runs only rebuild
# what changed. The test lives under tests/e2e/distributed and is
# selected by the VLLMMultinode label so it doesn't run alongside
# test-e2e-distributed.
# the other distributed-suite tests by default.
test-e2e-vllm-multinode: docker-build-e2e extract-backend-vllm protogen-go
@echo 'Running e2e vLLM multi-node DP test'
LOCALAI_IMAGE=local-ai \

View File

@@ -537,15 +537,6 @@ message TTSRequest {
string dst = 3;
string voice = 4;
optional string language = 5;
// instructions is a free-form, per-request style/voice description (maps to
// the OpenAI `instructions` field). Backends that support expressive synthesis
// (e.g. Qwen3-TTS CustomVoice/VoiceDesign) prefer this over the static YAML
// option when set; backends that don't simply ignore it.
optional string instructions = 6;
// params carries optional, backend-specific per-request generation parameters
// (e.g. Chatterbox exaggeration/cfg_weight/temperature). Values are strings and
// coerced by the backend; unset leaves the backend's configured defaults.
map<string, string> params = 7;
}
message VADRequest {

View File

@@ -60,12 +60,10 @@ elseif(DS4_GPU STREQUAL "cpu")
set(DS4_OBJS "${DS4_DIR}/ds4_cpu.o")
endif()
# ds4.c now references ds4_distributed.c (distributed inference) and ds4_ssd.c
# (SSD expert-cache), each split into its own translation unit upstream. Both
# are GPU-agnostic objects shared by every GPU mode, so link them in regardless
# of DS4_GPU.
# ds4.c now references ds4_distributed.c (distributed inference was split into
# its own translation unit upstream). It is a single GPU-agnostic object shared
# by every GPU mode, so link it in regardless of DS4_GPU.
list(APPEND DS4_OBJS "${DS4_DIR}/ds4_distributed.o")
list(APPEND DS4_OBJS "${DS4_DIR}/ds4_ssd.o")
add_executable(${TARGET}
grpc-server.cpp

View File

@@ -1,10 +1,10 @@
# ds4 backend Makefile.
#
# Upstream pin lives below as DS4_VERSION?=c463029c205c2ec8d7ab6c0df4a3f52979091286
# Upstream pin lives below as DS4_VERSION?=ba00a8a88c4c5810a3d1fed6b7b8fa2b44b82fdc
# (.github/bump_deps.sh) can find and update it - matches the
# llama-cpp / ik-llama-cpp / turboquant convention.
DS4_VERSION?=c463029c205c2ec8d7ab6c0df4a3f52979091286
DS4_VERSION?=ba00a8a88c4c5810a3d1fed6b7b8fa2b44b82fdc
DS4_REPO?=https://github.com/antirez/ds4
CURRENT_MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
@@ -18,20 +18,19 @@ UNAME_S := $(shell uname -s)
CMAKE_ARGS ?= -DCMAKE_BUILD_TYPE=Release
# ds4_distributed.o and ds4_ssd.o are GPU-agnostic translation units that
# ds4.c/ds4_cpu.o now reference (upstream split distributed inference and the
# SSD expert-cache into their own .c files). Both objects are shared by every
# GPU mode, so they are appended unconditionally below.
# ds4_distributed.o is a GPU-agnostic translation unit that ds4.c/ds4_cpu.o now
# reference (upstream split distributed inference into its own .c). The same
# object is shared by every GPU mode, so it is appended unconditionally below.
ifeq ($(BUILD_TYPE),cublas)
CMAKE_ARGS += -DDS4_GPU=cuda
DS4_OBJ_TARGET := ds4.o ds4_cuda.o ds4_distributed.o ds4_ssd.o
DS4_OBJ_TARGET := ds4.o ds4_cuda.o ds4_distributed.o
else ifeq ($(UNAME_S),Darwin)
CMAKE_ARGS += -DDS4_GPU=metal
DS4_OBJ_TARGET := ds4.o ds4_metal.o ds4_distributed.o ds4_ssd.o
DS4_OBJ_TARGET := ds4.o ds4_metal.o ds4_distributed.o
else
# CPU reference path (Linux only - macOS CPU path is broken by VM bug per ds4 README).
CMAKE_ARGS += -DDS4_GPU=cpu
DS4_OBJ_TARGET := ds4_cpu.o ds4_distributed.o ds4_ssd.o
DS4_OBJ_TARGET := ds4_cpu.o ds4_distributed.o
endif
ifneq ($(NATIVE),true)
@@ -56,11 +55,11 @@ ds4:
# the right per-platform compile flags (Objective-C/Metal on Darwin, nvcc on Linux+CUDA).
ds4/ds4.o: ds4
ifeq ($(BUILD_TYPE),cublas)
+$(MAKE) -C ds4 ds4.o ds4_cuda.o ds4_distributed.o ds4_ssd.o
+$(MAKE) -C ds4 ds4.o ds4_cuda.o ds4_distributed.o
else ifeq ($(UNAME_S),Darwin)
+$(MAKE) -C ds4 ds4.o ds4_metal.o ds4_distributed.o ds4_ssd.o
+$(MAKE) -C ds4 ds4.o ds4_metal.o ds4_distributed.o
else
+$(MAKE) -C ds4 ds4_cpu.o ds4_distributed.o ds4_ssd.o
+$(MAKE) -C ds4 ds4_cpu.o ds4_distributed.o
endif
grpc-server: ds4/ds4.o

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=6b9de3dbaa21ae95ea80638e5ee836795cc48c93
IK_LLAMA_VERSION?=3f40e73c367ad9f0c1b1819f28c7348c26aa340d
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=28ca1e600c5dac1854fb7e09611914013430b037
LLAMA_VERSION?=5dcb71166686799f0d873eab7386234302d05ecf
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -381,15 +381,6 @@ json parse_options(bool streaming, const backend::PredictOptions* predict, const
});
}
// for each video in the request, add the video data
for (int i = 0; i < predict->videos_size(); i++) {
data["video_data"].push_back(json
{
{"id", i},
{"data", predict->videos(i)},
});
}
data["stop"] = predict->stopprompts();
// data["n_probs"] = predict->nprobs();
//TODO: images,
@@ -491,13 +482,23 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
if (!request->draftmodel().empty()) {
params.speculative.draft.mparams.path = request->draftmodel();
// Default to draft type if a draft model is set but no explicit type.
// Upstream made the speculative type a vector (ggml-org/llama.cpp#22838)
// and renamed COMMON_SPECULATIVE_TYPE_DRAFT -> ..._DRAFT_SIMPLE (#22964).
// Upstream (post ggml-org/llama.cpp#22838) made the speculative type a
// vector; the turboquant fork still uses the legacy scalar. The
// LOCALAI_LEGACY_LLAMA_CPP_SPEC macro is injected by
// backend/cpp/turboquant/patch-grpc-server.sh for fork builds only.
// Upstream renamed COMMON_SPECULATIVE_TYPE_DRAFT -> ..._DRAFT_SIMPLE
// in ggml-org/llama.cpp#22964; the fork still uses the old name.
#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC
if (params.speculative.type == COMMON_SPECULATIVE_TYPE_NONE) {
params.speculative.type = COMMON_SPECULATIVE_TYPE_DRAFT;
}
#else
const bool no_spec_type = params.speculative.types.empty() ||
(params.speculative.types.size() == 1 && params.speculative.types[0] == COMMON_SPECULATIVE_TYPE_NONE);
if (no_spec_type) {
params.speculative.types = { COMMON_SPECULATIVE_TYPE_DRAFT_SIMPLE };
}
#endif
}
// params.model_alias ??
@@ -573,10 +574,9 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
// tokens (0 disables the minimum). Match upstream's default (256). This
// field was renamed from `checkpoint_every_nt` in llama.cpp; the semantics
// also shifted from a fixed cadence to a minimum spacing. The turboquant
// fork still lacks common_params::checkpoint_min_step, so skip it there
// (LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP is injected by
// backend/cpp/turboquant/patch-grpc-server.sh).
#ifndef LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP
// fork branched before the field existed, so skip it on the legacy path
// (LOCALAI_LEGACY_LLAMA_CPP_SPEC is injected by patch-grpc-server.sh).
#ifndef LOCALAI_LEGACY_LLAMA_CPP_SPEC
params.checkpoint_min_step = 256;
#endif
@@ -752,7 +752,7 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
params.cache_idle_slots = false;
}
#ifndef LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP
#ifndef LOCALAI_LEGACY_LLAMA_CPP_SPEC
// --- minimum context-checkpoint spacing (upstream -cms / --checkpoint-min-step) ---
// 0 disables the minimum-spacing gate. Old option names (`checkpoint_every_nt`,
// `checkpoint_every_n_tokens`) are kept as aliases for backward compatibility
@@ -906,6 +906,17 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
// Speculative decoding options
} else if (!strcmp(optname, "spec_type") || !strcmp(optname, "speculative_type")) {
#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC
// Fork only knows a single scalar `type`. Take the first comma-
// separated value and assign it via the singular helper.
std::string first = optval_str;
const auto comma = first.find(',');
if (comma != std::string::npos) first = first.substr(0, comma);
auto type = common_speculative_type_from_name(first);
if (type != COMMON_SPECULATIVE_TYPE_COUNT) {
params.speculative.type = type;
}
#else
// Upstream switched to a vector of types (comma-separated for multi-type
// chaining via common_speculative_types_from_names). We keep accepting a
// single value here, but also tolerate comma-separated lists.
@@ -934,6 +945,7 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
if (!parsed.empty()) {
params.speculative.types = parsed;
}
#endif
} else if (!strcmp(optname, "spec_n_max") || !strcmp(optname, "draft_max")) {
if (optval != NULL) {
try { params.speculative.draft.n_max = std::stoi(optval_str); } catch (...) {}
@@ -971,6 +983,21 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
// shares the target context size. Accept the option for backward
// compatibility but silently ignore it.
// Everything below relies on struct shape introduced in ggml-org/llama.cpp#22838
// (parallel drafting): `ngram_mod`, `ngram_map_k`, `ngram_map_k4v`,
// `ngram_cache`, and the `draft.{cache_type_*, cpuparams*, tensor_buft_overrides}`
// fields. The turboquant fork branched before that, so its build defines
// LOCALAI_LEGACY_LLAMA_CPP_SPEC via patch-grpc-server.sh and these option
// keys become unrecognized (silently dropped, like any unknown opt) for it.
//
// The `#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC` / `#else` split below sits at the
// closing-brace position of the `draft_ctx_size` branch on purpose: in the
// legacy build the chain ends here (the brace closes draft_ctx_size), and in
// the modern build the chain continues with `} else if (...)` instead, so the
// brace count stays balanced under both branches of the preprocessor.
#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC
}
#else
// --- ngram_mod family (upstream --spec-ngram-mod-*) ---
} else if (!strcmp(optname, "spec_ngram_mod_n_min")) {
if (optval != NULL) {
@@ -1100,6 +1127,7 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
}
if (!cur.empty()) flush(cur);
}
#endif // LOCALAI_LEGACY_LLAMA_CPP_SPEC — closes the `else`/`#ifdef` opened at draft_ctx_size
}
// Set params.n_parallel from environment variable if not set via options (fallback)
@@ -1149,11 +1177,15 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
params.tensor_buft_overrides.push_back({nullptr, nullptr});
}
}
// Terminate the draft tensor_buft_overrides list with a sentinel, mirroring
// the main-model handling above.
// The draft tensor_buft_overrides are only populated under the modern
// (post-#22838) layout, whose population code is itself gated by
// LOCALAI_LEGACY_LLAMA_CPP_SPEC above. The turboquant fork lacks
// common_params_speculative::draft entirely, so skip the sentinel there too.
#ifndef LOCALAI_LEGACY_LLAMA_CPP_SPEC
if (!params.speculative.draft.tensor_buft_overrides.empty()) {
params.speculative.draft.tensor_buft_overrides.push_back({nullptr, nullptr});
}
#endif
// TODO: Add yarn
@@ -1512,7 +1544,7 @@ public:
msg_json["role"] = msg.role();
bool is_last_user_msg = (i == last_user_msg_idx);
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0 || request->videos_size() > 0);
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0);
// Handle content - can be string, null, or array
// For multimodal content, we'll embed images/audio from separate fields
@@ -1563,16 +1595,6 @@ public:
content_array.push_back(audio_chunk);
}
}
if (request->videos_size() > 0) {
for (int j = 0; j < request->videos_size(); j++) {
json video_chunk;
video_chunk["type"] = "input_video";
json input_video;
input_video["data"] = request->videos(j);
video_chunk["input_video"] = input_video;
content_array.push_back(video_chunk);
}
}
msg_json["content"] = content_array;
} else {
// Use content as-is (already array or not last user message)
@@ -1607,16 +1629,6 @@ public:
content_array.push_back(audio_chunk);
}
}
if (request->videos_size() > 0) {
for (int j = 0; j < request->videos_size(); j++) {
json video_chunk;
video_chunk["type"] = "input_video";
json input_video;
input_video["data"] = request->videos(j);
video_chunk["input_video"] = input_video;
content_array.push_back(video_chunk);
}
}
msg_json["content"] = content_array;
} else if (msg.role() == "tool") {
// Tool role messages must have content field set, even if empty
@@ -1932,17 +1944,6 @@ public:
body_json["chat_template_kwargs"]["enable_thinking"] = (et_it->second == "true");
}
// Pass reasoning_effort via chat_template_kwargs too: the lever
// jinja templates like gpt-oss (Harmony) / LFM2.5 read, distinct
// from enable_thinking which those templates ignore.
auto re_it = metadata.find("reasoning_effort");
if (re_it != metadata.end() && !re_it->second.empty()) {
if (!body_json.contains("chat_template_kwargs")) {
body_json["chat_template_kwargs"] = json::object();
}
body_json["chat_template_kwargs"]["reasoning_effort"] = re_it->second;
}
// Debug: Print full body_json before template processing (includes messages, tools, tool_choice, etc.)
SRV_DBG("[CONVERSATION DEBUG] PredictStream: Full body_json before oaicompat_chat_params_parse:\n%s\n", body_json.dump(2).c_str());
@@ -2068,16 +2069,6 @@ public:
files.push_back(decoded_data);
}
}
const auto &video_data = data.find("video_data");
if (video_data != data.end() && video_data->is_array())
{
for (const auto &video : *video_data)
{
auto decoded_data = base64_decode(video["data"].get<std::string>());
files.push_back(decoded_data);
}
}
}
const bool has_mtmd = ctx_server.impl->mctx != nullptr;
@@ -2330,7 +2321,7 @@ public:
}
bool is_last_user_msg = (i == last_user_msg_idx);
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0 || request->videos_size() > 0);
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0);
// Handle content - can be string, null, or array
// For multimodal content, we'll embed images/audio from separate fields
@@ -2383,16 +2374,6 @@ public:
content_array.push_back(audio_chunk);
}
}
if (request->videos_size() > 0) {
for (int j = 0; j < request->videos_size(); j++) {
json video_chunk;
video_chunk["type"] = "input_video";
json input_video;
input_video["data"] = request->videos(j);
video_chunk["input_video"] = input_video;
content_array.push_back(video_chunk);
}
}
msg_json["content"] = content_array;
} else {
// Use content as-is (already array or not last user message)
@@ -2432,16 +2413,6 @@ public:
content_array.push_back(audio_chunk);
}
}
if (request->videos_size() > 0) {
for (int j = 0; j < request->videos_size(); j++) {
json video_chunk;
video_chunk["type"] = "input_video";
json input_video;
input_video["data"] = request->videos(j);
video_chunk["input_video"] = input_video;
content_array.push_back(video_chunk);
}
}
msg_json["content"] = content_array;
SRV_INF("[CONTENT DEBUG] Predict: Message %d created content array with media\n", i);
} else if (!msg.tool_calls().empty()) {
@@ -2766,17 +2737,6 @@ public:
body_json["chat_template_kwargs"]["enable_thinking"] = (predict_et_it->second == "true");
}
// Pass reasoning_effort via chat_template_kwargs too: the lever
// jinja templates like gpt-oss (Harmony) / LFM2.5 read, distinct
// from enable_thinking which those templates ignore.
auto predict_re_it = predict_metadata.find("reasoning_effort");
if (predict_re_it != predict_metadata.end() && !predict_re_it->second.empty()) {
if (!body_json.contains("chat_template_kwargs")) {
body_json["chat_template_kwargs"] = json::object();
}
body_json["chat_template_kwargs"]["reasoning_effort"] = predict_re_it->second;
}
// Debug: Print full body_json before template processing (includes messages, tools, tool_choice, etc.)
SRV_DBG("[CONVERSATION DEBUG] Predict: Full body_json before oaicompat_chat_params_parse:\n%s\n", body_json.dump(2).c_str());
@@ -2904,16 +2864,6 @@ public:
files.push_back(decoded_data);
}
}
const auto &video_data = data.find("video_data");
if (video_data != data.end() && video_data->is_array())
{
for (const auto &video : *video_data)
{
auto decoded_data = base64_decode(video["data"].get<std::string>());
files.push_back(decoded_data);
}
}
}
// process files

View File

@@ -1,7 +1,7 @@
# Pinned to the HEAD of feature/turboquant-kv-cache on https://github.com/TheTom/llama-cpp-turboquant.
# Auto-bumped nightly by .github/workflows/bump_deps.yaml.
TURBOQUANT_VERSION?=7d9715f1f071fa07c7b2ad3dbfd320b314139e65
TURBOQUANT_VERSION?=5aeb2fdbe26cd4c534c6fa15de73cb5749bd0403
LLAMA_REPO?=https://github.com/TheTom/llama-cpp-turboquant
CMAKE_ARGS?=

View File

@@ -4,19 +4,21 @@
#
# 1. Augment the kv_cache_types[] allow-list so `LoadModel` accepts the
# fork-specific `turbo2` / `turbo3` / `turbo4` cache types.
# 2. Define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP at the top of the file
# so the grpc-server option parser skips the two references to
# common_params::checkpoint_min_step (the default and the option handler).
# That field does not exist in the fork yet; drop this once it does.
#
# The fork used to lag upstream on the whole common_params_speculative refactor
# (ggml-org/llama.cpp#22397/#22838/#22964), the model_tgt rename (#22838) and
# get_media_marker (#21962), which required a much larger compat shim here
# (flat-field sed renames + a coarse LOCALAI_LEGACY_LLAMA_CPP_SPEC define). The
# fork has since rebased past all of those, so the only remaining gap is
# checkpoint_min_step. If a future bump reintroduces a divergence, add a narrow
# guard in grpc-server.cpp keyed on a fork-specific macro and inject it here
# rather than resurrecting the coarse one.
# 2. Replace `get_media_marker()` (added upstream in ggml-org/llama.cpp#21962,
# server-side random per-instance marker) with the legacy "<__media__>"
# literal. The fork branched before that PR, so server-common.cpp has no
# get_media_marker symbol. The fork's mtmd_default_marker() still returns
# "<__media__>", and Go-side tooling falls back to that sentinel when the
# backend does not expose media_marker, so substituting the literal keeps
# behavior identical on the turboquant path.
# 3. Revert the `common_params_speculative` field references to the
# pre-refactor flat layout. Upstream ggml-org/llama.cpp#22397 split the
# struct into nested `draft` / `ngram_simple` / `ngram_mod` / etc. members;
# the turboquant fork branched before that PR and still exposes the flat
# `n_max`, `mparams_dft`, `ngram_size_n`, ... fields. The substitutions
# below map the new nested paths back to the legacy flat names so the
# shared grpc-server.cpp keeps compiling against the fork's common.h.
# Drop this block once the fork rebases past #22397.
#
# We patch the *copy* sitting in turboquant-<flavor>-build/, never the original
# under backend/cpp/llama-cpp/, so the stock llama-cpp build keeps compiling
@@ -70,20 +72,72 @@ else
echo "==> KV allow-list patch OK"
fi
# 2. Define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP at the top of the file so
# the grpc-server option parser skips the two references to
# common_params::checkpoint_min_step (the default assignment and the option
# handler). That field does not exist in the fork yet. Drop this block once
# the fork rebases past the bump that added checkpoint_min_step.
if grep -q '^#define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP' "$SRC"; then
echo "==> $SRC already defines LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP, skipping"
if grep -q 'get_media_marker()' "$SRC"; then
echo "==> patching $SRC to replace get_media_marker() with legacy \"<__media__>\" literal"
# Only one call site today (ModelMetadata), but replace all occurrences to
# stay robust if upstream adds more. Use a temp file to avoid relying on
# sed -i portability (the builder image uses GNU sed, but keeping this
# consistent with the awk block above).
sed 's/get_media_marker()/"<__media__>"/g' "$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> get_media_marker() substitution OK"
else
echo "==> patching $SRC to define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP at the top"
# Insert the define before the very first `#include` so it precedes the
# checkpoint_min_step references.
echo "==> $SRC has no get_media_marker() call, skipping media-marker patch"
fi
if grep -q 'params\.speculative\.draft\.\|params\.speculative\.ngram_simple\.' "$SRC"; then
echo "==> patching $SRC to revert common_params_speculative refs to pre-#22397 flat layout"
# Each substitution is the exact post-refactor path → legacy flat field.
# Order doesn't matter because the source paths are disjoint, but we keep
# the most-specific (mparams.path) first for readability.
sed -E \
-e 's/params\.speculative\.draft\.mparams\.path/params.speculative.mparams_dft.path/g' \
-e 's/params\.speculative\.draft\.n_max/params.speculative.n_max/g' \
-e 's/params\.speculative\.draft\.n_min/params.speculative.n_min/g' \
-e 's/params\.speculative\.draft\.p_min/params.speculative.p_min/g' \
-e 's/params\.speculative\.draft\.p_split/params.speculative.p_split/g' \
-e 's/params\.speculative\.draft\.n_gpu_layers/params.speculative.n_gpu_layers/g' \
-e 's/params\.speculative\.draft\.n_ctx/params.speculative.n_ctx/g' \
-e 's/params\.speculative\.ngram_simple\.size_n/params.speculative.ngram_size_n/g' \
-e 's/params\.speculative\.ngram_simple\.size_m/params.speculative.ngram_size_m/g' \
-e 's/params\.speculative\.ngram_simple\.min_hits/params.speculative.ngram_min_hits/g' \
"$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> speculative field rename OK"
else
echo "==> $SRC has no post-#22397 speculative field refs, skipping spec rename patch"
fi
# 4. Revert the `ctx_server.impl->model_tgt` rename introduced by upstream
# ggml-org/llama.cpp#22838 (parallel drafting). The turboquant fork still
# exposes the field as `model` on `server_context_impl`. The two call sites
# are in the Rerank and ModelMetadata RPC handlers.
if grep -q 'ctx_server\.impl->model_tgt' "$SRC"; then
echo "==> patching $SRC to revert ctx_server.impl->model_tgt -> ctx_server.impl->model"
sed -E 's/ctx_server\.impl->model_tgt/ctx_server.impl->model/g' "$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> model_tgt rename OK"
else
echo "==> $SRC has no ctx_server.impl->model_tgt refs, skipping model_tgt rename patch"
fi
# 5. Define LOCALAI_LEGACY_LLAMA_CPP_SPEC at the top of the file so the
# grpc-server option parser skips the new option-handler blocks (ngram_mod,
# ngram_map_k, ngram_map_k4v, ngram_cache, draft.cache_type_*, draft.cpuparams*,
# draft.tensor_buft_overrides) introduced for the post-#22838 layout, the
# draft.tensor_buft_overrides sentinel termination, and the
# common_params::checkpoint_min_step default/option (added with the
# 35c9b1f3 bump). Those blocks reference struct fields that simply do not
# exist in the fork.
if grep -q '^#define LOCALAI_LEGACY_LLAMA_CPP_SPEC' "$SRC"; then
echo "==> $SRC already defines LOCALAI_LEGACY_LLAMA_CPP_SPEC, skipping"
else
echo "==> patching $SRC to define LOCALAI_LEGACY_LLAMA_CPP_SPEC at the top"
# Insert the define before the very first `#include` so it precedes all the
# speculative-decoding code paths.
awk '
!done && /^#include/ {
print "#define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP 1"
print "#define LOCALAI_LEGACY_LLAMA_CPP_SPEC 1"
print "// ^ injected by backend/cpp/turboquant/patch-grpc-server.sh"
print ""
done = 1
@@ -91,13 +145,13 @@ else
{ print }
END {
if (!done) {
print "patch-grpc-server.sh: no #include anchor found to insert LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP" > "/dev/stderr"
print "patch-grpc-server.sh: no #include anchor found to insert LOCALAI_LEGACY_LLAMA_CPP_SPEC" > "/dev/stderr"
exit 1
}
}
' "$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP define OK"
echo "==> LOCALAI_LEGACY_LLAMA_CPP_SPEC define OK"
fi
echo "==> all patches applied"

View File

@@ -1,55 +0,0 @@
hip: port the turboquant CUDA additions that ggml's HIP shim doesn't cover
The turboquant fork adds/modifies a few ggml-cuda.cu spots with CUDA APIs
that ggml's HIP (and MUSA) compatibility layer does not provide, breaking
the -gpu-rocm-hipblas-turboquant build:
1. ggml_cuda_copy2d_across_devices() (host-staged cross-device copy for
split mul_mat output) uses the CUDA 3D-peer copy APIs
cudaMemcpy3DPeerParms / make_cudaPitchedPtr / make_cudaExtent /
cudaMemcpy3DPeerAsync. HIP genuinely does not support these (see the
fork's own comment "HIP does not support cudaMemcpy3DPeerAsync"), so
guard the peer fast path with #if !defined(GGML_USE_HIP) &&
!defined(GGML_USE_MUSA) -- matching how the fork already guards the
same API for the sibling 2D copy -- and fall through to the existing
cudaMemcpyAsync staging fallback below (functionally identical,
slightly slower on multi-GPU ROCm).
2. ggml_backend_cuda_device_event_new() creates its event with plain
cudaEventCreate, which ggml's HIP shim does not alias (it only aliases
cudaEventCreateWithFlags). Use cudaEventCreateWithFlags(...,
cudaEventDisableTiming) -- exactly what the rest of this file already
does (cf. lines ~1034, ~3461) and HIP-safe.
CUDA builds are unaffected. Drop the relevant hunk once the fork HIP-ports
these; apply-patches.sh fails fast if an anchor goes stale.
diff --git a/ggml/src/ggml-cuda/ggml-cuda.cu b/ggml/src/ggml-cuda/ggml-cuda.cu
index 0427e6b..6352e6a 100644
--- a/ggml/src/ggml-cuda/ggml-cuda.cu
+++ b/ggml/src/ggml-cuda/ggml-cuda.cu
@@ -1933,6 +1933,7 @@ static cudaError_t ggml_cuda_copy2d_across_devices(
size_t width, size_t height, cudaStream_t dst_stream, cudaStream_t src_stream) {
const auto & info = ggml_cuda_info();
+#if !defined(GGML_USE_HIP) && !defined(GGML_USE_MUSA) // 3D-peer copy types unmapped by ggml's HIP/MUSA shim; use staging fallback below
if (info.peer_access[src_device][dst_device]) {
cudaMemcpy3DPeerParms p = {};
p.dstDevice = dst_device;
@@ -1942,6 +1943,7 @@ static cudaError_t ggml_cuda_copy2d_across_devices(
p.extent = make_cudaExtent(width, height, 1);
return cudaMemcpy3DPeerAsync(&p, dst_stream);
}
+#endif // !defined(GGML_USE_HIP) && !defined(GGML_USE_MUSA)
// Fallback: stage all rows through a single contiguous pinned buffer
int prev_device = ggml_cuda_get_device();
@@ -5714,7 +5716,7 @@ static ggml_backend_event_t ggml_backend_cuda_device_event_new(ggml_backend_dev_
ggml_cuda_set_device(dev_ctx->device);
cudaEvent_t event;
- CUDA_CHECK(cudaEventCreate(&event));
+ CUDA_CHECK(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
return new ggml_backend_event {
/* .device = */ dev,

View File

@@ -14,7 +14,6 @@ import (
"github.com/mudler/xlog"
"github.com/mudler/LocalAI/pkg/grpc/base"
"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/httpclient"
)
@@ -146,7 +145,7 @@ func resolveAPIKey(envName, filePath string) (string, error) {
func (c *CloudProxy) PredictRich(opts *pb.PredictOptions) (reply *pb.Reply, err error) {
cfg := c.cfg.Load()
if cfg == nil {
return nil, grpcerrors.ModelNotLoaded("cloud-proxy")
return nil, errors.New("cloud-proxy: model not loaded")
}
if cfg.mode != modeTranslate {
return nil, fmt.Errorf("cloud-proxy: Predict only valid in translate mode (have %s)", cfg.mode)
@@ -176,7 +175,7 @@ func (c *CloudProxy) PredictRich(opts *pb.PredictOptions) (reply *pb.Reply, err
func (c *CloudProxy) PredictStreamRich(opts *pb.PredictOptions, results chan<- *pb.Reply) (err error) {
cfg := c.cfg.Load()
if cfg == nil {
return grpcerrors.ModelNotLoaded("cloud-proxy")
return errors.New("cloud-proxy: model not loaded")
}
if cfg.mode != modeTranslate {
return fmt.Errorf("cloud-proxy: PredictStream only valid in translate mode (have %s)", cfg.mode)
@@ -270,7 +269,7 @@ func (c *CloudProxy) Forward(ctx context.Context, in <-chan *pb.ForwardRequest,
cfg := c.cfg.Load()
if cfg == nil {
return grpcerrors.ModelNotLoaded("cloud-proxy")
return errors.New("cloud-proxy: model not loaded")
}
if cfg.mode != modePassthrough {
return fmt.Errorf("cloud-proxy: Forward only valid in passthrough mode (have %s)", cfg.mode)

View File

@@ -14,7 +14,7 @@ target_include_directories(gocrispasr PRIVATE
# whisper. crispasr is the referencer; the backend static libs supply the
# per-architecture symbols; ggml is the math/runtime base.
target_link_libraries(gocrispasr PRIVATE
crispasr-lib
crispasr
parakeet canary canary_ctc cohere granite_speech granite_nle
voxtral voxtral4b qwen3_asr qwen3_tts orpheus chatterbox indextts
kokoro voxcpm2_tts m2m100 t5_translate wav2vec2-ggml vibevoice

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# CrispASR version (release tag)
CRISPASR_REPO?=https://github.com/CrispStrobe/CrispASR
CRISPASR_VERSION?=f7838a306687f22c281d29c250f879a4ab3df2d7
CRISPASR_VERSION?=05e60432bcb5bc2113f8c395a41e86497c11504a
SO_TARGET?=libgocrispasr.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -1,6 +1,6 @@
# parakeet-cpp backend Makefile.
#
# Upstream pin lives below as PARAKEET_VERSION?=e270af73b94c9a5c37ec516230219ed4580e1db6
# Upstream pin lives below as PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543
# (.github/bump_deps.sh) can find and update it - matches the
# whisper.cpp / ds4 / vibevoice-cpp convention.
#
@@ -15,7 +15,7 @@
# That's what the L0 smoke test uses. The default target below does the
# proper clone-at-pin + cmake build so CI doesn't need a side-checkout.
PARAKEET_VERSION?=e270af73b94c9a5c37ec516230219ed4580e1db6
PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543
PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp
GOCMD?=go

View File

@@ -7,12 +7,8 @@ import "time"
type batchRequest struct {
pcm []float32
decoder int32
// language is the per-request target locale ("" means the model default).
// parakeet.cpp's batched C-API takes ONE target_lang for the whole batch,
// so the dispatcher only coalesces requests that share a language.
language string
tag string
reply chan batchReply
tag string
reply chan batchReply
}
// batchReply carries one per-item JSON object string (an element of the C-API's
@@ -47,25 +43,13 @@ func newBatcher(maxSize int, maxWait time.Duration, runBatch func([]*batchReques
// run is the dispatcher loop: accumulate submitted requests until either maxSize
// is reached or maxWait elapses since the first queued request, then dispatch.
// Exits when stop is closed (draining any partially-filled batch first).
//
// A batch carries ONE language (parakeet.cpp's batched C-API takes a single
// target_lang), so a request whose language differs from the batch leader is
// not coalesced: it is held in carry and becomes the leader of the next batch.
// carry is therefore never dropped and its caller never deadlocks: every batch
// (including a lone carry on stop) is dispatched, and runBatch replies to all.
func (b *batcher) run(stop <-chan struct{}) {
var carry *batchRequest
for {
var first *batchRequest
if carry != nil {
// A mismatched request from the previous fill leads this batch.
first, carry = carry, nil
} else {
select {
case first = <-b.submit:
case <-stop:
return
}
select {
case first = <-b.submit:
case <-stop:
return
}
batch := []*batchRequest{first}
@@ -80,22 +64,12 @@ func (b *batcher) run(stop <-chan struct{}) {
for len(batch) < b.maxSize {
select {
case r := <-b.submit:
if r.language != first.language {
// Different language: carry it to the next batch so this
// batch stays single-language, then dispatch what we have.
carry = r
break fill
}
batch = append(batch, r)
case <-timer.C:
break fill
case <-stop:
timer.Stop()
b.runBatch(batch)
// Don't strand a carried request's caller on shutdown.
if carry != nil {
b.runBatch([]*batchRequest{carry})
}
return
}
}

View File

@@ -105,60 +105,4 @@ var _ = Describe("batcher", func() {
go func() { <-rep }()
Eventually(dispatched, "2s").Should(Receive(Equal(1)))
})
It("never coalesces requests with different languages into one batch", func() {
// parakeet.cpp's batched C-API takes ONE target_lang per batch, so the
// dispatcher must keep every dispatched batch single-language. Submit a
// mix of languages and assert (a) no batch ever carries more than one
// distinct language and (b) every submitted request still gets a reply
// (the mismatched carry-over is never dropped).
var mu sync.Mutex
var langsPerBatch [][]string
run := func(reqs []*batchRequest) {
seen := map[string]struct{}{}
var distinct []string
for _, r := range reqs {
if _, ok := seen[r.language]; !ok {
seen[r.language] = struct{}{}
distinct = append(distinct, r.language)
}
}
mu.Lock()
langsPerBatch = append(langsPerBatch, distinct)
mu.Unlock()
echoReply(reqs)
}
// Large window + size so the fill loop stays open across submits and the
// language constraint (not the timer) is what splits the batches.
b := newBatcher(16, 200*time.Millisecond, run)
stop := make(chan struct{})
go b.run(stop)
defer close(stop)
langs := []string{"en", "en", "de", "de", "en", "fr", "fr"}
const N = 7
var wg sync.WaitGroup
got := make([]string, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
rep := make(chan batchReply, 1)
b.submit <- &batchRequest{tag: string(rune('a' + i)), language: langs[i], reply: rep}
got[i] = (<-rep).json
}(i)
}
wg.Wait()
mu.Lock()
defer mu.Unlock()
// Invariant: every dispatched batch is single-language.
for _, distinct := range langsPerBatch {
Expect(len(distinct)).To(Equal(1), "a batch coalesced more than one language: %v", distinct)
}
// Liveness: every request got a reply (carry-over never stranded).
for i := 0; i < N; i++ {
Expect(got[i]).To(Equal(string(rune('a' + i))))
}
})
})

View File

@@ -15,7 +15,6 @@ import (
"github.com/go-audio/wav"
"github.com/mudler/LocalAI/pkg/grpc/base"
"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/xlog"
@@ -48,13 +47,6 @@ var (
// side reads them as const float*/const int*.
CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr
// CppTranscribePcmBatchJSONLang is the multilingual variant of the batched
// JSON entry point: identical, plus a trailing target_lang. "" (the model
// default, "auto") is passed for non-prompt models, which ignore it; an
// unknown locale on a prompt model returns 0 and sets last_error. Present
// only in newer libparakeet.so; nil falls back to CppTranscribePcmBatchJSON.
CppTranscribePcmBatchJSONLang func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32, targetLang string) uintptr
// Cache-aware streaming (RNN-T) entry points. stream_begin returns 0 for
// non-streaming models. feed/finalize return a malloc'd char* (uintptr,
// freed via CppFreeString); feed writes 1 to *eouOut on an <EOU>/<EOB>.
@@ -62,18 +54,6 @@ var (
CppStreamFeed func(s uintptr, pcm []float32, nSamples int32, eouOut unsafe.Pointer) uintptr
CppStreamFinalize func(s uintptr) uintptr
CppStreamFree func(s uintptr)
// CppStreamBeginLang is the multilingual variant of stream_begin: identical,
// plus a trailing target_lang ("" means the model default). Present only in
// newer libparakeet.so; nil falls back to CppStreamBegin.
CppStreamBeginLang func(ctx uintptr, targetLang string) uintptr
// Streaming JSON variants (ABI v4): feed/finalize returning a malloc'd char*
// JSON document {text,eou,frame_sec,words} (uintptr, freed via CppFreeString)
// so streaming segments can carry per-word timestamps. Present only in newer
// libparakeet.so; nil falls back to the text-only CppStreamFeed/Finalize path.
CppStreamFeedJSON func(s uintptr, pcm []float32, nSamples int32) uintptr
CppStreamFinalizeJSON func(s uintptr) uintptr
)
// streamChunkSamples is how much 16 kHz mono PCM we hand to stream_feed per
@@ -91,26 +71,9 @@ const streamChunkSamples = 16000
//
// "start"/"end"/"t" are seconds; "conf" is confidence in (0,1].
type transcriptJSON struct {
Text string `json:"text"`
FrameSec float64 `json:"frame_sec"`
Words []transcriptWord `json:"words"`
Tokens []transcriptToken `json:"tokens"`
}
// streamFeedJSON mirrors the document returned by
// parakeet_capi_stream_feed_json / parakeet_capi_stream_finalize_json (ABI v4):
//
// {"text":"...","eou":0,"frame_sec":0.080000,
// "words":[{"w":"...","start":0.480,"end":0.640,"conf":0.9100}, ...]}
//
// "text" is the newly-finalized text since the last call; "eou" is 1 when an
// <EOU>/<EOB> fired this feed; "words" are the words finalized this call with
// absolute (stream-relative) start/end seconds.
type streamFeedJSON struct {
Text string `json:"text"`
Eou int `json:"eou"`
FrameSec float64 `json:"frame_sec"`
Words []transcriptWord `json:"words"`
Text string `json:"text"`
Words []transcriptWord `json:"words"`
Tokens []transcriptToken `json:"tokens"`
}
type transcriptWord struct {
@@ -139,10 +102,6 @@ type ParakeetCpp struct {
engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming)
bat *batcher
batStop chan struct{}
// segmentGapFrames is NeMo's segment_gap_threshold in ENCODER FRAMES (model
// YAML option, default 0=off). When >0 it adds NeMo's silence-gap split on
// top of the punctuation split; converted to seconds via the JSON frame_sec.
segmentGapFrames int
}
// Load is the LocalAI gRPC entry point for LoadModel: it calls
@@ -172,11 +131,6 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error {
if maxWaitMs < 0 {
maxWaitMs = 0
}
// NeMo's segment_gap_threshold (encoder frames, default 0=off). Off by
// default matches NeMo's default (punctuation-only segments); when set it
// additionally splits segments on inter-word silence (see transcriptResultFromDoc).
p.segmentGapFrames = optInt(opts, "segment_gap_threshold", 0)
if CppTranscribePcmBatchJSON != nil {
p.batStop = make(chan struct{})
p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch)
@@ -232,19 +186,8 @@ func (p *ParakeetCpp) runBatch(reqs []*batchRequest) {
if len(reqs) > 0 {
dec = reqs[0].decoder
}
// All requests in a batch share one language (the batcher coalesces only
// same-language requests), so any element's language describes the batch.
lang := ""
if len(reqs) > 0 {
lang = reqs[0].language
}
p.engineMu.Lock()
var cstr uintptr
if CppTranscribePcmBatchJSONLang != nil {
cstr = CppTranscribePcmBatchJSONLang(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec, lang)
} else {
cstr = CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec)
}
cstr := CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec)
p.engineMu.Unlock()
if cstr == 0 {
err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr))
@@ -282,31 +225,21 @@ func (p *ParakeetCpp) runBatch(reqs []*batchRequest) {
// OpenAI API, whose default is segment-level); token ids always populate
// Segment.Tokens.
//
// translate/diarize/prompt/temperature/threads are not applicable to parakeet
// and are ignored; language is honored on the batched + streaming paths (see
// opts.GetLanguage() below); streaming is handled by AudioTranscriptionStream
// translate/diarize/prompt/temperature/language/threads are not applicable to
// parakeet and are ignored; streaming is handled by AudioTranscriptionStream
// (L2).
func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) {
if p.ctxPtr == 0 {
return pb.TranscriptResult{}, grpcerrors.ModelNotLoaded("parakeet-cpp")
return pb.TranscriptResult{}, errors.New("parakeet-cpp: model not loaded")
}
if opts.Dst == "" {
return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required")
}
// Fallback when the batched C-API is unavailable: transcribe from a file
// path (original behavior, no batching). The C library's audio loader only
// understands 16 kHz mono WAV/PCM, so convert the input first - otherwise
// any non-WAV upload (MP3, etc.) fails with "failed to load audio". This
// mirrors what every other audio backend (whisper, crispasr) does via
// utils.AudioToWav before handing the file to the engine.
// Fallback when the batched C-API is unavailable: transcribe directly from
// the file path (original behavior, no batching).
if p.bat == nil {
converted, cleanup, err := convertToWavMono16k(opts.Dst)
if err != nil {
return pb.TranscriptResult{}, err
}
defer cleanup()
cstr := CppTranscribePathJSON(p.ctxPtr, converted, 0)
cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0)
if cstr == 0 {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", CppLastError(p.ctxPtr))
}
@@ -316,7 +249,7 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts, p.segmentGapFrames), nil
return transcriptResultFromDoc(doc, opts), nil
}
// Batched path: decode to PCM, submit to the batcher, wait for this request's
@@ -328,7 +261,7 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
}
rep := make(chan batchReply, 1)
select {
case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, language: opts.GetLanguage(), reply: rep}:
case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}:
case <-ctx.Done():
return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled")
}
@@ -345,169 +278,34 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
if err := json.Unmarshal([]byte(res.json), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts, p.segmentGapFrames), nil
return transcriptResultFromDoc(doc, opts), nil
}
// segmentSeparators is NeMo's default segment_seperators (sentence-ending
// punctuation). Splitting on these matches NeMo's default segment timestamps.
var segmentSeparators = []rune{'.', '?', '!'}
// transcriptResultFromDoc maps a decoded transcriptJSON to a TranscriptResult,
// grouping words into NeMo-faithful segments (see splitWordsIntoSegments). The
// optional gapFrames (NeMo's segment_gap_threshold, in encoder FRAMES; 0=off)
// additionally splits on inter-word silence; it is converted to a seconds gap
// with the document's frame_sec. Per-segment word timings are attached only when
// the caller requested word granularity; token ids populate each segment's
// Tokens by time-window membership. Shared by the batched and direct paths.
func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest, gapFrames int) pb.TranscriptResult {
// synthesising a single whole-clip segment and attaching word timings only when
// the caller requested word granularity. Shared by the batched and direct paths.
func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest) pb.TranscriptResult {
text := strings.TrimSpace(doc.Text)
// Frame-unit gap threshold -> seconds (NeMo segment_gap_threshold). 0 = off.
gapSeconds := 0.0
if gapFrames > 0 {
if doc.FrameSec > 0 {
gapSeconds = float64(gapFrames) * doc.FrameSec
} else {
xlog.Warn("parakeet-cpp: segment_gap_threshold set but libparakeet.so " +
"did not report frame_sec; falling back to punctuation-only segments")
}
words := make([]*pb.TranscriptWord, 0, len(doc.Words))
for _, w := range doc.Words {
words = append(words, &pb.TranscriptWord{Start: secondsToNanos(w.Start), End: secondsToNanos(w.End), Text: w.W})
}
groups := splitWordsIntoSegments(doc.Words, segmentSeparators, gapSeconds)
if len(groups) == 0 {
// No words (edge case): single whole-clip text segment.
return pb.TranscriptResult{
Text: text,
Segments: []*pb.TranscriptSegment{{Id: 0, Text: text}},
}
tokens := make([]int32, 0, len(doc.Tokens))
for _, t := range doc.Tokens {
tokens = append(tokens, t.ID)
}
wantWords := wordsRequested(opts.TimestampGranularities)
segments := make([]*pb.TranscriptSegment, 0, len(groups))
for id, group := range groups {
parts := make([]string, len(group))
for i, gw := range group {
parts[i] = gw.W
}
seg := &pb.TranscriptSegment{
Id: int32(id),
Start: secondsToNanos(group[0].Start),
End: secondsToNanos(group[len(group)-1].End),
Text: strings.TrimSpace(strings.Join(parts, " ")),
Tokens: tokensInWindow(doc.Tokens, group[0].Start, group[len(group)-1].End),
}
if wantWords {
ws := make([]*pb.TranscriptWord, len(group))
for i, gw := range group {
ws[i] = &pb.TranscriptWord{Start: secondsToNanos(gw.Start), End: secondsToNanos(gw.End), Text: gw.W}
}
seg.Words = ws
}
segments = append(segments, seg)
var segStart, segEnd int64
if len(words) > 0 {
segStart = words[0].Start
segEnd = words[len(words)-1].End
}
return pb.TranscriptResult{Text: text, Segments: segments}
seg := &pb.TranscriptSegment{Id: 0, Start: segStart, End: segEnd, Text: text, Tokens: tokens}
if wordsRequested(opts.TimestampGranularities) {
seg.Words = words
}
return pb.TranscriptResult{Text: text, Segments: []*pb.TranscriptSegment{seg}}
}
// splitWordsIntoSegments groups words into segments exactly as NeMo's
// get_segment_offsets does (nemo/collections/asr/parts/utils/timestamp_utils.py).
// Walking the words, it closes a segment when (1) the gap rule is enabled
// (gapSeconds > 0) and the segment already has words and the gap from the
// previous word's end to this word's start is >= gapSeconds - the current word
// then STARTS a new segment - or, checked only when the gap rule did not apply
// (NeMo's elif), (2) the word ends with (or is) a separator, which closes the
// segment INCLUDING that word. Trailing words flush into a final segment.
// gapSeconds <= 0 disables the gap rule, matching NeMo's default
// segment_gap_threshold=None (punctuation-only segments).
func splitWordsIntoSegments(words []transcriptWord, separators []rune, gapSeconds float64) [][]transcriptWord {
var segments [][]transcriptWord
var cur []transcriptWord
for i, word := range words {
gapActive := gapSeconds > 0 && len(cur) > 0
if gapActive && (word.Start-words[i-1].End) >= gapSeconds {
segments = append(segments, cur)
cur = []transcriptWord{word}
continue
}
if !gapActive && endsWithSeparator(word.W, separators) {
cur = append(cur, word)
segments = append(segments, cur)
cur = nil
continue
}
cur = append(cur, word)
}
if len(cur) > 0 {
segments = append(segments, cur)
}
return segments
}
// endsWithSeparator reports whether w's last rune is in separators (matching
// NeMo's `word[-1] in delims or word in delims`).
func endsWithSeparator(w string, separators []rune) bool {
r := []rune(strings.TrimSpace(w))
if len(r) == 0 {
return false
}
last := r[len(r)-1]
for _, s := range separators {
if last == s {
return true
}
}
return false
}
// tokensInWindow returns the ids of tokens whose timestamp t falls in
// [start, end] (inclusive), assigning each token to the segment that spans its
// time. The last segment's end is the last word end, so the final token is
// included.
func tokensInWindow(tokens []transcriptToken, start, end float64) []int32 {
var ids []int32
for _, t := range tokens {
if t.T >= start && t.T <= end {
ids = append(ids, t.ID)
}
}
return ids
}
// streamSegmenter accumulates streaming words into per-utterance segments. EOU
// is the model's own utterance boundary; each closed segment takes its start/end
// from its first/last accumulated word.
type streamSegmenter struct {
segs []*pb.TranscriptSegment
cur []transcriptWord
nextID int32
}
func (s *streamSegmenter) add(doc streamFeedJSON) {
s.cur = append(s.cur, doc.Words...)
if doc.Eou != 0 {
s.flush()
}
}
func (s *streamSegmenter) flush() {
if len(s.cur) == 0 {
return
}
parts := make([]string, len(s.cur))
for i, w := range s.cur {
parts[i] = w.W
}
s.segs = append(s.segs, &pb.TranscriptSegment{
Id: s.nextID,
Start: secondsToNanos(s.cur[0].Start),
End: secondsToNanos(s.cur[len(s.cur)-1].End),
Text: strings.TrimSpace(strings.Join(parts, " ")),
})
s.nextID++
s.cur = nil
}
func (s *streamSegmenter) segments() []*pb.TranscriptSegment { return s.segs }
// wordsRequested reports whether the caller asked for word-level timestamps.
// The OpenAI transcription API gates word timings behind
// timestamp_granularities[] containing "word" and defaults to segment-level
@@ -544,7 +342,7 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
defer close(results)
if p.ctxPtr == 0 {
return grpcerrors.ModelNotLoaded("parakeet-cpp")
return errors.New("parakeet-cpp: model not loaded")
}
if opts.Dst == "" {
return errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required")
@@ -553,12 +351,7 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return status.Error(codes.Canceled, "transcription cancelled")
}
var stream uintptr
if CppStreamBeginLang != nil {
stream = CppStreamBeginLang(p.ctxPtr, opts.GetLanguage())
} else {
stream = CppStreamBegin(p.ctxPtr)
}
stream := CppStreamBegin(p.ctxPtr)
if stream == 0 {
// Not a cache-aware streaming model: run a normal offline
// transcription and emit it as one delta + a closing final result.
@@ -587,14 +380,6 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return err
}
// ABI v4: when the streaming JSON entry points are present, drive them so the
// per-utterance segments carry per-word start/end timestamps. Falls through to
// the text-only loop below against an older libparakeet.so. Runs under the
// engineMu already held above.
if CppStreamFeedJSON != nil {
return p.streamJSON(ctx, stream, data, duration, results)
}
var (
full strings.Builder
segText strings.Builder
@@ -671,102 +456,21 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return nil
}
// streamJSON drives the ABI v4 streaming JSON entry points: each feed/finalize
// returns a {text,eou,frame_sec,words} document. The newly-finalized text is
// emitted as a delta (unchanged streaming contract) while words are accumulated
// into per-utterance segments (closed on EOU) so the closing FinalResult carries
// timestamped segments. Runs under engineMu (already held by the caller).
func (p *ParakeetCpp) streamJSON(ctx context.Context, stream uintptr, data []float32,
duration float32, results chan *pb.TranscriptStreamResponse) error {
var (
full strings.Builder
seg streamSegmenter
)
// consume frees the malloc'd char* (a 0 return is an error), parses the JSON,
// emits the delta, and routes words through the segmenter.
consume := func(ret uintptr) error {
if ret == 0 {
msg := CppLastError(p.ctxPtr)
if msg == "" {
msg = "unknown error"
}
return fmt.Errorf("parakeet-cpp: stream feed/finalize failed: %s", msg)
}
raw := goStringFromCPtr(ret)
CppFreeString(ret)
var doc streamFeedJSON
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return fmt.Errorf("parakeet-cpp: decode stream json: %w", err)
}
if doc.Text != "" {
full.WriteString(doc.Text)
results <- &pb.TranscriptStreamResponse{Delta: doc.Text}
}
seg.add(doc)
return nil
}
for off := 0; off < len(data); off += streamChunkSamples {
if err := ctx.Err(); err != nil {
return status.Error(codes.Canceled, "transcription cancelled")
}
end := min(off+streamChunkSamples, len(data))
chunk := data[off:end]
if err := consume(CppStreamFeedJSON(stream, chunk, int32(len(chunk)))); err != nil {
return err
}
}
if err := consume(CppStreamFinalizeJSON(stream)); err != nil {
return err
}
seg.flush() // close any trailing utterance that never saw an EOU
text := strings.TrimSpace(full.String())
segments := seg.segments()
if len(segments) == 0 && text != "" {
segments = append(segments, &pb.TranscriptSegment{Id: 0, Text: text})
}
results <- &pb.TranscriptStreamResponse{
FinalResult: &pb.TranscriptResult{
Text: text,
Segments: segments,
Duration: duration,
},
}
return nil
}
// decodeWavMono16k converts any input audio to 16 kHz mono PCM and returns the
// float samples plus the clip duration in seconds. Mirrors the whisper
// backend: utils.AudioToWav (ffmpeg) normalises rate/channels, go-audio
// decodes the PCM.
// convertToWavMono16k converts an arbitrary audio file to a 16 kHz mono WAV in
// a fresh temp dir and returns the path together with a cleanup func the caller
// must defer. WAV inputs already at 16 kHz/mono/16-bit are passed through by
// utils.AudioToWav (hardlink/copy), everything else is transcoded via ffmpeg.
// Used by the direct (non-batched) transcription path, which hands a file path
// to the C library's WAV-only audio loader.
func convertToWavMono16k(path string) (string, func(), error) {
dir, err := os.MkdirTemp("", "parakeet")
if err != nil {
return "", func() {}, err
}
cleanup := func() { _ = os.RemoveAll(dir) }
converted := filepath.Join(dir, "converted.wav")
if err := utils.AudioToWav(path, converted); err != nil {
cleanup()
return "", func() {}, err
}
return converted, cleanup, nil
}
func decodeWavMono16k(path string) ([]float32, float32, error) {
converted, cleanup, err := convertToWavMono16k(path)
dir, err := os.MkdirTemp("", "parakeet")
if err != nil {
return nil, 0, err
}
defer cleanup()
defer func() { _ = os.RemoveAll(dir) }()
converted := filepath.Join(dir, "converted.wav")
if err := utils.AudioToWav(path, converted); err != nil {
return nil, 0, err
}
fh, err := os.Open(converted)
if err != nil {

View File

@@ -3,14 +3,11 @@ package main
import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"github.com/ebitengine/purego"
"github.com/go-audio/audio"
"github.com/go-audio/wav"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -53,10 +50,6 @@ func ensureLibLoaded() {
purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed")
purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize")
purego.RegisterLibFunc(&CppStreamFree, lib, "parakeet_capi_stream_free")
if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_feed_json"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppStreamFeedJSON, lib, "parakeet_capi_stream_feed_json")
purego.RegisterLibFunc(&CppStreamFinalizeJSON, lib, "parakeet_capi_stream_finalize_json")
}
purego.RegisterLibFunc(&CppFreeString, lib, "parakeet_capi_free_string")
purego.RegisterLibFunc(&CppLastError, lib, "parakeet_capi_last_error")
})
@@ -77,24 +70,6 @@ func fixturesOrSkip() (string, string) {
return modelPath, audioPath
}
// writeMono16kWav writes `samples` frames of 16 kHz mono 16-bit silence to
// path. The result is already in AudioToWav's target format, so the conversion
// helper copies it through without invoking ffmpeg.
func writeMono16kWav(path string, samples int) {
GinkgoHelper()
f, err := os.Create(path)
Expect(err).ToNot(HaveOccurred())
enc := wav.NewEncoder(f, 16000, 16, 1, 1)
buf := &audio.IntBuffer{
Format: &audio.Format{NumChannels: 1, SampleRate: 16000},
SourceBitDepth: 16,
Data: make([]int, samples),
}
Expect(enc.Write(buf)).To(Succeed())
Expect(enc.Close()).To(Succeed())
Expect(f.Close()).To(Succeed())
}
var _ = Describe("ParakeetCpp", func() {
Context("AudioTranscription", func() {
It("transcribes a WAV via the parakeet C-API", func() {
@@ -111,22 +86,13 @@ var _ = Describe("ParakeetCpp", func() {
Expect(err).ToNot(HaveOccurred())
Expect(strings.TrimSpace(res.Text)).ToNot(BeEmpty(),
"expected non-empty transcript for %s", audioPath)
// NeMo-faithful segmentation: one or more punctuation-delimited
// segments, each with text and a monotonically-advancing time span.
Expect(res.Segments).ToNot(BeEmpty(), "expected at least one segment")
var prevEnd int64
for i, seg := range res.Segments {
Expect(strings.TrimSpace(seg.Text)).ToNot(BeEmpty(),
"segment %d must have text", i)
Expect(seg.End).To(BeNumerically(">=", seg.Start),
"segment %d end must not precede its start", i)
Expect(seg.Start).To(BeNumerically(">=", prevEnd),
"segments must be in time order")
prevEnd = seg.End
// Default (no granularities) is segment-level: no per-word timings.
Expect(seg.Words).To(BeEmpty(),
"word timings are opt-in via timestamp_granularities")
}
Expect(res.Segments).To(HaveLen(1),
"synthesises a single whole-clip segment")
Expect(res.Segments[0].Text).To(Equal(res.Text),
"single segment text must equal the top-level text")
// Default (no granularities) is segment-level: no per-word timings.
Expect(res.Segments[0].Words).To(BeEmpty(),
"word timings are opt-in via timestamp_granularities")
})
It("emits word-level timestamps when granularity=word", func() {
@@ -142,61 +108,15 @@ var _ = Describe("ParakeetCpp", func() {
TimestampGranularities: []string{"word"},
})
Expect(err).ToNot(HaveOccurred())
Expect(res.Segments).ToNot(BeEmpty())
// With word granularity every segment carries its own words, and each
// segment's span tracks its first/last word; word starts advance
// monotonically across the whole transcript.
totalWords := 0
var prevStart int64 = -1
for i, seg := range res.Segments {
Expect(seg.Words).ToNot(BeEmpty(),
"segment %d must carry per-word timestamps with granularity=word", i)
Expect(seg.Start).To(Equal(seg.Words[0].Start),
"segment %d start tracks its first word", i)
Expect(seg.End).To(Equal(seg.Words[len(seg.Words)-1].End),
"segment %d end tracks its last word", i)
for _, w := range seg.Words {
Expect(w.End).To(BeNumerically(">=", w.Start))
Expect(w.Start).To(BeNumerically(">=", prevStart))
prevStart = w.Start
totalWords++
}
}
Expect(totalWords).To(BeNumerically(">", 0))
Expect(res.Segments[0].Words[0].Start).To(BeNumerically(">=", int64(0)))
})
})
Context("convertToWavMono16k", func() {
// The non-batched transcription path hands a file path to the C
// library's WAV-only audio loader, so it must convert first.
// utils.AudioToWav passes an already-16kHz/mono/16-bit WAV through
// without ffmpeg, which lets us exercise the helper (and the
// regression: the direct path used to skip conversion entirely)
// without a model, the C library, or ffmpeg.
It("returns a decodable 16kHz mono WAV copy and cleans it up", func() {
dir := GinkgoT().TempDir()
src := filepath.Join(dir, "input.wav")
writeMono16kWav(src, 16000) // 1s of silence at 16 kHz
converted, cleanup, err := convertToWavMono16k(src)
Expect(err).ToNot(HaveOccurred())
// It must produce a fresh temp file, not return the original path.
Expect(converted).ToNot(Equal(src))
Expect(converted).To(BeAnExistingFile())
pcm, _, err := decodeWavMono16k(converted)
Expect(err).ToNot(HaveOccurred())
Expect(pcm).To(HaveLen(16000), "round-trips the sample count")
cleanup()
Expect(converted).ToNot(BeAnExistingFile(), "cleanup removes the temp dir")
})
It("errors on a non-existent input rather than passing the path through", func() {
_, _, err := convertToWavMono16k(filepath.Join(GinkgoT().TempDir(), "missing.mp3"))
Expect(err).To(HaveOccurred())
Expect(res.Segments).To(HaveLen(1))
seg := res.Segments[0]
Expect(seg.Words).ToNot(BeEmpty(),
"expected per-word timestamps with granularity=word")
// Monotonic, non-negative timings spanning the segment.
Expect(seg.Words[0].Start).To(BeNumerically(">=", int64(0)))
Expect(seg.End).To(BeNumerically(">=", seg.Start))
Expect(seg.Words[len(seg.Words)-1].End).To(Equal(seg.End),
"segment end tracks the last word")
})
})

View File

@@ -65,25 +65,6 @@ func main() {
purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json")
}
// Per-request language variants (multilingual nemotron). Same probe pattern:
// present only in libparakeet.so built with multilingual support, so the
// backend still loads against an older library and falls back to the
// non-lang batched + streaming entry points (model default / "auto").
if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json_lang"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppTranscribePcmBatchJSONLang, lib, "parakeet_capi_transcribe_pcm_batch_json_lang")
}
if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_begin_lang"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppStreamBeginLang, lib, "parakeet_capi_stream_begin_lang")
}
// Streaming JSON entry points (ABI v4): surface per-word timestamps on the
// streaming path. Same probe pattern; absent in older libparakeet.so, where
// the backend falls back to the text-only streaming feed.
if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_feed_json"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppStreamFeedJSON, lib, "parakeet_capi_stream_feed_json")
purego.RegisterLibFunc(&CppStreamFinalizeJSON, lib, "parakeet_capi_stream_finalize_json")
}
fmt.Fprintf(os.Stderr, "[parakeet-cpp] ABI=%d\n", CppAbiVersion())
flag.Parse()

View File

@@ -1,127 +0,0 @@
package main
import (
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func tw(text string, start, end float64) transcriptWord {
return transcriptWord{W: text, Start: start, End: end}
}
var _ = Describe("splitWordsIntoSegments (NeMo get_segment_offsets parity)", func() {
seps := []rune{'.', '?', '!'}
It("splits on sentence-ending punctuation, including the delimiter word", func() {
words := []transcriptWord{tw("hello", 0, 0.4), tw("world.", 0.4, 0.8), tw("bye", 1.0, 1.3)}
segs := splitWordsIntoSegments(words, seps, 0)
Expect(segs).To(HaveLen(2))
Expect(segs[0]).To(HaveLen(2))
Expect(segs[0][1].W).To(Equal("world."))
Expect(segs[1]).To(HaveLen(1))
Expect(segs[1][0].W).To(Equal("bye"))
})
It("keeps a single segment with no terminal punctuation and gap off", func() {
words := []transcriptWord{tw("a", 0, 0.2), tw("b", 0.2, 0.4), tw("c", 5.0, 5.2)}
segs := splitWordsIntoSegments(words, seps, 0)
Expect(segs).To(HaveLen(1))
})
It("splits on the gap rule when enabled, the gapped word starting the next segment", func() {
words := []transcriptWord{tw("a", 0, 0.2), tw("b", 0.2, 0.4), tw("c", 5.0, 5.2)}
segs := splitWordsIntoSegments(words, seps, 1.0) // c is 4.6s after b
Expect(segs).To(HaveLen(2))
Expect(segs[0]).To(HaveLen(2)) // a b
Expect(segs[1]).To(HaveLen(1)) // c
Expect(segs[1][0].W).To(Equal("c"))
})
It("checks the gap rule before punctuation (NeMo elif order)", func() {
// "b." would terminate, but c is far after it -> gap closes [a b.] at b.
words := []transcriptWord{tw("a", 0, 0.2), tw("b.", 0.2, 0.4), tw("c", 9.0, 9.2)}
segs := splitWordsIntoSegments(words, seps, 1.0)
Expect(segs).To(HaveLen(2))
Expect(segs[0]).To(HaveLen(2))
Expect(segs[1][0].W).To(Equal("c"))
})
It("still splits on punctuation when the gap rule is enabled but does not fire", func() {
words := []transcriptWord{tw("hi.", 0, 0.4), tw("bye", 0.4, 0.8)}
segs := splitWordsIntoSegments(words, seps, 5.0) // gap never reached
Expect(segs).To(HaveLen(2))
Expect(segs[0][0].W).To(Equal("hi."))
})
It("returns nothing for empty input", func() {
Expect(splitWordsIntoSegments(nil, seps, 0)).To(BeEmpty())
})
})
var _ = Describe("transcriptResultFromDoc (multi-segment)", func() {
doc := transcriptJSON{
Text: "hello world. bye now",
FrameSec: 0.08,
Words: []transcriptWord{
{W: "hello", Start: 0.0, End: 0.4},
{W: "world.", Start: 0.4, End: 0.8},
{W: "bye", Start: 1.0, End: 1.3},
{W: "now", Start: 1.3, End: 1.6},
},
Tokens: []transcriptToken{{ID: 1, T: 0.1}, {ID: 2, T: 0.5}, {ID: 3, T: 1.1}, {ID: 4, T: 1.4}},
}
It("emits one segment per punctuation-delimited group with start/end", func() {
res := transcriptResultFromDoc(doc, &pb.TranscriptRequest{}, 0)
Expect(res.Segments).To(HaveLen(2))
Expect(res.Segments[0].Text).To(Equal("hello world."))
Expect(res.Segments[0].Start).To(Equal(int64(0)))
Expect(res.Segments[0].End).To(Equal(secondsToNanos(0.8)))
Expect(res.Segments[1].Text).To(Equal("bye now"))
Expect(res.Segments[1].Start).To(Equal(secondsToNanos(1.0)))
Expect(res.Segments[1].Id).To(Equal(int32(1)))
})
It("assigns tokens to the segment whose time window contains them", func() {
res := transcriptResultFromDoc(doc, &pb.TranscriptRequest{}, 0)
Expect(res.Segments[0].Tokens).To(Equal([]int32{1, 2}))
Expect(res.Segments[1].Tokens).To(Equal([]int32{3, 4}))
})
It("attaches per-segment words only when word granularity requested", func() {
plain := transcriptResultFromDoc(doc, &pb.TranscriptRequest{}, 0)
Expect(plain.Segments[0].Words).To(BeEmpty())
withWords := transcriptResultFromDoc(doc, &pb.TranscriptRequest{TimestampGranularities: []string{"word"}}, 0)
Expect(withWords.Segments[0].Words).To(HaveLen(2))
})
It("falls back to a single text segment when there are no words", func() {
res := transcriptResultFromDoc(transcriptJSON{Text: "hi"}, &pb.TranscriptRequest{}, 0)
Expect(res.Segments).To(HaveLen(1))
Expect(res.Segments[0].Text).To(Equal("hi"))
})
})
var _ = Describe("streaming segment assembly", func() {
It("closes a segment with start/end from its words on EOU", func() {
acc := &streamSegmenter{}
acc.add(streamFeedJSON{Text: "hello world", Eou: 1, Words: []transcriptWord{
{W: "hello", Start: 0.0, End: 0.4}, {W: "world", Start: 0.4, End: 0.9},
}})
segs := acc.segments()
Expect(segs).To(HaveLen(1))
Expect(segs[0].Text).To(Equal("hello world"))
Expect(segs[0].Start).To(Equal(int64(0)))
Expect(segs[0].End).To(Equal(secondsToNanos(0.9)))
})
It("buffers words across feeds until EOU", func() {
acc := &streamSegmenter{}
acc.add(streamFeedJSON{Text: "hi", Eou: 0, Words: []transcriptWord{{W: "hi", Start: 0, End: 0.3}}})
Expect(acc.segments()).To(BeEmpty())
acc.add(streamFeedJSON{Text: "there", Eou: 1, Words: []transcriptWord{{W: "there", Start: 0.3, End: 0.7}}})
Expect(acc.segments()).To(HaveLen(1))
Expect(acc.segments()[0].Text).To(Equal("hi there"))
})
})

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# qwen3-tts.cpp version
QWEN3TTS_REPO?=https://github.com/predict-woo/qwen3-tts.cpp
QWEN3TTS_CPP_VERSION?=136e5d36c17083da0321fd96512dc7b263f94a44
QWEN3TTS_CPP_VERSION?=7a762e2ad4bacc6fdda81d81bf10a09ffb546f29
SO_TARGET?=libgoqwen3ttscpp.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/mudler/LocalAI/pkg/grpc/base"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
@@ -22,43 +21,6 @@ type Qwen3TtsCpp struct {
threads int
}
// languageNameAliases maps common full language names to the canonical
// two-letter code understood by the C++ language_to_id table.
var languageNameAliases = map[string]string{
"english": "en",
"russian": "ru",
"chinese": "zh",
"japanese": "ja",
"korean": "ko",
"german": "de",
"french": "fr",
"spanish": "es",
"italian": "it",
"portuguese": "pt",
}
// normalizeLanguage coerces a caller-supplied language into the canonical code
// the model expects. It lowercases, trims, strips any region/locale suffix
// (en-US, en_US, ja.JP -> en/ja), and resolves common full names (english -> en).
// An empty input stays empty so the C++ side applies its English default; an
// unrecognized value is returned normalized so C++ can log it and default.
func normalizeLanguage(lang string) string {
lang = strings.ToLower(strings.TrimSpace(lang))
if lang == "" {
return ""
}
// Strip region/locale suffix: keep the segment before the first separator.
if i := strings.IndexAny(lang, "-_."); i >= 0 {
lang = lang[:i]
}
if code, ok := languageNameAliases[lang]; ok {
return code
}
return lang
}
func (q *Qwen3TtsCpp) Load(opts *pb.ModelOptions) error {
// ModelFile is the model directory path (containing GGUF files)
modelDir := opts.ModelFile
@@ -92,7 +54,7 @@ func (q *Qwen3TtsCpp) TTS(req *pb.TTSRequest) error {
dst := req.Dst
language := ""
if req.Language != nil {
language = normalizeLanguage(*req.Language)
language = *req.Language
}
// Synthesis parameters with sensible defaults

View File

@@ -1,53 +0,0 @@
package main
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestLanguageNormalization(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "qwen3-tts-cpp language normalization")
}
var _ = Describe("normalizeLanguage", func() {
DescribeTable("maps caller input to the canonical model language code",
func(input, expected string) {
Expect(normalizeLanguage(input)).To(Equal(expected))
},
// Canonical codes pass through unchanged
Entry("canonical en", "en", "en"),
Entry("canonical zh", "zh", "zh"),
Entry("canonical pt", "pt", "pt"),
// Case-insensitive
Entry("uppercase", "EN", "en"),
Entry("mixed case", "Ja", "ja"),
// Surrounding whitespace
Entry("trims whitespace", " en ", "en"),
// Region/locale stripping
Entry("BCP-47 region", "en-US", "en"),
Entry("underscore region", "en_US", "en"),
Entry("dotted locale", "ja.JP", "ja"),
Entry("region + case", "ZH-CN", "zh"),
// Full-name aliases
Entry("english name", "english", "en"),
Entry("chinese name cased", "Chinese", "zh"),
Entry("japanese name", "japanese", "ja"),
Entry("russian name", "russian", "ru"),
Entry("portuguese name", "portuguese", "pt"),
// Empty stays empty (C++ applies the English default)
Entry("empty", "", ""),
Entry("whitespace only", " ", ""),
// Unknown values pass through normalized so C++ can log + default
Entry("unknown code", "klingon", "klingon"),
Entry("unknown with region", "xx-YY", "xx"),
)
})

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=b3d56d0ba1bd437886079e339118e8e75bb79ee7
STABLEDIFFUSION_GGML_VERSION?=7948df8ac1070f5f6881b8d34675821893eb97d6
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -386,7 +386,6 @@ int load_model(const char *model, char *model_path, char* options[], int threads
const char *llm_vision_path = "";
const char *diffusion_model_path = stableDiffusionModel;
const char *high_noise_diffusion_model_path = "";
const char *uncond_diffusion_model_path = "";
const char *taesd_path = "";
const char *control_net_path = "";
const char *embedding_dir = "";
@@ -473,7 +472,6 @@ int load_model(const char *model, char *model_path, char* options[], int threads
if (!strcmp(optname, "llm_vision_path")) llm_vision_path = strdup(optval);
if (!strcmp(optname, "diffusion_model_path")) diffusion_model_path = strdup(optval);
if (!strcmp(optname, "high_noise_diffusion_model_path")) high_noise_diffusion_model_path = strdup(optval);
if (!strcmp(optname, "uncond_diffusion_model_path")) uncond_diffusion_model_path = strdup(optval);
if (!strcmp(optname, "taesd_path")) taesd_path = strdup(optval);
if (!strcmp(optname, "control_net_path")) control_net_path = strdup(optval);
if (!strcmp(optname, "embedding_dir")) {
@@ -573,7 +571,6 @@ int load_model(const char *model, char *model_path, char* options[], int threads
ctx_params.llm_vision_path = llm_vision_path;
ctx_params.diffusion_model_path = diffusion_model_path;
ctx_params.high_noise_diffusion_model_path = high_noise_diffusion_model_path;
ctx_params.uncond_diffusion_model_path = uncond_diffusion_model_path;
ctx_params.vae_path = vae_path;
ctx_params.audio_vae_path = audio_vae_path;
ctx_params.embeddings_connectors_path = embeddings_connectors_path;

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=a8ec021f2750a473ff4a8f3883bc9fdf5feafa84
WHISPER_CPP_VERSION?=23ee03506a91ac3d3f0071b40e66a430eebdfa1d
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -37,20 +37,6 @@ def is_int(s):
except ValueError:
return False
def coerce_param_value(value):
"""Coerce a TTSRequest.params value (string on the wire) to the type the
Chatterbox generate() kwargs expect (float/int/bool), matching how static
YAML options are coerced at load time. Non-string values pass through."""
if not isinstance(value, str):
return value
if is_float(value):
return float(value)
if is_int(value):
return int(value)
if value.lower() in ["true", "false"]:
return value.lower() == "true"
return value
def split_text_at_word_boundary(text, max_length=250):
"""
Split text at word boundaries without truncating words.
@@ -205,14 +191,6 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
# add options to kwargs
kwargs.update(self.options)
# Merge per-request params (TTSRequest.params), overriding the static
# YAML options. This exposes Chatterbox generation knobs (e.g.
# exaggeration, cfg_weight, temperature) per request. Values arrive as
# strings on the wire and are coerced to float/int/bool.
if hasattr(request, "params") and request.params:
for key, value in request.params.items():
kwargs[key] = coerce_param_value(value)
# Check if text exceeds 250 characters
# (chatterbox does not support long text)
# https://github.com/resemble-ai/chatterbox/issues/60

View File

@@ -47,26 +47,6 @@ def is_int(s):
return False
def coerce_param_value(value):
"""Coerce a string param value (from the TTSRequest.params map, which is
string-typed on the wire) into the most specific Python type the model
generation kwargs expect: bool, int, float, else the original string."""
if not isinstance(value, str):
return value
lowered = value.strip().lower()
if lowered in ("true", "false"):
return lowered == "true"
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# If MAX_WORKERS are specified in the environment use it, otherwise default to 1
@@ -342,19 +322,6 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
return backend_pb2.Result(message="Model loaded successfully", success=True)
def _effective_instruct(self, request):
"""Resolve the instruction/style string for this request, preferring the
per-request TTSRequest.instructions value and falling back to the static
YAML `instruct` option. Empty string means "no instruction"."""
req_instruct = (
request.instructions
if hasattr(request, "instructions") and request.instructions
else ""
)
if req_instruct:
return req_instruct
return self.options.get("instruct", "") or ""
def _detect_mode(self, request):
"""Detect which mode to use based on request parameters."""
# Priority: VoiceClone > VoiceDesign > CustomVoice
@@ -371,8 +338,8 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
if self.audio_path or self.voices:
return "VoiceClone"
# VoiceDesign: instruct provided per-request or via YAML option
if self._effective_instruct(request):
# VoiceDesign: instruct option is provided
if "instruct" in self.options and self.options["instruct"]:
return "VoiceDesign"
# Default to CustomVoice
@@ -723,20 +690,10 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
if do_sample is not None:
generation_kwargs["do_sample"] = do_sample
# Prefer the per-request instruction (TTSRequest.instructions) over the
# static YAML `instruct` option. This lets clients set a different style
# (CustomVoice emotion) or designed voice (VoiceDesign) per request.
instruct = self._effective_instruct(request)
instruct = self.options.get("instruct", "")
if instruct is not None and instruct != "":
generation_kwargs["instruct"] = instruct
# Merge any per-request backend-specific params (TTSRequest.params).
# Values arrive as strings on the wire; coerce to int/float/bool so the
# model receives the types it expects. These override YAML-derived kwargs.
if hasattr(request, "params") and request.params:
for key, value in request.params.items():
generation_kwargs[key] = coerce_param_value(value)
# Generate audio based on mode
if mode == "VoiceClone":
# VoiceClone mode

View File

@@ -1,4 +1,4 @@
grpcio==1.81.0
grpcio==1.80.0
protobuf==7.35.0
certifi
setuptools

View File

@@ -3,5 +3,5 @@
# on a cu130 host. Pull the cu130-flavoured wheel from vLLM's per-tag index
# instead — the cublas13 case in install.sh adds --index-strategy=unsafe-best-match
# so uv consults this index alongside PyPI.
--extra-index-url https://wheels.vllm.ai/0.22.1/cu130
vllm==0.22.1
--extra-index-url https://wheels.vllm.ai/0.22.0/cu130
vllm==0.22.0

View File

@@ -1,4 +1,4 @@
grpcio==1.81.0
grpcio==1.80.0
protobuf
certifi
setuptools

View File

@@ -102,12 +102,7 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID)
// Connect to NATS
natsAuth := cfg.Distributed.NatsAuthConfig()
if natsAuth.RequireAuth && (natsAuth.ServiceUserJWT == "" || natsAuth.ServiceUserSeed == "") {
return nil, fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH requires LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED")
}
natsOpts := cfg.Distributed.NatsMessagingOptions("", "")
natsClient, err := messaging.New(cfg.Distributed.NatsURL, natsOpts...)
natsClient, err := messaging.New(cfg.Distributed.NatsURL)
if err != nil {
return nil, fmt.Errorf("connecting to NATS: %w", err)
}

View File

@@ -23,9 +23,9 @@ import (
"github.com/mudler/LocalAI/core/services/routing/pii"
"github.com/mudler/LocalAI/core/services/routing/router"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/signals"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/signals"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
@@ -308,31 +308,10 @@ func New(opts ...config.AppOption) (*Application, error) {
application.galleryService.SetNATSClient(distSvc.Nats)
if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
// Clean up stale in-progress operations from previous crashed instances
if _, err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
xlog.Warn("Failed to clean stale gallery operations", "error", err)
}
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
// Reap stale ops periodically, not just at boot: an op orphaned by
// a replica that died mid-install (its foreground handler goroutine
// gone) would otherwise linger "processing" in the UI until the next
// restart. 30m matches the install/upgrade ceiling so a genuinely
// slow op is never reaped out from under itself.
gsvc := application.galleryService
go func() {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-options.Context.Done():
return
case <-ticker.C:
if _, err := gsvc.ReapStaleOperations(30 * time.Minute); err != nil {
xlog.Warn("Failed to reap stale gallery operations", "error", err)
}
}
}
}()
}
// Hydrate from the store first so the wildcard subscriber finds an
// already-populated statuses map for any operations still in flight

View File

@@ -214,9 +214,7 @@ func (uc *UpgradeChecker) runCheck(ctx context.Context) {
"from", info.InstalledVersion, "to", info.AvailableVersion)
var err error
if bm != nil {
// Background auto-upgrade: no live admin watching a progress bar,
// so opID is empty and the distributed path skips progress streaming.
err = bm.UpgradeBackend(ctx, "", name, nil)
err = bm.UpgradeBackend(ctx, name, nil)
} else {
err = gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader,
uc.galleries, name, nil, uc.appConfig.RequireBackendIntegrity)

View File

@@ -123,14 +123,14 @@ var _ = Describe("X-LocalAI-Node ctx propagation contract", func() {
})
It("ModelTTS forwards the request context to the SmartRouter", func() {
_, _, err := backend.ModelTTS(reqCtx, "hello", "", "", "", nil, loader, appCfg, modelCfg)
_, _, err := backend.ModelTTS(reqCtx, "hello", "", "", loader, appCfg, modelCfg)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("router short-circuit (test)"))
stampViaRouterCtx()
})
It("ModelTTSStream forwards the request context to the SmartRouter", func() {
err := backend.ModelTTSStream(reqCtx, "hello", "", "", "", nil, loader, appCfg, modelCfg, func([]byte) error { return nil })
err := backend.ModelTTSStream(reqCtx, "hello", "", "", loader, appCfg, modelCfg, func([]byte) error { return nil })
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("router short-circuit (test)"))
stampViaRouterCtx()

View File

@@ -239,13 +239,13 @@ func grpcModelOpts(c config.ModelConfig, modelPath string) *pb.ModelOptions {
if c.Backend == "cloud-proxy" {
opts.Proxy = &pb.ProxyOptions{
UpstreamUrl: c.Proxy.UpstreamURL,
Mode: c.Proxy.Mode,
Provider: c.Proxy.Provider,
ApiKeyEnv: c.Proxy.APIKeyEnv,
ApiKeyFile: c.Proxy.APIKeyFile,
UpstreamModel: c.Proxy.UpstreamModel,
RequestTimeoutSeconds: int32(c.Proxy.RequestTimeoutSeconds),
UpstreamUrl: c.Proxy.UpstreamURL,
Mode: c.Proxy.Mode,
Provider: c.Proxy.Provider,
ApiKeyEnv: c.Proxy.APIKeyEnv,
ApiKeyFile: c.Proxy.APIKeyFile,
UpstreamModel: c.Proxy.UpstreamModel,
RequestTimeoutSeconds: int32(c.Proxy.RequestTimeoutSeconds),
}
}
@@ -323,12 +323,6 @@ func gRPCPredictOpts(c config.ModelConfig, modelPath string) *pb.PredictOptions
metadata["enable_thinking"] = "true"
}
}
// Forward the effective reasoning effort so the backend can pass it to the
// jinja chat template (chat_template_kwargs.reasoning_effort) — the lever
// models like gpt-oss / LFM2.5 actually read, distinct from enable_thinking.
if c.ReasoningEffort != "" {
metadata["reasoning_effort"] = c.ReasoningEffort
}
pbOpts.Metadata = metadata
// Logprobs and TopLogprobs are set by the caller if provided

View File

@@ -75,25 +75,3 @@ var _ = Describe("gRPCPredictOpts enable_thinking metadata", func() {
Expect(opts.Metadata).ToNot(HaveKey("enable_thinking"))
})
})
// Guards forwarding the effective reasoning_effort into PredictOptions.Metadata,
// where the backend passes it to the jinja chat template (chat_template_kwargs)
// so models like gpt-oss / LFM2.5 honor it.
var _ = Describe("gRPCPredictOpts reasoning_effort metadata", func() {
withEffort := func(effort string) config.ModelConfig {
cfg := config.ModelConfig{}
cfg.SetDefaults()
cfg.ReasoningEffort = effort
return cfg
}
It("forwards reasoning_effort when set", func() {
opts := gRPCPredictOpts(withEffort("none"), "/tmp/models")
Expect(opts.Metadata).To(HaveKeyWithValue("reasoning_effort", "none"))
})
It("omits reasoning_effort when empty", func() {
opts := gRPCPredictOpts(withEffort(""), "/tmp/models")
Expect(opts.Metadata).ToNot(HaveKey("reasoning_effort"))
})
})

View File

@@ -20,32 +20,11 @@ import (
"github.com/mudler/LocalAI/pkg/utils"
)
// newTTSRequest assembles the gRPC TTSRequest from the per-request inputs. The
// optional instructions string is only attached when non-empty so backends can
// distinguish "no per-request instruction" (fall back to YAML) from an explicit
// empty one. params is forwarded as-is (nil when unset).
func newTTSRequest(text, modelPath, voice, dst, language, instructions string, params map[string]string) *proto.TTSRequest {
req := &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Dst: dst,
Language: &language,
Params: params,
}
if instructions != "" {
req.Instructions = &instructions
}
return req
}
func ModelTTS(
ctx context.Context,
text,
voice,
language,
instructions string,
params map[string]string,
language string,
loader *model.ModelLoader,
appConfig *config.ApplicationConfig,
modelConfig config.ModelConfig,
@@ -95,9 +74,13 @@ func ModelTTS(
startTime = time.Now()
}
ttsRequest := newTTSRequest(text, modelPath, voice, filePath, language, instructions, params)
res, err := ttsModel.TTS(ctx, ttsRequest)
res, err := ttsModel.TTS(ctx, &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Dst: filePath,
Language: &language,
})
if appConfig.EnableTracing {
errStr := ""
@@ -145,9 +128,7 @@ func ModelTTSStream(
ctx context.Context,
text,
voice,
language,
instructions string,
params map[string]string,
language string,
loader *model.ModelLoader,
appConfig *config.ApplicationConfig,
modelConfig config.ModelConfig,
@@ -196,10 +177,12 @@ func ModelTTSStream(
var totalPCMBytes int
snippetCapped := false
// Streaming TTS writes to the HTTP response, not a file, so dst is empty.
ttsRequest := newTTSRequest(text, modelPath, voice, "", language, instructions, params)
err = ttsModel.TTSStream(ctx, ttsRequest, func(reply *proto.Reply) {
err = ttsModel.TTSStream(ctx, &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Language: &language,
}, func(reply *proto.Reply) {
// First message contains sample rate info
if !headerSent && len(reply.Message) > 0 {
var info map[string]any

View File

@@ -1,42 +0,0 @@
package backend
// Specs for the TTSRequest assembly that carries the per-request
// instructions/params from the OpenAI `instructions` field (and the LocalAI
// `params` extension) through to the gRPC boundary. Before this plumbing the
// instruction value was dropped before reaching the backend; these specs pin
// that it now survives, and that the empty case stays backward compatible.
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("newTTSRequest", func() {
It("attaches the instructions when a per-request value is set", func() {
req := newTTSRequest("hi", "/m", "alloy", "/out.wav", "en", "cheerful narrator", nil)
Expect(req.Instructions).ToNot(BeNil())
Expect(req.GetInstructions()).To(Equal("cheerful narrator"))
Expect(req.GetText()).To(Equal("hi"))
Expect(req.GetVoice()).To(Equal("alloy"))
Expect(req.GetDst()).To(Equal("/out.wav"))
Expect(req.GetLanguage()).To(Equal("en"))
})
It("leaves instructions unset when empty so backends fall back to YAML", func() {
req := newTTSRequest("hi", "/m", "", "/out.wav", "", "", nil)
Expect(req.Instructions).To(BeNil())
Expect(req.GetInstructions()).To(Equal(""))
})
It("forwards per-request params through to the backend", func() {
params := map[string]string{"exaggeration": "0.7", "cfg_weight": "0.3"}
req := newTTSRequest("hi", "/m", "", "/out.wav", "", "", params)
Expect(req.GetParams()).To(HaveKeyWithValue("exaggeration", "0.7"))
Expect(req.GetParams()).To(HaveKeyWithValue("cfg_weight", "0.3"))
})
It("leaves params nil when none are supplied", func() {
req := newTTSRequest("hi", "/m", "", "/out.wav", "", "", nil)
Expect(req.GetParams()).To(BeNil())
})
})

View File

@@ -52,28 +52,10 @@ type AgentWorkerCMD struct {
Subject string `env:"LOCALAI_AGENT_SUBJECT" default:"agent.execute" help:"NATS subject for agent execution" group:"distributed"`
Queue string `env:"LOCALAI_AGENT_QUEUE" default:"agent-workers" help:"NATS queue group name" group:"distributed"`
NatsJWT string `env:"LOCALAI_NATS_JWT" help:"NATS user JWT override (defaults to nats_jwt from registration)" group:"distributed"`
NatsUserSeed string `env:"LOCALAI_NATS_USER_SEED" help:"NATS user seed override (defaults to nats_user_seed from registration)" group:"distributed"`
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"Fallback NATS service JWT when registration does not mint agent JWT" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"Fallback NATS service seed paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT+seed to connect" group:"distributed"`
// DistributedRequireAuth is the umbrella switch; for the agent worker (which
// has no file-transfer server) it implies NATS auth is required.
DistributedRequireAuth bool `env:"LOCALAI_DISTRIBUTED_REQUIRE_AUTH" default:"false" help:"Umbrella switch implying --nats-require-auth (agent workers have no file-transfer server)" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI)" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
// Timeouts
MCPCIJobTimeout string `env:"LOCALAI_MCP_CI_JOB_TIMEOUT" default:"10m" help:"Timeout for MCP CI job execution" group:"distributed"`
}
// natsAuthRequired reports whether NATS JWT credentials must be present — the
// granular flag or the umbrella (LOCALAI_DISTRIBUTED_REQUIRE_AUTH).
func (cmd *AgentWorkerCMD) natsAuthRequired() bool {
return cmd.NatsRequireAuth || cmd.DistributedRequireAuth
}
func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Info("Starting agent worker", "nats", sanitize.URL(cmd.NatsURL), "register_to", cmd.RegisterTo)
@@ -99,30 +81,15 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
registrationBody["token"] = cmd.RegistrationToken
}
// Context cancelled on shutdown — used by registration waits, heartbeat, and
// other background goroutines.
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()
// Acquire credentials via (re)registration. When the bus requires auth and no
// static fallback is configured, wait through admin approval until the
// frontend mints credentials rather than starting unauthenticated.
credMgr := workerregistry.NewNATSCredentialManager(
func(ctx context.Context) (*workerregistry.RegisterResponse, error) {
return regClient.RegisterFull(ctx, registrationBody)
},
cmd.natsAuthRequired() && cmd.NatsJWT == "" && cmd.NatsServiceJWT == "",
)
res, err := credMgr.Acquire(shutdownCtx)
nodeID, apiToken, err := regClient.RegisterWithRetry(context.Background(), registrationBody, 10)
if err != nil {
return fmt.Errorf("registration failed: %w", err)
}
nodeID := res.ID
xlog.Info("Registered with frontend", "nodeID", nodeID, "frontend", cmd.RegisterTo)
// Use provisioned API token if none was set
if cmd.APIToken == "" {
cmd.APIToken = res.APIToken
cmd.APIToken = apiToken
}
// Start heartbeat
@@ -131,40 +98,14 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Warn("invalid heartbeat interval, using default 10s", "input", cmd.HeartbeatInterval, "error", err)
}
heartbeatInterval = cmp.Or(heartbeatInterval, 10*time.Second)
// Context cancelled on shutdown — used by heartbeat and other background goroutines
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()
go regClient.HeartbeatLoop(shutdownCtx, nodeID, heartbeatInterval, func() map[string]any { return map[string]any{} })
// Resolve NATS credentials with precedence: explicit env override, then
// frontend-minted (auto-refreshed before expiry), then service fallback.
// Each static source must supply JWT and seed together.
natsTLS := messaging.TLSFiles{CA: cmd.NatsTLSCA, Cert: cmd.NatsTLSCert, Key: cmd.NatsTLSKey}
var natsOpts []messaging.Option
switch {
case cmd.NatsJWT != "" || cmd.NatsUserSeed != "":
if (cmd.NatsJWT == "") != (cmd.NatsUserSeed == "") {
return fmt.Errorf("LOCALAI_NATS_JWT and LOCALAI_NATS_USER_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsJWT, cmd.NatsUserSeed))
case credMgr.HasCredentials():
natsOpts = append(natsOpts, messaging.WithUserJWTProvider(credMgr.Provider()))
go func() {
if err := credMgr.RefreshLoop(shutdownCtx); err != nil {
xlog.Error("NATS credential refresh permanently failed; shutting down agent worker", "error", err)
shutdownCancel()
}
}()
case cmd.NatsServiceJWT != "" || cmd.NatsServiceSeed != "":
if (cmd.NatsServiceJWT == "") != (cmd.NatsServiceSeed == "") {
return fmt.Errorf("LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsServiceJWT, cmd.NatsServiceSeed))
case cmd.natsAuthRequired():
return fmt.Errorf("NATS JWT+seed required: enable frontend minting or set LOCALAI_NATS_* env vars")
}
if natsTLS.Enabled() {
natsOpts = append(natsOpts, messaging.WithTLS(natsTLS))
}
natsClient, err := messaging.New(cmd.NatsURL, natsOpts...)
// Connect to NATS
natsClient, err := messaging.New(cmd.NatsURL)
if err != nil {
return fmt.Errorf("connecting to NATS: %w", err)
}
@@ -242,25 +183,17 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Info("Agent worker ready, waiting for jobs", "subject", cmd.Subject, "queue", cmd.Queue)
// Wait for an OS signal or an internal fatal condition (e.g. NATS
// credentials became unrenewable), so the worker restarts and re-acquires
// rather than lingering unable to serve.
// Wait for shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
var runErr error
select {
case <-sigCh:
case <-shutdownCtx.Done():
runErr = fmt.Errorf("agent worker shutting down: NATS credentials unavailable")
xlog.Error("Internal shutdown requested", "error", runErr)
}
<-sigCh
xlog.Info("Shutting down agent worker")
shutdownCancel() // stop heartbeat loop immediately
dispatcher.Stop()
mcpTools.CloseAllMCPSessions()
regClient.GracefulDeregister(nodeID)
return runErr
return nil
}
// handleMCPToolRequest handles a NATS request-reply for MCP tool execution.

View File

@@ -154,21 +154,11 @@ type RunCMD struct {
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
RegistrationRequireAuth bool `env:"LOCALAI_REGISTRATION_REQUIRE_AUTH" default:"false" help:"Fail startup when distributed mode is enabled but LOCALAI_REGISTRATION_TOKEN is empty (node endpoints and worker file-transfer server would otherwise be unauthenticated)" group:"distributed"`
DistributedRequireAuth bool `env:"LOCALAI_DISTRIBUTED_REQUIRE_AUTH" default:"false" help:"Umbrella switch: require BOTH NATS JWT credentials and a registration token when distributed mode is enabled (implies --nats-require-auth and --registration-require-auth)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
DistributedPrefixCache bool `env:"LOCALAI_DISTRIBUTED_PREFIX_CACHE" default:"true" help:"Enable prefix-cache-aware routing in distributed mode (default true). When false, routing falls back to round-robin." group:"distributed"`
DistributedPrefixCacheTTL string `env:"LOCALAI_DISTRIBUTED_PREFIX_CACHE_TTL" help:"Idle-timeout for prefix-cache index entries; also drives the background eviction cadence (every TTL/2). Default 5m." group:"distributed"`
BackendInstallTimeout string `env:"LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT" help:"NATS round-trip timeout for backend.install requests sent to worker nodes (default 15m). Increase for slow links pulling multi-GB images." group:"distributed"`
BackendUpgradeTimeout string `env:"LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT" help:"NATS round-trip timeout for backend.upgrade requests (default 15m)." group:"distributed"`
NatsAccountSeed string `env:"LOCALAI_NATS_ACCOUNT_SEED" help:"NATS account signing seed (SU...) used to mint per-node worker JWTs at registration" group:"distributed"`
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"NATS user JWT for the frontend (and agent workers) to publish control-plane messages" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"NATS user signing seed (SU...) paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsWorkerJWTTTL string `env:"LOCALAI_NATS_WORKER_JWT_TTL" help:"Lifetime of minted per-node NATS JWTs (e.g. 24h, default 24h)" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT credentials (service JWT + account seed) when distributed mode is enabled" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI); use with tls:// in --nats-url" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
ExposeNodeHeader bool `env:"LOCALAI_EXPOSE_NODE_HEADER" default:"false" help:"Set the X-LocalAI-Node response header on inference responses (OpenAI chat/completions/embeddings, Anthropic /v1/messages, Ollama /api/chat,/api/generate,/api/embed) with the ID of the worker that served the request. Disabled by default: the node ID reveals internal topology and should not be exposed on a public endpoint. Best-effort: under heavy concurrency the header may reflect a recent routing decision rather than this exact request's." group:"distributed"`
Version bool
@@ -293,40 +283,6 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}
if r.RegistrationRequireAuth {
opts = append(opts, config.EnableRegistrationRequireAuth)
}
if r.DistributedRequireAuth {
opts = append(opts, config.EnableDistributedRequireAuth)
}
if r.NatsAccountSeed != "" {
opts = append(opts, config.WithNatsAccountSeed(r.NatsAccountSeed))
}
if r.NatsServiceJWT != "" {
opts = append(opts, config.WithNatsServiceJWT(r.NatsServiceJWT))
}
if r.NatsServiceSeed != "" {
opts = append(opts, config.WithNatsServiceSeed(r.NatsServiceSeed))
}
if r.NatsWorkerJWTTTL != "" {
d, err := time.ParseDuration(r.NatsWorkerJWTTTL)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_WORKER_JWT_TTL %q: %w", r.NatsWorkerJWTTTL, err)
}
opts = append(opts, config.WithNatsWorkerJWTTTL(d))
}
if r.NatsRequireAuth {
opts = append(opts, config.EnableNatsRequireAuth)
}
if r.NatsTLSCA != "" {
opts = append(opts, config.WithNatsTLSCA(r.NatsTLSCA))
}
if r.NatsTLSCert != "" {
opts = append(opts, config.WithNatsTLSCert(r.NatsTLSCert))
}
if r.NatsTLSKey != "" {
opts = append(opts, config.WithNatsTLSKey(r.NatsTLSKey))
}
if r.AutoApproveNodes {
opts = append(opts, config.EnableAutoApproveNodes)
}

View File

@@ -62,7 +62,7 @@ func (t *TTSCMD) Run(ctx *cliContext.Context) error {
options.Backend = t.Backend
options.Model = t.Model
filePath, _, err := backend.ModelTTS(context.Background(), text, t.Voice, t.Language, "", nil, ml, opts, options)
filePath, _, err := backend.ModelTTS(context.Background(), text, t.Voice, t.Language, ml, opts, options)
if err != nil {
return err
}

View File

@@ -96,7 +96,7 @@ func (r *VLLMDistributed) Run(ctx *cliContext.Context) error {
FrontendURL: r.RegisterTo,
RegistrationToken: r.RegistrationToken,
}
nodeID, _, _, _, regErr := regClient.RegisterWithRetry(context.Background(), r.registrationBody(), 10)
nodeID, _, regErr := regClient.RegisterWithRetry(context.Background(), r.registrationBody(), 10)
if regErr != nil {
return fmt.Errorf("registering with frontend: %w", regErr)
}

View File

@@ -58,77 +58,65 @@ func (c *RegistrationClient) setAuth(req *http.Request) {
// RegisterResponse is the JSON body returned by /api/node/register.
type RegisterResponse struct {
ID string `json:"id"`
Status string `json:"status,omitempty"` // "pending" until an admin approves the node
APIToken string `json:"api_token,omitempty"`
NatsJWT string `json:"nats_jwt,omitempty"`
NatsUserSeed string `json:"nats_user_seed,omitempty"`
ID string `json:"id"`
APIToken string `json:"api_token,omitempty"`
}
// RegisterFull sends a single registration request and returns the full
// response (node ID, approval status, and optional API token / NATS creds).
// Re-registration is idempotent: the frontend preserves the node row and mints
// a fresh NATS JWT each call, so this doubles as the credential-refresh call.
func (c *RegistrationClient) RegisterFull(ctx context.Context, body map[string]any) (*RegisterResponse, error) {
// Register sends a single registration request and returns the node ID and
// (optionally) an auto-provisioned API token.
func (c *RegistrationClient) Register(ctx context.Context, body map[string]any) (string, string, error) {
jsonBody, _ := json.Marshal(body)
url := c.baseURL() + "/api/node/register"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
return "", "", fmt.Errorf("creating request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
c.setAuth(req)
resp, err := c.httpClient().Do(req)
if err != nil {
return nil, fmt.Errorf("posting to %s: %w", url, err)
return "", "", fmt.Errorf("posting to %s: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("registration failed with status %d", resp.StatusCode)
return "", "", fmt.Errorf("registration failed with status %d", resp.StatusCode)
}
var result RegisterResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decoding response: %w", err)
return "", "", fmt.Errorf("decoding response: %w", err)
}
return &result, nil
}
// Register sends a single registration request and returns the node ID and
// optional credentials (API token for agent workers, NATS JWT when configured).
func (c *RegistrationClient) Register(ctx context.Context, body map[string]any) (nodeID, apiToken, natsJWT, natsSeed string, err error) {
res, err := c.RegisterFull(ctx, body)
if err != nil {
return "", "", "", "", err
}
return res.ID, res.APIToken, res.NatsJWT, res.NatsUserSeed, nil
return result.ID, result.APIToken, nil
}
// RegisterWithRetry retries registration with exponential backoff.
func (c *RegistrationClient) RegisterWithRetry(ctx context.Context, body map[string]any, maxRetries int) (nodeID, apiToken, natsJWT, natsSeed string, err error) {
func (c *RegistrationClient) RegisterWithRetry(ctx context.Context, body map[string]any, maxRetries int) (string, string, error) {
backoff := 2 * time.Second
maxBackoff := 30 * time.Second
var nodeID, apiToken string
var err error
for attempt := 1; attempt <= maxRetries; attempt++ {
nodeID, apiToken, natsJWT, natsSeed, err = c.Register(ctx, body)
nodeID, apiToken, err = c.Register(ctx, body)
if err == nil {
return nodeID, apiToken, natsJWT, natsSeed, nil
return nodeID, apiToken, nil
}
if attempt == maxRetries {
return "", "", "", "", fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
return "", "", fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}
xlog.Warn("Registration failed, retrying", "attempt", attempt, "next_retry", backoff, "error", err)
select {
case <-ctx.Done():
return "", "", "", "", ctx.Err()
return "", "", ctx.Err()
case <-time.After(backoff):
}
backoff = min(backoff*2, maxBackoff)
}
return nodeID, apiToken, natsJWT, natsSeed, err
return nodeID, apiToken, err
}
// Heartbeat sends a single heartbeat POST with the given body.

View File

@@ -1,200 +0,0 @@
package workerregistry
import (
"context"
"fmt"
"sync"
"time"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/mudler/xlog"
)
// statusPending mirrors nodes.StatusPending. It is duplicated rather than
// imported so the lightweight registration client does not pull in the nodes
// package (and its gorm/DB dependencies).
const statusPending = "pending"
// defaultMaxAttempts bounds how many times Acquire registers (and how many
// consecutive times RefreshLoop may fail) before giving up. It is high enough
// to ride out a slow admin approval or a transient frontend outage, but finite
// so an unauthorized/unapprovable worker exits and surfaces the problem (via a
// non-zero exit and the resulting restart) rather than waiting forever.
const defaultMaxAttempts = 100
// RegisterFunc performs one idempotent registration round-trip.
type RegisterFunc func(ctx context.Context) (*RegisterResponse, error)
// NATSCredentialManager acquires NATS credentials at startup — waiting through
// admin approval when required — and refreshes them before the minted JWT
// expires, by re-registering (which mints a fresh JWT). The live NATS
// connection adopts a refreshed JWT on its next reconnect via Provider. Safe
// for concurrent use.
//
// It addresses two failure modes: a worker that needs credentials but registers
// while still pending approval (it would otherwise give up and never connect),
// and a long-running worker whose 24h JWT expires with no way to renew it.
type NATSCredentialManager struct {
register RegisterFunc
requireCreds bool // block until credentials are present (frontend minting in use)
// Tunables; defaults set by NewNATSCredentialManager, overridable in tests.
initialBackoff time.Duration
maxBackoff time.Duration
maxAttempts int // bound on Acquire attempts / consecutive refresh failures (<=0 = unlimited)
refreshLead float64 // refresh once this fraction of the JWT lifetime has elapsed
refreshRetry time.Duration
expiryOf func(jwt string) (time.Time, bool)
mu sync.RWMutex
jwt string
seed string
nodeID string
}
// NewNATSCredentialManager builds a manager over register. When requireCreds is
// true, Acquire blocks until the node is approved and credentials are minted.
func NewNATSCredentialManager(register RegisterFunc, requireCreds bool) *NATSCredentialManager {
return &NATSCredentialManager{
register: register,
requireCreds: requireCreds,
initialBackoff: 2 * time.Second,
maxBackoff: 30 * time.Second,
maxAttempts: defaultMaxAttempts,
refreshLead: 0.75,
refreshRetry: 30 * time.Second,
expiryOf: jwtExpiry,
}
}
// jwtExpiry decodes the expiry of a minted user JWT. ok is false when the token
// is empty/undecodable or carries no expiry (e.g. a non-expiring service JWT).
func jwtExpiry(token string) (time.Time, bool) {
if token == "" {
return time.Time{}, false
}
uc, err := natsauth.DecodeUserClaims(token)
if err != nil || uc.Expires == 0 {
return time.Time{}, false
}
return time.Unix(uc.Expires, 0), true
}
func (m *NATSCredentialManager) store(res *RegisterResponse) {
m.mu.Lock()
defer m.mu.Unlock()
m.nodeID = res.ID
if res.NatsJWT != "" && res.NatsUserSeed != "" {
m.jwt, m.seed = res.NatsJWT, res.NatsUserSeed
}
}
// Current returns the latest NATS credentials (both empty until acquired).
func (m *NATSCredentialManager) Current() (jwt, seed string) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.jwt, m.seed
}
// NodeID returns the node ID from the most recent registration.
func (m *NATSCredentialManager) NodeID() string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.nodeID
}
// Provider returns a callback compatible with messaging.WithUserJWTProvider,
// supplying the current credentials on each (re)connect.
func (m *NATSCredentialManager) Provider() func() (string, string) {
return m.Current
}
// HasCredentials reports whether complete NATS credentials have been obtained.
func (m *NATSCredentialManager) HasCredentials() bool {
jwt, seed := m.Current()
return jwt != "" && seed != ""
}
// Acquire registers and, when requireCreds is set, keeps re-registering with
// exponential backoff until the node is approved (status != pending) and
// credentials are minted. Without requireCreds it returns the first successful
// response (the historical one-shot behavior, preserved for anonymous NATS).
func (m *NATSCredentialManager) Acquire(ctx context.Context) (*RegisterResponse, error) {
backoff := m.initialBackoff
var lastReason error
for attempt := 1; m.maxAttempts <= 0 || attempt <= m.maxAttempts; attempt++ {
res, err := m.register(ctx)
switch {
case err != nil:
lastReason = err
xlog.Warn("Registration failed, retrying", "attempt", attempt, "next_retry", backoff, "error", err)
case !m.requireCreds:
m.store(res)
return res, nil
case res.Status == statusPending:
lastReason = fmt.Errorf("node %s still pending admin approval", res.ID)
xlog.Info("Node pending admin approval; waiting", "node", res.ID, "attempt", attempt, "next_retry", backoff)
case res.NatsJWT == "" || res.NatsUserSeed == "":
lastReason = fmt.Errorf("node %s approved but NATS credentials not minted", res.ID)
xlog.Info("Node approved but NATS credentials not yet minted; waiting", "node", res.ID, "attempt", attempt, "next_retry", backoff)
default:
m.store(res)
return res, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(backoff):
}
backoff = min(backoff*2, m.maxBackoff)
}
return nil, fmt.Errorf("giving up acquiring NATS credentials after %d attempts: %w", m.maxAttempts, lastReason)
}
// RefreshLoop re-registers to mint a fresh JWT before the current one expires,
// updating the credentials returned by Current/Provider so the NATS connection
// adopts them on its next reconnect. It returns nil when ctx is cancelled or
// when the current credential has no expiry (nothing to refresh), and a non-nil
// error after maxAttempts consecutive refresh failures — letting the caller
// exit the worker so it restarts and re-acquires (or surfaces the outage)
// rather than silently drifting toward an expired, unrenewable JWT.
func (m *NATSCredentialManager) RefreshLoop(ctx context.Context) error {
failures := 0
for {
jwt, _ := m.Current()
exp, ok := m.expiryOf(jwt)
if !ok {
xlog.Debug("NATS credential has no expiry; refresh loop exiting")
return nil
}
wait := max(time.Duration(float64(time.Until(exp))*m.refreshLead), 0)
select {
case <-ctx.Done():
return nil
case <-time.After(wait):
}
res, err := m.register(ctx)
if err == nil && res.NatsJWT != "" && res.NatsUserSeed != "" {
m.store(res)
failures = 0
xlog.Info("Refreshed NATS credentials", "node", res.ID)
continue
}
failures++
if err != nil {
xlog.Warn("NATS credential refresh failed; will retry", "attempt", failures, "error", err)
} else {
xlog.Warn("NATS credential refresh returned no credentials; will retry", "attempt", failures)
}
if m.maxAttempts > 0 && failures >= m.maxAttempts {
return fmt.Errorf("NATS credential refresh failed %d times in a row", failures)
}
// Back off before retrying so a persistent failure near expiry does not spin.
select {
case <-ctx.Done():
return nil
case <-time.After(m.refreshRetry):
}
}
}

View File

@@ -1,198 +0,0 @@
package workerregistry
import (
"context"
"sync"
"testing"
"time"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestWorkerRegistry(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "WorkerRegistry")
}
// fakeRegister returns a sequence of canned responses/errors, one per call, and
// records how many times it was invoked. The last entry repeats once exhausted.
type fakeRegister struct {
mu sync.Mutex
steps []step
calls int
}
type step struct {
res *RegisterResponse
err error
}
func (f *fakeRegister) fn() RegisterFunc {
return func(context.Context) (*RegisterResponse, error) {
f.mu.Lock()
defer f.mu.Unlock()
i := f.calls
f.calls++
if i >= len(f.steps) {
i = len(f.steps) - 1
}
return f.steps[i].res, f.steps[i].err
}
}
func (f *fakeRegister) count() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.calls
}
var _ = Describe("NATSCredentialManager", func() {
approved := func(jwt, seed string) *RegisterResponse {
return &RegisterResponse{ID: "node-1", Status: "healthy", NatsJWT: jwt, NatsUserSeed: seed}
}
pending := &RegisterResponse{ID: "node-1", Status: "pending"}
Describe("Acquire (#4 — wait through admin approval)", func() {
It("keeps re-registering until the node is approved and credentials are minted", func() {
f := &fakeRegister{steps: []step{
{res: pending}, // not approved yet
{res: approved("", "")}, // approved but JWT not minted yet
{res: approved("jwt-1", "seed-1")}, // finally minted
}}
m := NewNATSCredentialManager(f.fn(), true /* requireCreds */)
m.initialBackoff = time.Millisecond
m.maxBackoff = time.Millisecond
res, err := m.Acquire(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(res.ID).To(Equal("node-1"))
Expect(f.count()).To(Equal(3))
jwt, seed := m.Current()
Expect(jwt).To(Equal("jwt-1"))
Expect(seed).To(Equal("seed-1"))
Expect(m.HasCredentials()).To(BeTrue())
Expect(m.NodeID()).To(Equal("node-1"))
})
It("returns immediately on the first success when credentials are not required (anonymous NATS)", func() {
f := &fakeRegister{steps: []step{{res: pending}}}
m := NewNATSCredentialManager(f.fn(), false /* requireCreds */)
res, err := m.Acquire(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(res.Status).To(Equal("pending"))
Expect(f.count()).To(Equal(1))
Expect(m.HasCredentials()).To(BeFalse())
})
It("aborts when the context is cancelled while waiting for approval", func() {
f := &fakeRegister{steps: []step{{res: pending}}}
m := NewNATSCredentialManager(f.fn(), true)
m.initialBackoff = 10 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := m.Acquire(ctx)
Expect(err).To(MatchError(context.Canceled))
})
It("gives up after a bounded number of attempts so the worker exits and alerts", func() {
f := &fakeRegister{steps: []step{{res: pending}}} // never approved
m := NewNATSCredentialManager(f.fn(), true)
m.initialBackoff = time.Millisecond
m.maxBackoff = time.Millisecond
m.maxAttempts = 5
_, err := m.Acquire(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("after 5 attempts"))
Expect(err.Error()).To(ContainSubstring("pending admin approval"))
Expect(f.count()).To(Equal(5))
})
})
Describe("RefreshLoop (#5 — renew before the JWT expires)", func() {
It("re-registers before expiry and updates the credentials served to new connections", func() {
f := &fakeRegister{steps: []step{{res: approved("jwt-2", "seed-2")}}}
m := NewNATSCredentialManager(f.fn(), true)
m.refreshLead = 0.5
m.refreshRetry = time.Millisecond
// jwt-1 expires soon; jwt-2 is long-lived so the loop then idles.
m.expiryOf = func(jwt string) (time.Time, bool) {
switch jwt {
case "jwt-1":
return time.Now().Add(40 * time.Millisecond), true
case "jwt-2":
return time.Now().Add(time.Hour), true
default:
return time.Time{}, false
}
}
m.store(approved("jwt-1", "seed-1"))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = m.RefreshLoop(ctx) }()
Eventually(func() string {
jwt, _ := m.Current()
return jwt
}, "2s", "10ms").Should(Equal("jwt-2"))
})
It("returns an error after the bounded number of consecutive failures so the caller can exit", func() {
f := &fakeRegister{steps: []step{{err: context.DeadlineExceeded}}} // refresh always fails
m := NewNATSCredentialManager(f.fn(), true)
m.refreshLead = 0.5
m.refreshRetry = time.Millisecond
m.maxAttempts = 3
m.expiryOf = func(string) (time.Time, bool) { return time.Now().Add(time.Millisecond), true }
m.store(approved("jwt-1", "seed-1"))
errCh := make(chan error, 1)
go func() { errCh <- m.RefreshLoop(context.Background()) }()
Eventually(errCh, "2s").Should(Receive(MatchError(ContainSubstring("3 times in a row"))))
})
It("exits promptly when the current credential has no expiry (nothing to refresh)", func() {
f := &fakeRegister{steps: []step{{res: approved("x", "y")}}}
m := NewNATSCredentialManager(f.fn(), true)
m.expiryOf = func(string) (time.Time, bool) { return time.Time{}, false }
m.store(approved("static", "seed"))
done := make(chan struct{})
go func() { _ = m.RefreshLoop(context.Background()); close(done) }()
Eventually(done, "1s").Should(BeClosed())
Expect(f.count()).To(Equal(0)) // never tried to re-register
})
})
Describe("jwtExpiry default", func() {
It("decodes the expiry of a real minted worker JWT", func() {
akp, err := nkeys.CreateAccount()
Expect(err).ToNot(HaveOccurred())
seed, err := akp.Seed()
Expect(err).ToNot(HaveOccurred())
cfg := natsauth.Config{AccountSeed: string(seed), WorkerJWTTTL: time.Hour}
token, _, err := cfg.MintWorkerJWT("node-1", "backend")
Expect(err).ToNot(HaveOccurred())
exp, ok := jwtExpiry(token)
Expect(ok).To(BeTrue())
Expect(exp).To(BeTemporally("~", time.Now().Add(time.Hour), 2*time.Minute))
})
It("reports no expiry for an empty or undecodable token", func() {
_, ok := jwtExpiry("")
Expect(ok).To(BeFalse())
_, ok = jwtExpiry("not-a-jwt")
Expect(ok).To(BeFalse())
})
})
})

View File

@@ -22,11 +22,9 @@ const (
UsecaseRerank = "rerank"
UsecaseDetection = "detection"
UsecaseVAD = "vad"
UsecaseAudioTransform = "audio_transform"
UsecaseDiarization = "diarization"
UsecaseRealtimeAudio = "realtime_audio"
UsecaseFaceRecognition = "face_recognition"
UsecaseSpeakerRecognition = "speaker_recognition"
UsecaseAudioTransform = "audio_transform"
UsecaseDiarization = "diarization"
UsecaseRealtimeAudio = "realtime_audio"
)
// GRPCMethod identifies a Backend service RPC from backend.proto.
@@ -49,11 +47,6 @@ const (
MethodAudioTransform GRPCMethod = "AudioTransform"
MethodDiarize GRPCMethod = "Diarize"
MethodAudioToAudioStream GRPCMethod = "AudioToAudioStream"
MethodFaceVerify GRPCMethod = "FaceVerify"
MethodFaceAnalyze GRPCMethod = "FaceAnalyze"
MethodVoiceVerify GRPCMethod = "VoiceVerify"
MethodVoiceEmbed GRPCMethod = "VoiceEmbed"
MethodVoiceAnalyze GRPCMethod = "VoiceAnalyze"
)
// UsecaseInfo describes a single known_usecase value and how it maps
@@ -161,16 +154,6 @@ var UsecaseInfoMap = map[string]UsecaseInfo{
GRPCMethod: MethodAudioToAudioStream,
Description: "Self-contained any-to-any audio model for the Realtime API — accepts microphone audio and emits speech + transcript (+ optional function calls) from a single backend via the AudioToAudioStream RPC.",
},
UsecaseFaceRecognition: {
Flag: FLAG_FACE_RECOGNITION,
GRPCMethod: MethodFaceVerify,
Description: "Face recognition — verify identity, analyze attributes (age/gender/emotion) via FaceVerify and FaceAnalyze RPCs.",
},
UsecaseSpeakerRecognition: {
Flag: FLAG_SPEAKER_RECOGNITION,
GRPCMethod: MethodVoiceVerify,
Description: "Speaker recognition — verify identity, embed and analyze voice via VoiceVerify, VoiceEmbed and VoiceAnalyze RPCs.",
},
}
// BackendCapability describes which gRPC methods and usecases a backend supports.
@@ -488,21 +471,6 @@ var BackendCapabilities = map[string]BackendCapability{
DefaultUsecases: []string{UsecaseDetection},
Description: "RF-DETR C++ object detection",
},
// --- Face and speaker recognition backends ---
"insightface": {
GRPCMethods: []GRPCMethod{MethodEmbedding, MethodDetect, MethodFaceVerify, MethodFaceAnalyze},
PossibleUsecases: []string{UsecaseEmbeddings, UsecaseDetection, UsecaseFaceRecognition},
DefaultUsecases: []string{UsecaseFaceRecognition},
AcceptsImages: true,
Description: "InsightFace — face detection, embedding, verification and attribute analysis",
},
"speaker-recognition": {
GRPCMethods: []GRPCMethod{MethodVoiceVerify, MethodVoiceEmbed, MethodVoiceAnalyze},
PossibleUsecases: []string{UsecaseSpeakerRecognition},
DefaultUsecases: []string{UsecaseSpeakerRecognition},
Description: "Speaker recognition — voice identity verification and analysis",
},
"silero-vad": {
GRPCMethods: []GRPCMethod{MethodVAD},
PossibleUsecases: []string{UsecaseVAD},

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/mudler/xlog"
)
@@ -18,29 +16,7 @@ type DistributedConfig struct {
NatsURL string // --nats-url / LOCALAI_NATS_URL
StorageURL string // --storage-url / LOCALAI_STORAGE_URL (S3 endpoint)
RegistrationToken string // --registration-token / LOCALAI_REGISTRATION_TOKEN (required token for node registration)
// RegistrationRequireAuth fails startup when distributed mode is enabled but
// RegistrationToken is empty. The default (false) keeps the historical
// fail-open behavior with a loud warning; production should set it so the
// node-register endpoints and the worker file-transfer server cannot run
// unauthenticated. Mirrors NatsRequireAuth for the NATS bus.
RegistrationRequireAuth bool // LOCALAI_REGISTRATION_REQUIRE_AUTH
// RequireAuth is the umbrella switch (LOCALAI_DISTRIBUTED_REQUIRE_AUTH) for
// distributed-mode auth: when true it implies BOTH NatsRequireAuth and
// RegistrationRequireAuth, so a single knob locks down the bus and the
// registration/file-transfer layer together. The granular flags remain
// available to enforce just one layer.
RequireAuth bool // LOCALAI_DISTRIBUTED_REQUIRE_AUTH
AutoApproveNodes bool // --auto-approve-nodes / LOCALAI_AUTO_APPROVE_NODES (skip admin approval for new workers)
// NATS JWT auth (optional; see pkg/natsauth and docs/features/distributed-mode.md)
NatsAccountSeed string // LOCALAI_NATS_ACCOUNT_SEED — account signing seed to mint per-node worker JWTs
NatsServiceJWT string // LOCALAI_NATS_SERVICE_JWT — user JWT for frontends / agent workers
NatsServiceSeed string // LOCALAI_NATS_SERVICE_SEED — signing seed paired with service JWT
NatsWorkerJWTTTL time.Duration // LOCALAI_NATS_WORKER_JWT_TTL — minted worker JWT lifetime (default 24h)
NatsRequireAuth bool // LOCALAI_NATS_REQUIRE_AUTH — fail startup if NATS credentials are missing
NatsTLSCA string // LOCALAI_NATS_TLS_CA — PEM file for private CA (server verify)
NatsTLSCert string // LOCALAI_NATS_TLS_CERT — client cert for NATS mTLS
NatsTLSKey string // LOCALAI_NATS_TLS_KEY — client key paired with NatsTLSCert
AutoApproveNodes bool // --auto-approve-nodes / LOCALAI_AUTO_APPROVE_NODES (skip admin approval for new workers)
// S3 configuration (used when StorageURL is set)
StorageBucket string // --storage-bucket / LOCALAI_STORAGE_BUCKET
@@ -100,23 +76,10 @@ func (c DistributedConfig) Validate() error {
(c.StorageAccessKey == "" && c.StorageSecretKey != "") {
return fmt.Errorf("storage-access-key and storage-secret-key must both be set or both empty")
}
// The registration token guards both the node HTTP register/heartbeat
// endpoints and the worker file-transfer server (which fails open on an
// empty token). Enforce it when registration auth is required (the granular
// flag or the umbrella); otherwise warn.
// Warn about missing registration token (not an error)
if c.RegistrationToken == "" {
if c.RegistrationAuthRequired() {
return fmt.Errorf("registration auth is required (LOCALAI_REGISTRATION_REQUIRE_AUTH or LOCALAI_DISTRIBUTED_REQUIRE_AUTH) but LOCALAI_REGISTRATION_TOKEN is empty")
}
xlog.Warn("distributed mode running without registration token — node endpoints and the worker file-transfer server are unprotected; set LOCALAI_REGISTRATION_TOKEN, or LOCALAI_DISTRIBUTED_REQUIRE_AUTH=true to fail closed")
xlog.Warn("distributed mode running without registration token — node endpoints are unprotected")
}
if err := c.NatsAuthConfig().Validate(); err != nil {
return err
}
if err := c.NatsTLSFiles().Validate(); err != nil {
return err
}
c.NatsAuthConfig().WarnIfInsecure(true)
// Check for negative durations
for name, d := range map[string]time.Duration{
FlagMCPToolTimeout: c.MCPToolTimeout,
@@ -160,76 +123,6 @@ func WithRegistrationToken(token string) AppOption {
}
}
func WithNatsAccountSeed(seed string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsAccountSeed = seed
}
}
func WithNatsServiceJWT(jwt string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsServiceJWT = jwt
}
}
func WithNatsServiceSeed(seed string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsServiceSeed = seed
}
}
func WithNatsWorkerJWTTTL(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsWorkerJWTTTL = d
}
}
var EnableNatsRequireAuth = func(o *ApplicationConfig) {
o.Distributed.NatsRequireAuth = true
}
// EnableRegistrationRequireAuth makes an empty registration token a hard error
// in distributed mode (see DistributedConfig.RegistrationRequireAuth).
var EnableRegistrationRequireAuth = func(o *ApplicationConfig) {
o.Distributed.RegistrationRequireAuth = true
}
// EnableDistributedRequireAuth is the umbrella switch implying both
// NatsRequireAuth and RegistrationRequireAuth (see DistributedConfig.RequireAuth).
var EnableDistributedRequireAuth = func(o *ApplicationConfig) {
o.Distributed.RequireAuth = true
}
// RegistrationAuthRequired reports whether an empty registration token must be
// treated as a fatal misconfiguration — the granular flag or the umbrella.
func (c DistributedConfig) RegistrationAuthRequired() bool {
return c.RegistrationRequireAuth || c.RequireAuth
}
// NatsAuthRequired reports whether NATS JWT credentials must be present — the
// granular flag or the umbrella.
func (c DistributedConfig) NatsAuthRequired() bool {
return c.NatsRequireAuth || c.RequireAuth
}
func WithNatsTLSCA(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSCA = path
}
}
func WithNatsTLSCert(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSCert = path
}
}
func WithNatsTLSKey(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSKey = path
}
}
func WithStorageURL(url string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.StorageURL = url
@@ -324,44 +217,6 @@ const (
// DefaultMaxUploadSize is the default maximum upload body size (50 GB).
const DefaultMaxUploadSize int64 = 50 << 30
// NatsTLSFiles returns NATS TLS/mTLS PEM paths for the messaging client.
func (c DistributedConfig) NatsTLSFiles() messaging.TLSFiles {
return messaging.TLSFiles{
CA: c.NatsTLSCA,
Cert: c.NatsTLSCert,
Key: c.NatsTLSKey,
}
}
// NatsMessagingOptions builds messaging client options (JWT + TLS) for distributed components.
// Pass explicit userJWT/userSeed when set (e.g. worker overrides); empty uses service JWT from config.
func (c DistributedConfig) NatsMessagingOptions(userJWT, userSeed string) []messaging.Option {
var opts []messaging.Option
jwt, seed := userJWT, userSeed
if jwt == "" && seed == "" {
auth := c.NatsAuthConfig()
jwt, seed = auth.ServiceUserJWT, auth.ServiceUserSeed
}
if jwt != "" && seed != "" {
opts = append(opts, messaging.WithUserJWT(jwt, seed))
}
if tls := c.NatsTLSFiles(); tls.Enabled() {
opts = append(opts, messaging.WithTLS(tls))
}
return opts
}
// NatsAuthConfig builds pkg/natsauth settings from distributed configuration.
func (c DistributedConfig) NatsAuthConfig() natsauth.Config {
return natsauth.Config{
AccountSeed: c.NatsAccountSeed,
ServiceUserJWT: c.NatsServiceJWT,
ServiceUserSeed: c.NatsServiceSeed,
WorkerJWTTTL: c.NatsWorkerJWTTTL,
RequireAuth: c.NatsAuthRequired(),
}
}
// BackendInstallTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendInstallTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendInstallTimeout, DefaultBackendInstallTimeout)

View File

@@ -88,66 +88,3 @@ var _ = Describe("DistributedConfig.Validate negative-duration errors", func() {
Expect(c.Validate()).To(Succeed())
})
})
var _ = Describe("DistributedConfig.Validate registration auth", func() {
It("rejects an empty registration token when RequireAuth is set", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RegistrationRequireAuth: true,
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("LOCALAI_REGISTRATION_REQUIRE_AUTH"))
Expect(err.Error()).To(ContainSubstring("LOCALAI_REGISTRATION_TOKEN"))
})
It("accepts a set registration token when RequireAuth is set", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RegistrationToken: "s3cret",
RegistrationRequireAuth: true,
}
Expect(c.Validate()).To(Succeed())
})
It("warns but succeeds with an empty token when RequireAuth is unset", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
}
Expect(c.Validate()).To(Succeed())
})
It("rejects an empty token when the umbrella RequireAuth is set", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RequireAuth: true,
// Provide NATS creds so only the registration-token gap remains.
NatsServiceJWT: "jwt",
NatsServiceSeed: "seed",
NatsAccountSeed: "acct",
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("LOCALAI_DISTRIBUTED_REQUIRE_AUTH"))
Expect(err.Error()).To(ContainSubstring("LOCALAI_REGISTRATION_TOKEN"))
})
It("the umbrella implies NATS auth is required", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RegistrationToken: "tok", // registration layer satisfied
RequireAuth: true, // umbrella → NATS creds now required
}
Expect(c.NatsAuthRequired()).To(BeTrue())
Expect(c.RegistrationAuthRequired()).To(BeTrue())
// Missing NATS service JWT/seed must now be fatal.
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("LOCALAI_NATS_REQUIRE_AUTH"))
})
})

View File

@@ -39,21 +39,7 @@ func llamaCppDefaults(cfg *ModelConfig, modelPath string) {
}
}()
// Startup parses every model's GGUF header to guess defaults. We only need
// scalar metadata (architecture, head/ff counts, chat_template, token IDs,
// MTP head) plus array *lengths* — never the array *contents*. Two options
// keep this cheap, which matters when many models live on slow storage such
// as a Docker volume (see https://github.com/mudler/LocalAI/issues/9790):
//
// - SkipLargeMetadata: seek past large array-valued metadata (the tokenizer
// vocab: tokenizer.ggml.tokens/scores/merges, often >100k entries) instead
// of reading and allocating every element. Lengths stay populated.
// - UseMMap: read the header via a memory map so faulting in a few pages
// replaces hundreds of thousands of tiny read() syscalls (measured ~524k
// -> 8 for a 256k-token vocab), the dominant cost on slow filesystems.
//
// The mapping is released when ParseGGUFFile returns.
f, err := gguf.ParseGGUFFile(guessPath, gguf.UseMMap(), gguf.SkipLargeMetadata())
f, err := gguf.ParseGGUFFile(guessPath)
if err == nil {
guessGGUFFromFile(cfg, f, 0)
}

View File

@@ -1,76 +1,13 @@
package config_test
import (
"bytes"
"encoding/binary"
"os"
"path/filepath"
. "github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/schema"
gguf "github.com/gpustack/gguf-parser-go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// GGUF metadata value type tags (see github.com/gpustack/gguf-parser-go).
const (
ggufTypeUint32 uint32 = 4
ggufTypeString uint32 = 8
ggufTypeArray uint32 = 9
)
// writeTestGGUF emits a minimal but valid little-endian GGUF v3 header carrying
// the scalar metadata the llama-cpp hook guesses from plus a large string vocab
// array (tokenizer.ggml.tokens). The big array is exactly what SkipLargeMetadata
// + UseMMap are expected to avoid reading element-by-element, so it must survive a
// round-trip through the real hook without corrupting the guessed defaults.
func writeTestGGUF(path, chatTemplate string, vocab int) error {
wStr := func(b *bytes.Buffer, s string) {
binary.Write(b, binary.LittleEndian, uint64(len(s)))
b.WriteString(s)
}
kvStr := func(b *bytes.Buffer, k, v string) {
wStr(b, k)
binary.Write(b, binary.LittleEndian, ggufTypeString)
wStr(b, v)
}
kvU32 := func(b *bytes.Buffer, k string, v uint32) {
wStr(b, k)
binary.Write(b, binary.LittleEndian, ggufTypeUint32)
binary.Write(b, binary.LittleEndian, v)
}
var meta bytes.Buffer
kvStr(&meta, "general.architecture", "llama")
kvStr(&meta, "general.name", "ReproModel")
kvU32(&meta, "llama.context_length", 4096)
kvU32(&meta, "llama.attention.head_count", 32)
kvU32(&meta, "llama.feed_forward_length", 11008)
kvU32(&meta, "llama.block_count", 32)
kvU32(&meta, "tokenizer.ggml.bos_token_id", 1)
kvStr(&meta, "tokenizer.chat_template", chatTemplate)
// large array value — the one the optimization skips reading
wStr(&meta, "tokenizer.ggml.tokens")
binary.Write(&meta, binary.LittleEndian, ggufTypeArray)
binary.Write(&meta, binary.LittleEndian, ggufTypeString)
binary.Write(&meta, binary.LittleEndian, uint64(vocab))
for i := 0; i < vocab; i++ {
wStr(&meta, "token")
}
var out bytes.Buffer
binary.Write(&out, binary.LittleEndian, gguf.GGUFMagicGGUFLe)
binary.Write(&out, binary.LittleEndian, uint32(3)) // version
binary.Write(&out, binary.LittleEndian, uint64(0)) // tensor count
binary.Write(&out, binary.LittleEndian, uint64(9)) // metadata kv count
out.Write(meta.Bytes())
return os.WriteFile(path, out.Bytes(), 0o644)
}
var _ = Describe("Backend hooks and parser defaults", func() {
Context("MatchParserDefaults", func() {
It("matches Qwen3 family", func() {
@@ -200,58 +137,6 @@ var _ = Describe("Backend hooks and parser defaults", func() {
})
})
Context("llamaCppDefaults GGUF guessing", func() {
// Regression coverage for https://github.com/mudler/LocalAI/issues/9790:
// the hook reads GGUF headers with SkipLargeMetadata + UseMMap to avoid
// pulling the whole tokenizer vocab off (slow) disk on every startup. This
// verifies that skipping the vocab array still yields the correct guessed
// defaults from the remaining scalar metadata.
const chatTemplate = "{{ bos_token }}{% for m in messages %}{{ m.content }}{% endfor %}"
It("guesses defaults from a GGUF whose large vocab is skipped", func() {
dir := GinkgoT().TempDir()
modelFile := "repro.gguf"
Expect(writeTestGGUF(filepath.Join(dir, modelFile), chatTemplate, 50000)).To(Succeed())
// A pre-set context size short-circuits the GGUF run-estimate, which
// needs full tensor info this header-only fixture deliberately omits;
// the metadata-reading path the optimization touches is unaffected.
ctxSize := 4096
cfg := &ModelConfig{
Backend: "llama-cpp",
LLMConfig: LLMConfig{ContextSize: &ctxSize},
PredictionOptions: schema.PredictionOptions{
BasicModelRequest: schema.BasicModelRequest{Model: modelFile},
},
}
cfg.SetDefaults(ModelPath(dir))
// chat_template is a scalar string, not part of the skipped array,
// so it must be captured verbatim.
Expect(cfg.GetModelTemplate()).To(Equal(chatTemplate))
// scalar-derived defaults are still applied
Expect(cfg.ContextSize).NotTo(BeNil())
Expect(cfg.NGPULayers).NotTo(BeNil())
Expect(cfg.TemplateConfig.UseTokenizerTemplate).To(BeTrue())
Expect(cfg.KnownUsecaseStrings).To(ContainElement("FLAG_CHAT"))
})
It("falls back to the default context size when the GGUF is unreadable", func() {
dir := GinkgoT().TempDir()
Expect(os.WriteFile(filepath.Join(dir, "bad.gguf"), []byte("not a gguf"), 0o644)).To(Succeed())
cfg := &ModelConfig{
Backend: "llama-cpp",
PredictionOptions: schema.PredictionOptions{
BasicModelRequest: schema.BasicModelRequest{Model: "bad.gguf"},
},
}
cfg.SetDefaults(ModelPath(dir))
Expect(cfg.ContextSize).NotTo(BeNil())
})
})
Context("PromptCacheAll default", func() {
It("defaults to true when omitted from YAML", func() {
cfg := &ModelConfig{}

View File

@@ -128,22 +128,6 @@ func DefaultRegistry() map[string]FieldMetaOverride {
Advanced: true,
Order: 21,
},
"reasoning_effort": {
Section: "llm",
Label: "Reasoning Effort",
Description: "Default reasoning effort, forwarded to the backend as the reasoning_effort chat_template_kwarg (jinja models like gpt-oss / LFM2.5 honor it). A per-request reasoning_effort overrides it. 'none' also turns thinking off.",
Component: "select",
Options: []FieldOption{
{Value: "", Label: "Unset (model default)"},
{Value: "none", Label: "none (disable thinking)"},
{Value: "minimal", Label: "minimal"},
{Value: "low", Label: "low"},
{Value: "medium", Label: "medium"},
{Value: "high", Label: "high"},
},
Advanced: true,
Order: 22,
},
"cache_type_k": {
Section: "llm",
Label: "KV Cache Type (K)",
@@ -293,21 +277,6 @@ func DefaultRegistry() map[string]FieldMetaOverride {
AutocompleteProvider: ProviderModelsVAD,
Order: 63,
},
"pipeline.reasoning_effort": {
Section: "pipeline",
Label: "Reasoning Effort",
Description: "Reasoning effort for the pipeline's LLM, forwarded to the backend as the reasoning_effort chat_template_kwarg (jinja models like gpt-oss / LFM2.5 honor it). Overrides the LLM model's own reasoning_effort. 'none' also turns thinking off.",
Component: "select",
Options: []FieldOption{
{Value: "", Label: "Default (model config)"},
{Value: "none", Label: "none (disable thinking)"},
{Value: "minimal", Label: "minimal"},
{Value: "low", Label: "low"},
{Value: "medium", Label: "medium"},
{Value: "high", Label: "high"},
},
Order: 64,
},
// --- Functions ---
"function.grammar.parallel_calls": {

View File

@@ -63,13 +63,6 @@ type ModelConfig struct {
FunctionsConfig functions.FunctionsConfig `yaml:"function,omitempty" json:"function,omitempty"`
ReasoningConfig reasoning.Config `yaml:"reasoning,omitempty" json:"reasoning,omitempty"`
// ReasoningEffort is the default reasoning effort (none|minimal|low|medium|high)
// for this model. A per-request reasoning_effort overrides it. It is forwarded
// to the backend as the reasoning_effort chat_template_kwarg (see
// gRPCPredictOpts), so jinja-templated models that key on it — e.g. gpt-oss
// (Harmony) or LFM2.5 — honor it; "none" also toggles enable_thinking off.
ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"`
FeatureFlag FeatureFlag `yaml:"feature_flags,omitempty" json:"feature_flags,omitempty"` // Feature Flag registry. We move fast, and features may break on a per model/backend basis. Registry for (usually temporary) flags that indicate aborting something early.
// LLM configs (GPT4ALL, Llama.cpp, ...)
LLMConfig `yaml:",inline" json:",inline"`
@@ -494,40 +487,6 @@ type Pipeline struct {
LLM string `yaml:"llm,omitempty" json:"llm,omitempty"`
Transcription string `yaml:"transcription,omitempty" json:"transcription,omitempty"`
VAD string `yaml:"vad,omitempty" json:"vad,omitempty"`
// ReasoningEffort sets the reasoning effort (none|minimal|low|medium|high) for
// the pipeline's LLM without editing the LLM model config. Overrides the LLM's
// own reasoning_effort. Unset leaves the LLM model config in charge.
ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"`
}
// ApplyReasoningEffort resolves the effective reasoning effort — a per-request
// value (requestEffort) overrides the config's own ReasoningEffort default —
// stores it on the config so gRPCPredictOpts forwards it to the backend as the
// reasoning_effort chat_template_kwarg, and maps it onto the enable_thinking
// toggle the backend also reads:
// - "none" always disables thinking.
// - any explicit level enables it, UNLESS the config already disabled reasoning
// (an operator's explicit disable wins over a request asking to think).
//
// An empty requestEffort keeps the config's own default. With no effort set
// anywhere it is a no-op, leaving the model's reasoning settings untouched.
func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) {
effort := requestEffort
if effort == "" {
effort = c.ReasoningEffort
}
c.ReasoningEffort = effort
switch strings.ToLower(effort) {
case "none":
disable := true
c.ReasoningConfig.DisableReasoning = &disable
case "minimal", "low", "medium", "high":
if c.ReasoningConfig.DisableReasoning == nil || !*c.ReasoningConfig.DisableReasoning {
enable := false
c.ReasoningConfig.DisableReasoning = &enable
}
}
}
// @Description File configuration for model downloads

View File

@@ -30,26 +30,11 @@ func MTPSpecOptions() []string {
return out
}
// isDraftOnlyAssistantArch reports whether an architecture names a standalone
// MTP *draft* model rather than a self-speculating trunk. Upstream's Gemma4 MTP
// (ggml-org/llama.cpp#23398) registers the head as a separate `gemma4-assistant`
// architecture whose GGUF still carries `nextn_predict_layers`, but which cannot
// run alone: it requires a paired target context (`ctx_other`). Such archs must
// not trigger the embedded-head self-speculation defaults. The `-assistant`
// suffix is upstream's naming convention for these draft-only checkpoints.
func isDraftOnlyAssistantArch(arch string) bool {
return strings.HasSuffix(arch, "-assistant")
}
// HasEmbeddedMTPHead reports whether the parsed GGUF declares a self-speculating
// Multi-Token Prediction head. Detection reads `<arch>.nextn_predict_layers`,
// which is what `gguf_writer.add_nextn_predict_layers(n)` emits in upstream's
// HasEmbeddedMTPHead reports whether the parsed GGUF declares a Multi-Token
// Prediction head. Detection reads `<arch>.nextn_predict_layers`, which is
// what `gguf_writer.add_nextn_predict_layers(n)` emits in upstream's
// `conversion/qwen.py` MTP mixin. A positive layer count means the head is
// present in the same GGUF as the trunk.
//
// Draft-only assistant architectures (e.g. Gemma4's `gemma4-assistant`) carry
// the same key but are separate draft checkpoints meant to be paired with a
// target model, so they are deliberately excluded here.
func HasEmbeddedMTPHead(f *gguf.GGUFFile) (uint32, bool) {
if f == nil {
return 0, false
@@ -58,9 +43,6 @@ func HasEmbeddedMTPHead(f *gguf.GGUFFile) (uint32, bool) {
if arch == "" {
return 0, false
}
if isDraftOnlyAssistantArch(arch) {
return 0, false
}
v, ok := f.Header.MetadataKV.Get(arch + ".nextn_predict_layers")
if !ok {
return 0, false

View File

@@ -3,33 +3,10 @@ package config_test
import (
. "github.com/mudler/LocalAI/core/config"
gguf "github.com/gpustack/gguf-parser-go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// ggufWithArch fabricates a minimal in-memory GGUF carrying the given
// `general.architecture` and a positive `<arch>.nextn_predict_layers` count,
// so HasEmbeddedMTPHead can be exercised without a real model file.
func ggufWithArch(arch string, nextn uint32) *gguf.GGUFFile {
return &gguf.GGUFFile{
Header: gguf.GGUFHeader{
MetadataKV: gguf.GGUFMetadataKVs{
{
Key: "general.architecture",
ValueType: gguf.GGUFMetadataValueTypeString,
Value: arch,
},
{
Key: arch + ".nextn_predict_layers",
ValueType: gguf.GGUFMetadataValueTypeUint32,
Value: nextn,
},
},
},
}
}
var _ = Describe("MTP auto-defaults", func() {
Context("MTPSpecOptions", func() {
It("returns the upstream-recommended speculative tuple", func() {
@@ -105,20 +82,5 @@ var _ = Describe("MTP auto-defaults", func() {
Expect(ok).To(BeFalse())
Expect(n).To(BeZero())
})
It("detects a same-GGUF embedded head (DeepSeek/Qwen style)", func() {
n, ok := HasEmbeddedMTPHead(ggufWithArch("qwen3moe", 1))
Expect(ok).To(BeTrue())
Expect(n).To(Equal(uint32(1)))
})
It("ignores a gemma4-assistant draft-only model", func() {
// The assistant GGUF carries nextn_predict_layers but is a separate
// draft model that requires a paired target (ctx_other); it cannot
// self-speculate, so it must not trigger the embedded-head defaults.
n, ok := HasEmbeddedMTPHead(ggufWithArch("gemma4-assistant", 48))
Expect(ok).To(BeFalse())
Expect(n).To(BeZero())
})
})
})

View File

@@ -1,52 +0,0 @@
package config_test
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
)
// ApplyReasoningEffort resolves the effective reasoning effort (request value
// overrides the model config default), stores it on the config so it reaches the
// backend, and maps it onto the enable_thinking toggle.
var _ = Describe("ModelConfig.ApplyReasoningEffort", func() {
It("uses the request value over the config default", func() {
c := &config.ModelConfig{ReasoningEffort: "high"}
c.ApplyReasoningEffort("none")
Expect(c.ReasoningEffort).To(Equal("none"))
Expect(c.ReasoningConfig.DisableReasoning).ToNot(BeNil())
Expect(*c.ReasoningConfig.DisableReasoning).To(BeTrue())
})
It("falls back to the config default when the request omits it", func() {
c := &config.ModelConfig{ReasoningEffort: "none"}
c.ApplyReasoningEffort("")
Expect(c.ReasoningEffort).To(Equal("none"))
Expect(c.ReasoningConfig.DisableReasoning).ToNot(BeNil())
Expect(*c.ReasoningConfig.DisableReasoning).To(BeTrue())
})
It("enables thinking for an explicit effort level", func() {
c := &config.ModelConfig{}
c.ApplyReasoningEffort("medium")
Expect(c.ReasoningEffort).To(Equal("medium"))
Expect(c.ReasoningConfig.DisableReasoning).ToNot(BeNil())
Expect(*c.ReasoningConfig.DisableReasoning).To(BeFalse())
})
It("does not let a level override an operator's config-level disable", func() {
disabled := true
c := &config.ModelConfig{}
c.ReasoningConfig.DisableReasoning = &disabled
c.ApplyReasoningEffort("high")
Expect(*c.ReasoningConfig.DisableReasoning).To(BeTrue())
})
It("is a no-op on the toggle when no effort is set anywhere", func() {
c := &config.ModelConfig{}
c.ApplyReasoningEffort("")
Expect(c.ReasoningEffort).To(Equal(""))
Expect(c.ReasoningConfig.DisableReasoning).To(BeNil())
})
})

View File

@@ -420,9 +420,8 @@ func API(application *application.Application) (*echo.Echo, error) {
remoteUnloader = d.Router.Unloader()
}
}
natsCfg := distCfg.NatsAuthConfig()
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, natsCfg)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken, natsCfg)
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
// Distributed SSE routes (job progress + agent events via NATS)
if d := application.Distributed(); d != nil {

View File

@@ -37,7 +37,7 @@ func TTSEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig
xlog.Debug("elevenlabs TTS request received", "modelName", input.ModelID)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Text, voiceID, input.LanguageCode, "", nil, ml, appConfig, *cfg)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Text, voiceID, input.LanguageCode, ml, appConfig, *cfg)
if err != nil {
return err
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/LocalAI/pkg/httpclient"
"github.com/mudler/LocalAI/pkg/natsauth"
)
// nodeError builds a schema.ErrorResponse for node endpoints.
@@ -90,7 +89,7 @@ type RegisterNodeRequest struct {
// RegisterNodeEndpoint registers a new backend node.
// expectedToken is the registration token configured on the frontend (may be empty to disable auth).
// autoApprove controls whether new nodes go directly to "healthy" or require admin approval.
func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string, natsCfg natsauth.Config) echo.HandlerFunc {
func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string) echo.HandlerFunc {
return func(c echo.Context) error {
var req RegisterNodeRequest
if err := c.Bind(&req); err != nil {
@@ -218,15 +217,13 @@ func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, au
}
}
attachNatsJWT(response, node, natsCfg)
return c.JSON(http.StatusCreated, response)
}
}
// ApproveNodeEndpoint approves a pending node, setting its status to healthy.
// For agent workers, it also provisions an API key so they can call the inference API.
func ApproveNodeEndpoint(registry *nodes.NodeRegistry, authDB *gorm.DB, hmacSecret string, natsCfg natsauth.Config) echo.HandlerFunc {
func ApproveNodeEndpoint(registry *nodes.NodeRegistry, authDB *gorm.DB, hmacSecret string) echo.HandlerFunc {
return func(c echo.Context) error {
ctx := c.Request().Context()
id := c.Param("id")
@@ -256,26 +253,10 @@ func ApproveNodeEndpoint(registry *nodes.NodeRegistry, authDB *gorm.DB, hmacSecr
}
}
attachNatsJWT(response, node, natsCfg)
return c.JSON(http.StatusOK, response)
}
}
// attachNatsJWT adds a per-node NATS user JWT to a register/approve response when minting is enabled.
func attachNatsJWT(response map[string]any, node *nodes.BackendNode, natsCfg natsauth.Config) {
if !natsCfg.CanMintWorkers() || node == nil || node.Status == nodes.StatusPending {
return
}
jwt, seed, err := natsCfg.MintWorkerJWT(node.ID, node.NodeType)
if err != nil {
xlog.Warn("Failed to mint NATS JWT for node", "node", node.Name, "id", node.ID, "error", err)
return
}
response["nats_jwt"] = jwt
response["nats_user_seed"] = seed
}
// provisionAgentWorkerKey creates a dedicated user and API key for an agent worker node.
// Returns the plaintext API key on success.
func provisionAgentWorkerKey(ctx context.Context, authDB *gorm.DB, registry *nodes.NodeRegistry, node *nodes.BackendNode, hmacSecret string) (string, error) {

View File

@@ -12,8 +12,6 @@ import (
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/testutil"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -65,7 +63,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusCreated))
@@ -76,29 +74,6 @@ var _ = Describe("Node HTTP handlers", func() {
Expect(resp["status"]).To(Equal(nodes.StatusHealthy))
})
It("returns nats_jwt when account seed is configured", func() {
akp, err := nkeys.CreateAccount()
Expect(err).ToNot(HaveOccurred())
seed, err := akp.Seed()
Expect(err).ToNot(HaveOccurred())
e := echo.New()
body := `{"name":"worker-nats","address":"10.0.0.2:50051"}`
req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
natsCfg := natsauth.Config{AccountSeed: string(seed)}
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusCreated))
var resp map[string]any
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp["nats_jwt"]).ToNot(BeEmpty())
})
It("returns 400 when name is missing", func() {
e := echo.New()
body := `{"address":"10.0.0.1:50051"}`
@@ -107,7 +82,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -127,7 +102,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -146,7 +121,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -165,7 +140,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -184,7 +159,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "correct-token", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "correct-token", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusUnauthorized))
})
@@ -197,7 +172,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", false, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", false, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusCreated))
@@ -220,7 +195,7 @@ var _ = Describe("Node HTTP handlers", func() {
req1 := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body1))
req1.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
rec1 := httptest.NewRecorder()
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(e.NewContext(req1, rec1))).To(Succeed())
Expect(rec1.Code).To(Equal(http.StatusCreated))

View File

@@ -59,7 +59,7 @@ func TTSEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig
c.Response().Header().Set("Connection", "keep-alive")
// Stream audio chunks as they're generated
err := backend.ModelTTSStream(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, input.Instructions, input.Params, ml, appConfig, *cfg, func(audioChunk []byte) error {
err := backend.ModelTTSStream(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, ml, appConfig, *cfg, func(audioChunk []byte) error {
_, writeErr := c.Response().Write(audioChunk)
if writeErr != nil {
return writeErr
@@ -75,7 +75,7 @@ func TTSEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig
}
// Non-streaming TTS (existing behavior)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, input.Instructions, input.Params, ml, appConfig, *cfg)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, ml, appConfig, *cfg)
if err != nil {
return err
}

View File

@@ -103,12 +103,7 @@ func applyAutoparserOverride(
// blocks like "<think></think>" that some models emit when reasoning
// is disabled.
if deltaReasoning == "" && deltaContent != "" {
// Complete-response extraction: only honor a prefilled <think> start
// token when deltaContent actually closes the reasoning block. Without
// it the model answered directly and the whole answer must stay in
// content rather than be swallowed as unclosed reasoning. See
// reason.ExtractReasoningComplete.
deltaReasoning, deltaContent = reason.ExtractReasoningComplete(deltaContent, thinkingStartToken, reasoningConfig)
deltaReasoning, deltaContent = reason.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, reasoningConfig)
}
xlog.Debug("[ChatDeltas] non-SSE no-tools: overriding result with C++ autoparser deltas",
"content_len", len(deltaContent), "reasoning_len", len(deltaReasoning))

View File

@@ -186,114 +186,6 @@ var _ = Describe("applyAutoparserOverride", func() {
Expect(result).To(Equal(existing))
})
})
// Regression tests for the prefilled-thinking-token path (thinkingStartToken
// != ""). This is the configuration the gallery qwen3 family runs in: the
// chat template injects <think> into the prompt, so DetectThinkingStartToken
// returns "<think>" and the model's output begins *inside* a reasoning block
// — it emits a closing </think> but no opening tag.
//
// The defensive Go-side fallback prepends the start token so the standard
// extractor can pair it with the model's </think>. But on a *complete*
// response that contains NO closing tag (the model answered directly with no
// reasoning at all), prepending <think> manufactures an unclosed block that
// swallows the entire answer into reasoning, leaving content empty. That is
// the bug: short/direct answers (session names, JSON summaries) come back
// with an empty content field.
Context("autoparser delivered content with empty reasoning and a prefilled thinking token", func() {
const startToken = "<think>"
It("keeps a tag-less direct answer as content instead of swallowing it as reasoning", func() {
// Model answered directly: no <think>, no </think> anywhere.
chatDeltas := []*pb.ChatDelta{
{Content: "hello", ReasoningContent: ""},
}
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
Expect(result).To(HaveLen(1))
Expect(result[0].Message.Content).ToNot(BeNil())
Expect(*(result[0].Message.Content.(*string))).To(Equal("hello"),
"a complete answer with no closing reasoning tag must stay in content")
Expect(result[0].Message.Reasoning).To(BeNil(),
"no reasoning block was emitted, so Reasoning must not be set")
})
It("keeps a tag-less JSON answer as content (the summary case)", func() {
raw := `{"short":"Tests pass","long":"go test ./... succeeded."}`
chatDeltas := []*pb.ChatDelta{
{Content: raw, ReasoningContent: ""},
}
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
Expect(result).To(HaveLen(1))
Expect(*(result[0].Message.Content.(*string))).To(Equal(raw))
Expect(result[0].Message.Reasoning).To(BeNil())
})
It("still splits reasoning when the model emits the closing tag (prefill paired with </think>)", func() {
// The legitimate prefill case: <think> was in the prompt, so the
// output carries only the closing tag. The closing tag is the proof
// that a reasoning block exists, so extraction must run.
raw := "The user wants a greeting.\n</think>\n\nHello there!"
chatDeltas := []*pb.ChatDelta{
{Content: raw, ReasoningContent: ""},
}
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
Expect(result).To(HaveLen(1))
content := *(result[0].Message.Content.(*string))
Expect(content).To(ContainSubstring("Hello there!"))
Expect(content).ToNot(ContainSubstring("</think>"))
Expect(content).ToNot(ContainSubstring("The user wants a greeting"))
Expect(result[0].Message.Reasoning).ToNot(BeNil())
Expect(*result[0].Message.Reasoning).To(ContainSubstring("The user wants a greeting"))
})
It("still splits a fully-tagged <think>…</think> block with a prefill token set", func() {
raw := "<think>Reasoning here.</think>Final answer."
chatDeltas := []*pb.ChatDelta{
{Content: raw, ReasoningContent: ""},
}
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
Expect(result).To(HaveLen(1))
Expect(*(result[0].Message.Content.(*string))).To(Equal("Final answer."))
Expect(result[0].Message.Reasoning).ToNot(BeNil())
Expect(*result[0].Message.Reasoning).To(ContainSubstring("Reasoning here"))
})
// End-to-end regression for the real production failure: a request with
// enable_thinking=false against a <think>-capable model (qwen3 family).
//
// In non-thinking mode the model emits no reasoning block, so llama.cpp's
// autoparser correctly returns ChatDeltas with Content set and
// ReasoningContent EMPTY (verified against stock llama-server: the same
// model with chat_template_kwargs.enable_thinking=false returns
// reasoning_content=null and content="hello"). But thinkingStartToken is
// detected per-model from the enable_thinking=TRUE render
// (grpc-server renders with enable_thinking=true; DetectThinkingStartToken
// does not evaluate the jinja {% if enable_thinking %} conditional), so it
// is "<think>" even for this non-thinking request. The old code prepended
// it and swallowed the answer. This is the case that broke session
// summaries and auto-titles and was NOT covered before.
It("preserves content for a non-thinking-mode request (enable_thinking=false, empty reasoning_content)", func() {
// What llama.cpp's autoparser actually returns in non-thinking mode.
chatDeltas := []*pb.ChatDelta{
{Content: `{"short":"Go tests passed for internal/session"}`, ReasoningContent: ""},
}
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
Expect(result).To(HaveLen(1))
Expect(*(result[0].Message.Content.(*string))).To(Equal(`{"short":"Go tests passed for internal/session"}`),
"non-thinking-mode answers must reach the client intact, not be swallowed as reasoning")
Expect(result[0].Message.Reasoning).To(BeNil())
})
})
})
var _ = Describe("mergeToolCallDeltas", func() {

View File

@@ -1579,7 +1579,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
// ExtractReasoningWithConfig is a no-op when no tag pair matches,
// so it's safe to apply unconditionally in the no-reasoning branch.
if deltaReasoning == "" && deltaContent != "" {
deltaReasoning, deltaContent = reasoning.ExtractReasoningComplete(deltaContent, thinkingStartToken, config.ReasoningConfig)
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, config.ReasoningConfig)
}
reasoningText = deltaReasoning
responseWithoutReasoning = deltaContent
@@ -1587,7 +1587,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
cleanedResponse = deltaContent
toolCalls = deltaToolCalls
} else {
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningComplete(rawResponse, thinkingStartToken, config.ReasoningConfig)
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, config.ReasoningConfig)
textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig)
cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig)
toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig)

View File

@@ -44,10 +44,10 @@ type wrappedModel struct {
// deps in. nil-safe: with classifierRegistry == nil the per-turn
// routing block in Predict is skipped, preserving today's "one LLM
// for the whole session" behaviour.
routerDeps *middleware.ClassifierDeps
routerStore router.DecisionStore
routerSessionID string
routerUserID string
routerDeps *middleware.ClassifierDeps
routerStore router.DecisionStore
routerSessionID string
routerUserID string
}
// anyToAnyModel represent a model which supports Any-to-Any operations
@@ -119,11 +119,6 @@ func (m *wrappedModel) Predict(ctx context.Context, messages schema.Messages, im
}
}
// Surface the resolved reasoning effort to the Go-side template path too
// (jinja models get it via backend metadata in gRPCPredictOpts; Go-templated
// models like gpt-oss read it from the template's .ReasoningEffort).
input.ReasoningEffort = turnCfg.ReasoningEffort
var predInput string
var funcs []functions.Function
if !turnCfg.TemplateConfig.UseTokenizerTemplate {
@@ -318,7 +313,7 @@ func newRealtimeDecisionID() string {
}
func (m *wrappedModel) TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error) {
return backend.ModelTTS(ctx, text, voice, language, "", nil, m.modelLoader, m.appConfig, *m.TTSConfig)
return backend.ModelTTS(ctx, text, voice, language, m.modelLoader, m.appConfig, *m.TTSConfig)
}
func (m *wrappedModel) PredictConfig() *config.ModelConfig {
@@ -454,9 +449,6 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model
return nil, fmt.Errorf("failed to validate config: %w", err)
}
// Let the pipeline set the LLM's reasoning effort (cfgLLM is a per-session copy).
applyPipelineReasoning(cfgLLM, *pipeline)
cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath)
if err != nil {

View File

@@ -1,16 +0,0 @@
package openai
import "github.com/mudler/LocalAI/core/config"
// applyPipelineReasoning sets the reasoning effort for a realtime pipeline's LLM
// from the pipeline config, without editing the underlying LLM model config. The
// pipeline value overrides the LLM's own reasoning_effort; when the pipeline does
// not set it, the LLM model config's reasoning_effort (if any) is used. The LLM
// config passed in is the per-session copy returned by the config loader, so this
// does not affect other users of the same model.
func applyPipelineReasoning(llm *config.ModelConfig, pipeline config.Pipeline) {
if llm == nil {
return
}
llm.ApplyReasoningEffort(pipeline.ReasoningEffort)
}

View File

@@ -1,33 +0,0 @@
package openai
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
)
// applyPipelineReasoning lets a realtime pipeline set the reasoning effort for
// its LLM (forwarded to the backend as reasoning_effort) without editing the LLM
// model config. The pipeline value overrides the LLM's own reasoning_effort.
var _ = Describe("applyPipelineReasoning", func() {
It("applies the pipeline reasoning_effort to the LLM config", func() {
llm := &config.ModelConfig{}
applyPipelineReasoning(llm, config.Pipeline{ReasoningEffort: "none"})
Expect(llm.ReasoningEffort).To(Equal("none"))
Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil())
Expect(*llm.ReasoningConfig.DisableReasoning).To(BeTrue())
})
It("falls back to the LLM's own reasoning_effort when the pipeline is unset", func() {
llm := &config.ModelConfig{ReasoningEffort: "high"}
applyPipelineReasoning(llm, config.Pipeline{})
Expect(llm.ReasoningEffort).To(Equal("high"))
Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil())
Expect(*llm.ReasoningConfig.DisableReasoning).To(BeFalse())
})
It("is nil-safe", func() {
applyPipelineReasoning(nil, config.Pipeline{ReasoningEffort: "low"})
})
})

View File

@@ -1356,7 +1356,7 @@ func handleOpenResponsesNonStream(c echo.Context, responseID string, createdAt i
thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig)
// Extract reasoning from result before cleaning
reasoningContent, cleanedResult := reason.ExtractReasoningComplete(result, thinkingStartToken, cfg.ReasoningConfig)
reasoningContent, cleanedResult := reason.ExtractReasoningWithConfig(result, thinkingStartToken, cfg.ReasoningConfig)
// Parse tool calls if using functions
var outputItems []schema.ORItemField
@@ -1996,7 +1996,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
finalCleanedResult = extractor.CleanedContent()
}
if finalReasoning == "" && finalCleanedResult == "" {
finalReasoning, finalCleanedResult = reason.ExtractReasoningComplete(result, thinkingStartToken, cfg.ReasoningConfig)
finalReasoning, finalCleanedResult = reason.ExtractReasoningWithConfig(result, thinkingStartToken, cfg.ReasoningConfig)
}
// Close reasoning item if it exists and wasn't closed yet
@@ -2493,7 +2493,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
finalCleanedResult = extractor.CleanedContent()
}
if finalReasoning == "" && finalCleanedResult == "" {
finalReasoning, finalCleanedResult = reason.ExtractReasoningComplete(result, thinkingStartToken, cfg.ReasoningConfig)
finalReasoning, finalCleanedResult = reason.ExtractReasoningWithConfig(result, thinkingStartToken, cfg.ReasoningConfig)
}
// Close reasoning item if it exists and wasn't closed yet

View File

@@ -310,13 +310,25 @@ func mergeOpenAIRequestAndModelConfig(config *config.ModelConfig, input *schema.
config.Temperature = input.Temperature
}
// Resolve the effective reasoning effort (request overrides the model config
// default), store it so gRPCPredictOpts forwards it to the backend as the
// reasoning_effort chat_template_kwarg (what gpt-oss / LFM2.5 read), and map
// it onto the enable_thinking toggle. "none" disables thinking (the #10072
// use case); a level enables it unless the config already disabled reasoning
// (an operator's explicit disable wins over a request asking to think).
config.ApplyReasoningEffort(input.ReasoningEffort)
// Map the per-request reasoning_effort onto the reasoning toggle the
// backend reads (enable_thinking metadata, set in gRPCPredictOpts).
// "none" disables thinking for this request - the use case from #10072,
// running a single Qwen3-style model and turning reasoning off per
// request. Any explicit effort level enables thinking, UNLESS the model
// config explicitly disabled it (DisableReasoning==true wins): an
// operator who deliberately turned reasoning off should not be overridden
// by a request. A value of "none" always disables, since that never
// conflicts with a config that also disables.
switch strings.ToLower(input.ReasoningEffort) {
case "none":
disable := true
config.ReasoningConfig.DisableReasoning = &disable
case "minimal", "low", "medium", "high":
if config.ReasoningConfig.DisableReasoning == nil || !*config.ReasoningConfig.DisableReasoning {
enable := false
config.ReasoningConfig.DisableReasoning = &enable
}
}
// Collapse the modern max_completion_tokens alias into the
// legacy Maxtokens field so downstream code reads exactly one.

View File

@@ -216,12 +216,6 @@ export function useChat(initialModel = '') {
audio_url: { url: `data:${file.type};base64,${file.base64}` },
})
userFiles.push({ name: file.name, type: 'audio' })
} else if (file.type?.startsWith('video/')) {
messageContent.push({
type: 'video_url',
video_url: { url: `data:${file.type};base64,${file.base64}` },
})
userFiles.push({ name: file.name, type: 'video' })
} else {
// Text/PDF files - append to content
if (file.textContent) {

View File

@@ -506,10 +506,7 @@ export default function Backends() {
<tbody>
{backends.map((b, idx) => {
const op = getBackendOp(b)
// A failed op is intentionally kept in the operations list so the
// OperationsBar can surface the error + Dismiss; it must NOT render
// as a perpetual "Installing..." spinner here (mirrors Models.jsx).
const isProcessing = !!op && !op.error
const isProcessing = !!op
const isExpanded = expandedRow === idx
return (

View File

@@ -265,7 +265,7 @@ function UserMessageContent({ content, files }) {
<div className="chat-message-files">
{files.map((f, i) => (
<span key={i} className="chat-file-inline">
<i className={`fas ${f.type === 'image' ? 'fa-image' : f.type === 'audio' ? 'fa-headphones' : f.type === 'video' ? 'fa-film' : 'fa-file'}`} />
<i className={`fas ${f.type === 'image' ? 'fa-image' : f.type === 'audio' ? 'fa-headphones' : 'fa-file'}`} />
{f.name}
</span>
))}
@@ -274,9 +274,6 @@ function UserMessageContent({ content, files }) {
{Array.isArray(content) && content.filter(c => c.type === 'image_url').map((img, i) => (
<img key={i} src={img.image_url.url} alt="attached" className="chat-inline-image" />
))}
{Array.isArray(content) && content.filter(c => c.type === 'video_url').map((vid, i) => (
<video key={i} src={vid.video_url.url} controls className="chat-inline-video" />
))}
</>
)
}
@@ -714,7 +711,7 @@ export default function Chat() {
for (const file of e.target.files) {
const base64 = await fileToBase64(file)
const entry = { name: file.name, type: file.type, base64 }
if (!file.type.startsWith('image/') && !file.type.startsWith('audio/') && !file.type.startsWith('video/')) {
if (!file.type.startsWith('image/') && !file.type.startsWith('audio/')) {
entry.textContent = await file.text().catch(() => '')
}
newFiles.push(entry)
@@ -1247,7 +1244,7 @@ export default function Chat() {
<div className="chat-files">
{files.map((f, i) => (
<span key={i} className="chat-file-badge">
<i className={`fas ${f.type?.startsWith('image/') ? 'fa-image' : f.type?.startsWith('audio/') ? 'fa-headphones' : f.type?.startsWith('video/') ? 'fa-film' : 'fa-file'}`} />
<i className={`fas ${f.type?.startsWith('image/') ? 'fa-image' : f.type?.startsWith('audio/') ? 'fa-headphones' : 'fa-file'}`} />
{f.name}
<button onClick={() => setFiles(prev => prev.filter((_, idx) => idx !== i))}>
<i className="fas fa-xmark" />
@@ -1346,7 +1343,7 @@ export default function Chat() {
ref={fileInputRef}
type="file"
multiple
accept="image/*,audio/*,video/*,application/pdf,.txt,.md,.csv,.json"
accept="image/*,audio/*,application/pdf,.txt,.md,.csv,.json"
style={{ display: 'none' }}
onChange={handleFileChange}
/>

View File

@@ -12,7 +12,6 @@ import ActionMenu from '../components/ActionMenu'
import ResourceRow, { ChevronCell, IconCell, StopPropagationCell } from '../components/ResourceRow'
import { useModels } from '../hooks/useModels'
import { useGalleryEnrichment } from '../hooks/useGalleryEnrichment'
import { useOperations } from '../hooks/useOperations'
import { backendControlApi, modelsApi, backendsApi, systemApi, nodesApi } from '../utils/api'
import { renderMarkdown } from '../utils/markdown'
import { safeHref } from '../utils/url'
@@ -127,7 +126,6 @@ export default function Manage() {
const [activeTab, setActiveTab] = useState(TABS.some(tab => tab.key === initialTab) ? initialTab : 'models')
const { models, loading: modelsLoading, refetch: refetchModels } = useModels()
const { enrichModel, enrichBackend } = useGalleryEnrichment()
const { operations } = useOperations()
const [loadedModelIds, setLoadedModelIds] = useState(new Set())
const [backends, setBackends] = useState([])
const [backendsLoading, setBackendsLoading] = useState(true)
@@ -260,19 +258,14 @@ export default function Manage() {
return `${m}m ago`
})()
// Refresh installed backends + available upgrades when the Backends tab opens
// AND whenever a backend operation settles (operations.length changes as a
// reinstall/upgrade completes and drops off the list). Without the op-settle
// refresh the installed-version cell and the "update available" badge stay
// stale after an upgrade until the user switches tabs - the op looks like it
// "did nothing". Mirrors the operations.length watch Backends.jsx uses.
// Fetch available backend upgrades
useEffect(() => {
if (activeTab !== 'backends') return
fetchBackends()
backendsApi.checkUpgrades()
.then(data => setUpgrades(data || {}))
.catch(() => {})
}, [operations.length, activeTab, fetchBackends])
if (activeTab === 'backends') {
backendsApi.checkUpgrades()
.then(data => setUpgrades(data || {}))
.catch(() => {})
}
}, [activeTab])
const handleStopModel = (modelName) => {
setConfirmDialog({

View File

@@ -10,7 +10,6 @@ import (
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/pkg/natsauth"
"gorm.io/gorm"
)
@@ -36,7 +35,7 @@ func nodeReadyMiddleware(registry *nodes.NodeRegistry) echo.MiddlewareFunc {
// token but do not verify per-node identity. A compromised worker can heartbeat/drain/
// deregister other nodes. Future: issue per-node JWT at registration, validate node
// identity on subsequent requests (compare :id param with token subject).
func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, registrationToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string, natsCfg natsauth.Config) {
func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, registrationToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string) {
if registry == nil {
return
}
@@ -45,7 +44,7 @@ func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, r
tokenAuthMw := nodeTokenAuth(registrationToken)
node := e.Group("/api/node", readyMw, tokenAuthMw)
node.POST("/register", localai.RegisterNodeEndpoint(registry, registrationToken, autoApprove, authDB, hmacSecret, natsCfg))
node.POST("/register", localai.RegisterNodeEndpoint(registry, registrationToken, autoApprove, authDB, hmacSecret))
node.POST("/:id/heartbeat", localai.HeartbeatEndpoint(registry))
node.POST("/:id/drain", localai.DrainNodeEndpoint(registry))
node.POST("/:id/resume", localai.ResumeNodeEndpoint(registry))
@@ -61,7 +60,7 @@ func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, r
// backend install path (POST /:id/backends/install). That handler enqueues a
// ManagementOp on the gallery channel rather than blocking on a NATS reply, so
// the browser gets HTTP 202 + jobID immediately instead of waiting up to 3 minutes.
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string, natsCfg natsauth.Config) {
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string) {
if registry == nil {
return
}
@@ -82,7 +81,7 @@ func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloade
admin.DELETE("/:id", localai.DeregisterNodeEndpoint(registry))
admin.POST("/:id/drain", localai.DrainNodeEndpoint(registry))
admin.POST("/:id/resume", localai.ResumeNodeEndpoint(registry))
admin.POST("/:id/approve", localai.ApproveNodeEndpoint(registry, authDB, hmacSecret, natsCfg))
admin.POST("/:id/approve", localai.ApproveNodeEndpoint(registry, authDB, hmacSecret))
// Backend management on workers
admin.GET("/:id/backends", localai.ListBackendsOnNodeEndpoint(unloader))

View File

@@ -60,14 +60,6 @@ type TTSRequest struct {
Format string `json:"response_format,omitempty" yaml:"response_format,omitempty"` // (optional) output format
Stream bool `json:"stream,omitempty" yaml:"stream,omitempty"` // (optional) enable streaming TTS
SampleRate int `json:"sample_rate,omitempty" yaml:"sample_rate,omitempty"` // (optional) desired output sample rate
// Instructions is a free-form, per-request style/voice description. It maps to
// the OpenAI `instructions` field and is forwarded to the backend so expressive
// TTS models (e.g. Qwen3-TTS CustomVoice/VoiceDesign) can vary tone or designed
// voice per request instead of only via the static YAML option.
Instructions string `json:"instructions,omitempty" yaml:"instructions,omitempty"`
// Params carries optional, backend-specific per-request generation parameters
// (LocalAI extension, e.g. Chatterbox exaggeration/cfg_weight/temperature).
Params map[string]string `json:"params,omitempty" yaml:"params,omitempty"`
}
// @Description VAD request body

View File

@@ -180,21 +180,18 @@ func (s *GalleryStore) Cancel(id string) error {
return s.UpdateStatus(id, "cancelled", "")
}
// CleanStale marks abandoned in-progress operations as failed and returns the
// number of rows reaped. Called on startup AND periodically to recover from
// crashed/restarted instances that left records in pending/downloading/
// processing state — an op orphaned after startup would otherwise linger
// "processing" until the next restart.
func (s *GalleryStore) CleanStale(age time.Duration) (int64, error) {
// CleanStale marks abandoned in-progress operations as failed.
// Should be called on startup to recover from crashed instances that
// left records in pending/downloading/processing state.
func (s *GalleryStore) CleanStale(age time.Duration) error {
cutoff := time.Now().Add(-age)
res := s.db.Model(&GalleryOperationRecord{}).
return s.db.Model(&GalleryOperationRecord{}).
Where("updated_at < ? AND status IN ?", cutoff, activeStatuses).
Updates(map[string]any{
"status": "failed",
"error": "stale operation reaped (abandoned by a crashed or restarted instance)",
"error": "stale operation cleaned up on startup",
"updated_at": time.Now(),
})
return res.RowsAffected, res.Error
}).Error
}
// CleanOld removes operations older than the given duration.

View File

@@ -71,7 +71,7 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
var err error
if op.Upgrade {
err = g.backendManager.UpgradeBackend(ctx, op.ID, op.GalleryElementName, progressCallback)
err = g.backendManager.UpgradeBackend(ctx, op.GalleryElementName, progressCallback)
} else if op.Delete {
err = g.backendManager.DeleteBackend(op.GalleryElementName)
} else {

View File

@@ -1,106 +0,0 @@
package galleryop_test
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/distributed"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/testutil"
)
// Reproduces "a cancelled/orphaned op resurrects as 'processing' after a pod
// restart". CancelOperation flipped the in-memory status to cancelled and
// broadcast a NATS event, but never persisted the terminal status to the
// gallery store. On the next replica restart the still-"pending" row hydrated
// straight back into processingBackends and the UI spun again. CancelOperation
// must persist the cancellation so it survives a restart.
var _ = Describe("GalleryService.CancelOperation persistence", func() {
It("persists the cancelled status to the gallery store", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
// Seed an in-flight op as if a replica was mid-install.
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "op-cancel",
GalleryElementName: "llama-cpp-development",
OpType: "backend_install",
Status: "pending",
Progress: 0,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
// Make the op locally cancellable so CancelOperation proceeds.
svc.StoreCancellation("op-cancel", context.CancelFunc(func() {}))
Expect(svc.CancelOperation("op-cancel")).To(Succeed())
// The persisted row must now be terminal — otherwise it re-hydrates as
// pending on the next restart.
rec, err := store.Get("op-cancel")
Expect(err).ToNot(HaveOccurred())
Expect(rec.Status).To(Equal("cancelled"))
// And a fresh service hydrating from the store must NOT see it as active.
fresh := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
fresh.SetGalleryStore(store)
Expect(fresh.Hydrate()).To(Succeed())
Expect(fresh.GetStatus("op-cancel")).To(BeNil(),
"a cancelled op must not hydrate back as active after a restart")
})
})
// Reproduces "an op orphaned by a replica that died mid-flight stays 'pending'
// forever". CleanStale (which marks abandoned active ops failed) only ran once
// on startup, so an op orphaned AFTER startup was never reaped until the next
// restart. The service must reap stale ops on an interval, not just at boot.
var _ = Describe("GalleryService.ReapStaleOperations", func() {
It("marks abandoned active ops terminal once they pass the age cutoff", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "orphan-op",
GalleryElementName: "llama-cpp-development",
OpType: "backend_install",
Status: "pending",
Progress: 0,
})).To(Succeed())
// Force the row's updated_at into the past so it is older than the cutoff.
Expect(db.Exec(
"UPDATE gallery_operations SET updated_at = ? WHERE id = ?",
time.Now().Add(-1*time.Hour), "orphan-op",
).Error).To(Succeed())
// A fresh, still-progressing op must NOT be reaped.
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "live-op",
GalleryElementName: "vllm-development",
OpType: "backend_install",
Status: "downloading",
Progress: 50,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
reaped, err := svc.ReapStaleOperations(30 * time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(reaped).To(Equal(int64(1)))
orphan, err := store.Get("orphan-op")
Expect(err).ToNot(HaveOccurred())
Expect(orphan.Status).To(Equal("failed"))
live, err := store.Get("live-op")
Expect(err).ToNot(HaveOccurred())
Expect(live.Status).To(Equal("downloading"), "a recently-updated op must not be reaped")
})
})

View File

@@ -20,7 +20,7 @@ type BackendManager interface {
InstallBackend(ctx context.Context, op *ManagementOp[gallery.GalleryBackend, any], progressCb ProgressCallback) error
DeleteBackend(name string) error
ListBackends() (gallery.SystemBackends, error)
UpgradeBackend(ctx context.Context, opID, name string, progressCb ProgressCallback) error
UpgradeBackend(ctx context.Context, name string, progressCb ProgressCallback) error
CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error)
// IsDistributed reports whether installs fan out across worker nodes.
// The HTTP layer uses this to refuse hardware-specific (non-meta) installs

View File

@@ -96,10 +96,7 @@ func (b *LocalBackendManager) ListBackends() (gallery.SystemBackends, error) {
return gallery.ListSystemBackends(b.systemState)
}
// UpgradeBackend ignores opID: a single-node install reports progress through
// the local progressCb already; opID only matters for distributed per-node
// streaming (see DistributedBackendManager.UpgradeBackend).
func (b *LocalBackendManager) UpgradeBackend(ctx context.Context, _ string, name string, progressCb ProgressCallback) error {
func (b *LocalBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb ProgressCallback) error {
return gallery.UpgradeBackend(ctx, b.systemState, b.modelLoader, b.backendGalleries, name, progressCb, b.requireBackendIntegrity)
}

View File

@@ -1,107 +0,0 @@
package galleryop_test
import (
"errors"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
)
// These specs reproduce the distributed "Reinstall spins forever" bug:
// processingBackends (the UI spinner source) is built from OpCache.GetStatus,
// which historically returned every cached op unconditionally. Cleanup only
// happened when a client polled /api/backends/job/:uid, but the Manage-page
// Reinstall/Upgrade buttons never poll, so a completed install stayed in
// processingBackends forever. GetStatus must self-evict terminal ops.
var _ = Describe("OpCache.GetStatus eviction", func() {
var (
svc *galleryop.GalleryService
cache *galleryop.OpCache
)
BeforeEach(func() {
svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
cache = galleryop.NewOpCache(svc)
})
It("keeps an op that is still processing", func() {
cache.SetBackend("llama-cpp", "uuid-1")
svc.UpdateStatus("uuid-1", &galleryop.OpStatus{Message: "processing backend: llama-cpp", Progress: 0})
processing, _ := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("llama-cpp", "uuid-1"))
Expect(cache.Exists("llama-cpp")).To(BeTrue())
})
It("evicts a completed op so it no longer shows as processing", func() {
cache.SetBackend("llama-cpp", "uuid-1")
svc.UpdateStatus("uuid-1", &galleryop.OpStatus{Processed: true, Progress: 100, Message: "completed"})
processing, _ := cache.GetStatus()
Expect(processing).NotTo(HaveKey("llama-cpp"))
Expect(cache.Exists("llama-cpp")).To(BeFalse())
})
It("keeps a failed op so the operations panel can surface the error and offer Dismiss", func() {
cache.SetBackend("piper", "uuid-2")
svc.UpdateStatus("uuid-2", &galleryop.OpStatus{Processed: true, Error: errors.New("boom")})
processing, _ := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("piper", "uuid-2"))
Expect(cache.Exists("piper")).To(BeTrue())
})
It("keeps the ErrWorkerStillInstalling soft-path op (worker still installing in background)", func() {
// Processed=true with no error but progress != 100 and a non-"completed"
// message: the worker timed out the NATS round-trip but is still installing.
// Evicting it would hide an install that may still fail; the reconciler
// confirms the real outcome later.
cache.SetBackend("vllm-development", "uuid-soft")
svc.UpdateStatus("uuid-soft", &galleryop.OpStatus{
Processed: true,
Message: "backend vllm-development: worker still installing in background; reconciler will confirm completion",
})
processing, _ := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("vllm-development", "uuid-soft"))
Expect(cache.Exists("vllm-development")).To(BeTrue())
})
It("evicts a cancelled op", func() {
cache.SetBackend("vllm", "uuid-3")
svc.UpdateStatus("uuid-3", &galleryop.OpStatus{Processed: true, Cancelled: true, Message: "cancelled"})
processing, _ := cache.GetStatus()
Expect(processing).NotTo(HaveKey("vllm"))
})
It("does not evict an op with no status yet (queued)", func() {
cache.SetBackend("whisper", "uuid-4")
processing, taskTypes := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("whisper", "uuid-4"))
Expect(taskTypes).To(HaveKeyWithValue("whisper", "Waiting"))
})
// Regression guard: GetStatus is called concurrently by four HTTP handlers
// (~1s poll). An earlier version evicted by deleting from m.Map() — which
// returns the live internal map by reference — causing a fatal
// "concurrent map writes" crash. Run under -race; must not panic or race.
It("is safe under concurrent GetStatus + Set/complete", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
for i := 0; i < 2000; i++ {
_, _ = cache.GetStatus()
}
close(done)
}()
for i := 0; i < 2000; i++ {
id := "uuid-c"
cache.SetBackend("concurrent-backend", id)
// Half the time mark it completed so GetStatus evicts it.
if i%2 == 0 {
svc.UpdateStatus(id, &galleryop.OpStatus{Processed: true, Progress: 100, Message: "completed"})
}
_, _ = cache.GetStatus()
}
<-done
})
})

View File

@@ -408,43 +408,12 @@ func (m *OpCache) Exists(key string) bool {
}
func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
taskTypes := map[string]string{}
processingModelsData := map[string]string{}
processingModelsData := m.Map()
// Iterate a snapshot (Keys() copies) and build a fresh result map. We must
// NOT delete from m.Map() during the range: Map() returns the live internal
// map by reference, so a bare delete here would be an unsynchronized write
// to a map four HTTP handlers read every ~1s — a concurrent-map-write crash.
// Collect evictions and apply them via the locked DeleteUUID after the loop.
var evict []string
for _, k := range m.status.Keys() {
v := m.status.Get(k)
if v == "" {
continue // raced with a concurrent Delete
}
taskTypes := map[string]string{}
for k, v := range processingModelsData {
status := m.galleryService.GetStatus(v)
// Terminal ops must not keep showing as "processing". Cleanup was
// previously only triggered by a client polling /api/backends/job/:uid,
// but the Manage-page Reinstall/Upgrade buttons never poll, so completed
// ops leaked into processingBackends forever and the card spun
// "reinstalling" indefinitely. Evict here on the list read (the UI always
// calls this). DeleteUUID broadcasts the eviction so peer replicas converge.
//
// We evict ONLY a clean success (progress 100 + "completed", matching the
// job-poll's historical delete condition) or a cancellation. Deliberately
// NOT evicted:
// - failed ops (Error != nil): kept so /api/operations can surface the
// error and offer Dismiss.
// - the ErrWorkerStillInstalling soft-path (Processed=true, Error=nil,
// progress != 100): the worker is still installing in the background
// and the reconciler confirms the real outcome later — evicting it
// would hide an install that may still fail.
if status != nil && status.Processed &&
((status.Progress == 100 && status.Message == "completed") || status.Cancelled) {
evict = append(evict, v)
continue
}
processingModelsData[k] = v
taskTypes[k] = "Installation"
if status != nil && status.Deletion {
taskTypes[k] = "Deletion"
@@ -453,10 +422,6 @@ func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
}
}
for _, v := range evict {
m.DeleteUUID(v)
}
return processingModelsData, taskTypes
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
@@ -32,9 +31,9 @@ type GalleryService struct {
// natsClient is the wider MessagingClient (Publisher + subscribe methods)
// when wired by the distributed startup path; broadcastSubs holds the
// progress + cancel subscriptions opened by SubscribeBroadcasts.
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
// OnBackendOpCompleted is fired after every successful install/upgrade/delete
// on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck
@@ -275,29 +274,6 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus {
return g.statuses
}
// ReapStaleOperations marks abandoned in-progress operations (pending/
// downloading/processing) older than `age` as failed, so an op orphaned by a
// replica that died mid-flight does not linger as "processing" forever. The
// store's CleanStale runs once on startup; this exposes it for periodic
// invocation (a post-startup orphan is otherwise not reaped until the next
// restart). No-op when no gallery store is wired. Returns rows reaped.
func (g *GalleryService) ReapStaleOperations(age time.Duration) (int64, error) {
g.Lock()
store := g.galleryStore
g.Unlock()
if store == nil {
return 0, nil
}
n, err := store.CleanStale(age)
if err != nil {
return 0, err
}
if n > 0 {
xlog.Info("Reaped stale gallery operations", "count", n)
}
return n, nil
}
// CancelOperation cancels an in-progress operation by its ID.
//
// In distributed mode the UI's cancel click may land on a different replica
@@ -319,7 +295,6 @@ func (g *GalleryService) CancelOperation(id string) error {
}
nc := g.natsClient
store := g.galleryStore
if !localExists && nc == nil {
g.Unlock()
@@ -340,17 +315,6 @@ func (g *GalleryService) CancelOperation(id string) error {
}
g.Unlock()
// Persist the terminal status so the cancel survives a restart. Without
// this the row stays in its active state and re-hydrates straight back into
// processingBackends on the next replica boot — the UI spins again on an op
// the admin already cancelled. The peer that broadcasts wins the write; a
// no-op when standalone (store nil).
if store != nil {
if err := store.Cancel(id); err != nil {
xlog.Warn("Failed to persist gallery operation cancellation", "op_id", id, "error", err)
}
}
// I/O and user-provided callback after Unlock — the cancel-wildcard
// subscriber loops back into applyCancel on this same replica, which
// would otherwise deadlock on g.Mutex.

View File

@@ -2,22 +2,15 @@ package messaging
import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/mudler/LocalAI/pkg/sanitize"
"github.com/mudler/xlog"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
// subscribeConfirmTimeout bounds the server round-trip used to detect whether a
// subscription was rejected (e.g. by JWT permissions) before returning to the caller.
const subscribeConfirmTimeout = 5 * time.Second
// Client wraps a NATS connection and provides helpers for pub/sub and queue subscriptions.
type Client struct {
conn *nats.Conn
@@ -25,13 +18,8 @@ type Client struct {
}
// New creates a new NATS client with auto-reconnect.
func New(url string, opts ...Option) (*Client, error) {
var cfg connectConfig
for _, o := range opts {
o(&cfg)
}
natsOpts := []nats.Option{
func New(url string) (*Client, error) {
nc, err := nats.Connect(url,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(-1),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
@@ -45,60 +33,7 @@ func New(url string, opts ...Option) (*Client, error) {
nats.ClosedHandler(func(_ *nats.Conn) {
xlog.Info("NATS connection closed")
}),
// Surface async errors (notably permission violations) that NATS would
// otherwise deliver silently. A subscription the server rejects for a
// JWT permission means the worker never receives those messages, so make
// it loud rather than letting the feature fail invisibly.
nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) {
subject := ""
if sub != nil {
subject = sub.Subject
}
if errors.Is(err, nats.ErrPermissionViolation) {
xlog.Error("NATS permission violation — check JWT pub/sub allow lists", "subject", subject, "error", err)
return
}
xlog.Warn("NATS async error", "subject", subject, "error", err)
}),
}
switch {
case cfg.jwtProvider != nil:
// Fetch creds on every (re)connect so a refresh loop can rotate the JWT
// before expiry; the server expiring the old JWT triggers a reconnect
// that transparently picks up the new one.
natsOpts = append(natsOpts, nats.UserJWT(
func() (string, error) {
jwt, _ := cfg.jwtProvider()
if jwt == "" {
return "", fmt.Errorf("no NATS user JWT available")
}
return jwt, nil
},
func(nonce []byte) ([]byte, error) {
_, seed := cfg.jwtProvider()
kp, err := nkeys.FromSeed([]byte(seed))
if err != nil {
return nil, fmt.Errorf("loading NATS user seed: %w", err)
}
defer kp.Wipe()
return kp.Sign(nonce)
},
))
case cfg.userJWT != "" && cfg.userSeed != "":
natsOpts = append(natsOpts, nats.UserJWTAndSeed(cfg.userJWT, cfg.userSeed))
}
if cfg.tls.Enabled() {
if err := cfg.tls.Validate(); err != nil {
return nil, err
}
tlsOpts, err := cfg.tls.natsOptions()
if err != nil {
return nil, err
}
natsOpts = append(natsOpts, tlsOpts...)
}
nc, err := nats.Connect(url, natsOpts...)
)
if err != nil {
return nil, fmt.Errorf("connecting to NATS at %s: %w", sanitize.URL(url), err)
}
@@ -119,65 +54,21 @@ func (c *Client) Publish(subject string, data any) error {
// Subscribe creates a subscription on the given subject. All subscribers receive every message.
func (c *Client) Subscribe(subject string, handler func([]byte)) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
}
// QueueSubscribe creates a queue subscription. Within the same queue group,
// only one subscriber receives each message (load-balanced).
func (c *Client) QueueSubscribe(subject, queue string, handler func([]byte)) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data)
})
})
}
// confirmSubscription creates a subscription via mk and forces a server
// round-trip so that a permissions violation — which NATS otherwise reports
// only asynchronously — is returned to the caller synchronously. The server
// emits the "-ERR Permissions Violation" for a rejected SUB before the PONG
// that satisfies the flush, so by the time FlushTimeout returns the violation
// is recorded as the connection's last error. Without this, a worker whose JWT
// lacks a subject gets a non-nil subscription that never receives a message,
// turning a permission misconfiguration into a silent failure.
func (c *Client) confirmSubscription(subject string, mk func(*nats.Conn) (*nats.Subscription, error)) (Subscription, error) {
c.mu.RLock()
conn := c.conn
c.mu.RUnlock()
if conn == nil {
return nil, fmt.Errorf("subscribe to %s: nil NATS connection", subject)
}
sub, err := mk(conn)
if err != nil {
return nil, err
}
// A failed flush here means we could not round-trip to the server (not yet
// connected, reconnecting, slow link). RetryOnFailedConnect intentionally
// buffers subscriptions across that gap, so do NOT fail — keep the
// subscription and let it replay on (re)connect; a later permission
// violation is still logged by the async error handler in New.
if err := conn.FlushTimeout(subscribeConfirmTimeout); err != nil {
xlog.Debug("Could not confirm NATS subscription (will replay on connect)", "subject", subject, "error", err)
return sub, nil
}
// Flush succeeded, so any permission violation for this SUB has already been
// recorded as the connection's last error (the server emits it before the
// PONG). LastError is per-connection; match the exact quoted subject the
// server echoes ("Subscription to \"<subject>\"") so a stale violation for
// another subject can't be mis-attributed here.
if lerr := conn.LastError(); lerr != nil &&
errors.Is(lerr, nats.ErrPermissionViolation) &&
strings.Contains(lerr.Error(), `Subscription to "`+subject+`"`) {
_ = sub.Unsubscribe()
return nil, fmt.Errorf("subscription to %s denied by NATS server (check JWT sub allow list): %w", subject, lerr)
}
return sub, nil
defer c.mu.RUnlock()
return c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data)
})
}
// Request sends a request and waits for a reply (request-reply pattern).
@@ -195,15 +86,15 @@ func (c *Client) Request(subject string, data []byte, timeout time.Duration) ([]
// SubscribeReply creates a subscription that supports replying to requests.
// The handler receives the raw request data and the reply subject.
func (c *Client) SubscribeReply(subject string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
})
}
})
})
}
@@ -211,15 +102,15 @@ func (c *Client) SubscribeReply(subject string, handler func(data []byte, reply
// QueueSubscribeReply creates a queue subscription that supports replying to requests.
// Load-balanced across subscribers in the same queue group, with request-reply support.
func (c *Client) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
})
}
})
})
}

View File

@@ -1,34 +0,0 @@
package messaging
// Option configures NATS client connection behavior.
type Option func(*connectConfig)
// CredentialProvider returns the NATS user JWT and signing seed to use for the
// next (re)connect. It is consulted on every connection attempt, so a refresh
// loop can rotate credentials before they expire and the connection picks them
// up automatically when the server expires the old JWT and triggers a reconnect.
type CredentialProvider func() (jwt, seed string)
type connectConfig struct {
userJWT string
userSeed string
jwtProvider CredentialProvider
tls TLSFiles
}
// WithUserJWT connects using a static NATS user JWT and signing seed (UserJWTAndSeed).
func WithUserJWT(jwt, seed string) Option {
return func(c *connectConfig) {
c.userJWT = jwt
c.userSeed = seed
}
}
// WithUserJWTProvider connects using credentials fetched from provider on each
// (re)connect, enabling JWT rotation without dropping the client. Takes
// precedence over WithUserJWT when both are set.
func WithUserJWTProvider(provider CredentialProvider) Option {
return func(c *connectConfig) {
c.jwtProvider = provider
}
}

View File

@@ -194,14 +194,6 @@ type BackendUpgradeRequest struct {
// but the field lets future per-replica metadata (e.g. progress reporting
// scoped to a slot) ride the same wire without a v3 type.
ReplicaIndex int32 `json:"replica_index,omitempty"`
// OpID identifies the admin-side operation. When non-empty the worker
// publishes BackendInstallProgressEvent values to
// SubjectNodeBackendInstallProgress(nodeID, OpID) while the force-reinstall
// runs, so the master can stream per-node progress for upgrades exactly as
// it already does for installs (an upgrade IS a force-reinstall, so the
// install-progress subject is reused rather than minting a new one — no new
// NATS permission or rolling-update compat surface). Empty on legacy callers.
OpID string `json:"op_id,omitempty"`
}
// BackendUpgradeReply mirrors BackendInstallReply minus Address — upgrade does

View File

@@ -1,68 +0,0 @@
package messaging
import (
"fmt"
"os"
"github.com/nats-io/nats.go"
)
// TLSFiles holds PEM paths for NATS TLS / mTLS. Cert and key must be set together.
// Use tls:// in LOCALAI_NATS_URL; CA and client cert paths are optional extras.
type TLSFiles struct {
CA string // LOCALAI_NATS_TLS_CA — private CA for server verification
Cert string // LOCALAI_NATS_TLS_CERT — client certificate (mTLS)
Key string // LOCALAI_NATS_TLS_KEY — client private key
}
// Enabled reports whether any TLS file path is configured.
func (f TLSFiles) Enabled() bool {
return f.CA != "" || f.Cert != "" || f.Key != ""
}
// Validate checks path pairing and that files exist.
func (f TLSFiles) Validate() error {
if f.Cert != "" && f.Key == "" {
return fmt.Errorf("LOCALAI_NATS_TLS_KEY is required when LOCALAI_NATS_TLS_CERT is set")
}
if f.Key != "" && f.Cert == "" {
return fmt.Errorf("LOCALAI_NATS_TLS_CERT is required when LOCALAI_NATS_TLS_KEY is set")
}
for _, path := range []struct {
name, path string
}{
{"LOCALAI_NATS_TLS_CA", f.CA},
{"LOCALAI_NATS_TLS_CERT", f.Cert},
{"LOCALAI_NATS_TLS_KEY", f.Key},
} {
if path.path == "" {
continue
}
if _, err := os.Stat(path.path); err != nil {
return fmt.Errorf("%s: %w", path.name, err)
}
}
return nil
}
// natsOptions builds nats-go TLS options. Call Validate first.
func (f TLSFiles) natsOptions() ([]nats.Option, error) {
if !f.Enabled() {
return nil, nil
}
opts := []nats.Option{nats.Secure()}
if f.CA != "" {
opts = append(opts, nats.RootCAs(f.CA))
}
if f.Cert != "" {
opts = append(opts, nats.ClientCert(f.Cert, f.Key))
}
return opts, nil
}
// WithTLS configures CA and/or client certificate paths for the NATS connection.
func WithTLS(files TLSFiles) Option {
return func(c *connectConfig) {
c.tls = files
}
}

View File

@@ -1,25 +0,0 @@
package messaging_test
import (
"os"
"path/filepath"
"github.com/mudler/LocalAI/core/services/messaging"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("TLSFiles", func() {
It("requires cert and key together", func() {
Expect((messaging.TLSFiles{Cert: "/tmp/c.pem"}).Validate()).To(HaveOccurred())
Expect((messaging.TLSFiles{Key: "/tmp/k.pem"}).Validate()).To(HaveOccurred())
})
It("validates files exist", func() {
dir := GinkgoT().TempDir()
ca := filepath.Join(dir, "ca.pem")
Expect(os.WriteFile(ca, []byte("x"), 0600)).To(Succeed())
Expect((messaging.TLSFiles{CA: ca}).Validate()).To(Succeed())
})
})

View File

@@ -61,17 +61,6 @@ func StartFileTransferServerWithListener(lis net.Listener, stagingDir, modelsDir
return nil, fmt.Errorf("creating staging dir %s: %w", stagingDir, err)
}
// An empty token makes checkBearerToken fail open: every /v1/files,
// /v1/files-list and /v1/backend-logs request is served unauthenticated,
// granting read/write to the staging/models/data directories to anyone who
// can reach this port. Surface that loudly — the worker process does not
// run DistributedConfig.Validate(), so this is the only signal an operator
// gets. Set LOCALAI_REGISTRATION_TOKEN (and LOCALAI_REGISTRATION_REQUIRE_AUTH
// to fail closed) to protect it.
if token == "" {
xlog.Warn("HTTP file transfer server starting WITHOUT a registration token — read/write to models/staging/data is unauthenticated for anyone who can reach this port; set LOCALAI_REGISTRATION_TOKEN")
}
mux := http.NewServeMux()
// PUT /v1/files/{key} — upload file

View File

@@ -7,7 +7,6 @@ import (
"encoding/hex"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"os"
@@ -894,50 +893,3 @@ func sha256Hex(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
var _ = Describe("StartFileTransferServerWithListener", func() {
start := func(token string) (string, func()) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())
staging := GinkgoT().TempDir()
models := GinkgoT().TempDir()
data := GinkgoT().TempDir()
srv, err := StartFileTransferServerWithListener(lis, staging, models, data, token, 0)
Expect(err).NotTo(HaveOccurred())
base := "http://" + lis.Addr().String()
return base, func() { ShutdownFileTransferServer(srv) }
}
// Exercises the empty-token fail-open warning branch: the server serves
// file requests with no Authorization header at all.
It("serves unauthenticated when started without a token", func() {
base, stop := start("")
defer stop()
resp, err := http.Get(base + "/v1/files/missing.bin")
Expect(err).NotTo(HaveOccurred())
defer func() { _ = resp.Body.Close() }()
// No 401 — the empty token fails open. The file is absent so we get 404.
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
})
It("rejects requests without the bearer token when a token is set", func() {
base, stop := start("s3cret")
defer stop()
resp, err := http.Get(base + "/v1/files/missing.bin")
Expect(err).NotTo(HaveOccurred())
defer func() { _ = resp.Body.Close() }()
Expect(resp.StatusCode).To(Equal(http.StatusUnauthorized))
})
It("serves the unauthenticated health endpoints regardless of token", func() {
base, stop := start("s3cret")
defer stop()
resp, err := http.Get(base + "/healthz")
Expect(err).NotTo(HaveOccurred())
defer func() { _ = resp.Body.Close() }()
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})
})

View File

@@ -6,7 +6,6 @@ import (
"time"
"github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/xlog"
ggrpc "google.golang.org/grpc"
@@ -65,95 +64,64 @@ func (c *InFlightTrackingClient) track(ctx context.Context) func() {
}
}
// reconcile self-heals stale routing: when a backend reports that the model is
// no longer loaded (the process survived but the model was evicted, while the
// registry still lists it as loaded), it drops the replica row so the next
// request triggers a fresh load instead of routing back here. Without this the
// model stays unreachable until the controller restarts. The original error is
// returned unchanged.
func (c *InFlightTrackingClient) reconcile(err error) error {
if !grpcerrors.IsModelNotLoaded(err) {
return err
}
rmCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if rmErr := c.registry.RemoveNodeModel(rmCtx, c.nodeID, c.modelName, c.replicaIndex); rmErr != nil {
xlog.Warn("Failed to drop stale replica after model-not-loaded",
"node", c.nodeID, "model", c.modelName, "replica", c.replicaIndex, "error", rmErr)
} else {
xlog.Warn("Backend reports model not loaded; dropped stale replica so the next request reloads",
"node", c.nodeID, "model", c.modelName, "replica", c.replicaIndex)
}
return err
}
// --- Tracked inference methods ---
func (c *InFlightTrackingClient) Predict(ctx context.Context, in *pb.PredictOptions, opts ...ggrpc.CallOption) (*pb.Reply, error) {
defer c.track(ctx)()
reply, err := c.Backend.Predict(ctx, in, opts...)
return reply, c.reconcile(err)
return c.Backend.Predict(ctx, in, opts...)
}
func (c *InFlightTrackingClient) PredictStream(ctx context.Context, in *pb.PredictOptions, f func(reply *pb.Reply), opts ...ggrpc.CallOption) error {
defer c.track(ctx)()
return c.reconcile(c.Backend.PredictStream(ctx, in, f, opts...))
return c.Backend.PredictStream(ctx, in, f, opts...)
}
func (c *InFlightTrackingClient) Embeddings(ctx context.Context, in *pb.PredictOptions, opts ...ggrpc.CallOption) (*pb.EmbeddingResult, error) {
defer c.track(ctx)()
res, err := c.Backend.Embeddings(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.Embeddings(ctx, in, opts...)
}
func (c *InFlightTrackingClient) GenerateImage(ctx context.Context, in *pb.GenerateImageRequest, opts ...ggrpc.CallOption) (*pb.Result, error) {
defer c.track(ctx)()
res, err := c.Backend.GenerateImage(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.GenerateImage(ctx, in, opts...)
}
func (c *InFlightTrackingClient) GenerateVideo(ctx context.Context, in *pb.GenerateVideoRequest, opts ...ggrpc.CallOption) (*pb.Result, error) {
defer c.track(ctx)()
res, err := c.Backend.GenerateVideo(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.GenerateVideo(ctx, in, opts...)
}
func (c *InFlightTrackingClient) TTS(ctx context.Context, in *pb.TTSRequest, opts ...ggrpc.CallOption) (*pb.Result, error) {
defer c.track(ctx)()
res, err := c.Backend.TTS(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.TTS(ctx, in, opts...)
}
func (c *InFlightTrackingClient) TTSStream(ctx context.Context, in *pb.TTSRequest, f func(reply *pb.Reply), opts ...ggrpc.CallOption) error {
defer c.track(ctx)()
return c.reconcile(c.Backend.TTSStream(ctx, in, f, opts...))
return c.Backend.TTSStream(ctx, in, f, opts...)
}
func (c *InFlightTrackingClient) SoundGeneration(ctx context.Context, in *pb.SoundGenerationRequest, opts ...ggrpc.CallOption) (*pb.Result, error) {
defer c.track(ctx)()
res, err := c.Backend.SoundGeneration(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.SoundGeneration(ctx, in, opts...)
}
func (c *InFlightTrackingClient) AudioTranscription(ctx context.Context, in *pb.TranscriptRequest, opts ...ggrpc.CallOption) (*pb.TranscriptResult, error) {
defer c.track(ctx)()
res, err := c.Backend.AudioTranscription(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.AudioTranscription(ctx, in, opts...)
}
func (c *InFlightTrackingClient) AudioTranscriptionStream(ctx context.Context, in *pb.TranscriptRequest, f func(chunk *pb.TranscriptStreamResponse), opts ...ggrpc.CallOption) error {
defer c.track(ctx)()
return c.reconcile(c.Backend.AudioTranscriptionStream(ctx, in, f, opts...))
return c.Backend.AudioTranscriptionStream(ctx, in, f, opts...)
}
func (c *InFlightTrackingClient) Detect(ctx context.Context, in *pb.DetectOptions, opts ...ggrpc.CallOption) (*pb.DetectResponse, error) {
defer c.track(ctx)()
res, err := c.Backend.Detect(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.Detect(ctx, in, opts...)
}
func (c *InFlightTrackingClient) Rerank(ctx context.Context, in *pb.RerankRequest, opts ...ggrpc.CallOption) (*pb.RerankResult, error) {
defer c.track(ctx)()
res, err := c.Backend.Rerank(ctx, in, opts...)
return res, c.reconcile(err)
return c.Backend.Rerank(ctx, in, opts...)
}

View File

@@ -20,17 +20,9 @@ type fakeInFlightTracker struct {
mu sync.Mutex
increments int
decrements int
removed int
incrementErr error
}
func (f *fakeInFlightTracker) RemoveNodeModel(_ context.Context, _, _ string, _ int) error {
f.mu.Lock()
defer f.mu.Unlock()
f.removed++
return nil
}
func (f *fakeInFlightTracker) IncrementInFlight(_ context.Context, _, _ string, _ int) error {
f.mu.Lock()
defer f.mu.Unlock()
@@ -303,33 +295,4 @@ var _ = Describe("InFlightTrackingClient", func() {
Expect(tracker.decrements).To(Equal(1))
})
})
Describe("stale model reload (self-heal)", func() {
It("removes the replica when the backend reports the model is not loaded", func() {
backend.predictErr = fmt.Errorf("parakeet-cpp: model not loaded")
_, err := client.Predict(context.Background(), &pb.PredictOptions{})
Expect(err).To(HaveOccurred())
Expect(tracker.removed).To(Equal(1))
})
It("keeps the replica on an unrelated error", func() {
backend.predictErr = fmt.Errorf("context deadline exceeded")
_, err := client.Predict(context.Background(), &pb.PredictOptions{})
Expect(err).To(HaveOccurred())
Expect(tracker.removed).To(Equal(0))
})
It("does not remove on success", func() {
_, err := client.Predict(context.Background(), &pb.PredictOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(tracker.removed).To(Equal(0))
})
It("self-heals on a streamed call too", func() {
backend.streamErr = fmt.Errorf("whisper: model not loaded")
err := client.PredictStream(context.Background(), &pb.PredictOptions{}, func(*pb.Reply) {})
Expect(err).To(HaveOccurred())
Expect(tracker.removed).To(Equal(1))
})
})
})

View File

@@ -78,9 +78,6 @@ type ModelLookup interface {
type InFlightTracker interface {
IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
// RemoveNodeModel drops a stale replica row so the next request reloads the
// model instead of routing back to a node where it is no longer loaded.
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
}
// NodeManager is used by HTTP endpoints for node registration and lifecycle.

Some files were not shown because too many files have changed in this diff Show More