Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
8cf0448304 chore(deps): bump actions/upload-artifact from 4 to 7
Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 4 to 7.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v4...v7)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '7'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-03 03:23:39 +00:00
81 changed files with 343 additions and 3882 deletions

View File

@@ -18,7 +18,7 @@ jobs:
if: ${{ github.actor != 'dependabot[bot]' }}
- name: Run Gosec Security Scanner
if: ${{ github.actor != 'dependabot[bot]' }}
uses: securego/gosec@v2.27.1
uses: securego/gosec@v2.22.9
with:
# we let the report trigger content trigger a failure using the GitHub Security features.
args: '-no-fail -fmt sarif -out results.sarif ./...'

View File

@@ -62,7 +62,7 @@ jobs:
PATH="$PATH:/root/go/bin" make --jobs 5 --output-sync=target test-coverage-check
- name: Upload coverage report
if: ${{ always() }}
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v7
with:
name: coverage-linux
path: |

View File

@@ -309,20 +309,13 @@ run-e2e-aio: protogen-go
@echo 'Running e2e AIO tests'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e-aio
# Distributed architecture e2e (PostgreSQL + NATS via testcontainers).
# Includes NatsJWT specs (JWT-enabled NATS). Requires Docker.
# VLLMMultinode is excluded here; use test-e2e-vllm-multinode for that.
test-e2e-distributed: protogen-go
@echo 'Running distributed e2e tests (label Distributed, incl. NatsJWT)'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --label-filter='Distributed && !VLLMMultinode' --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e/distributed
# vLLM multi-node DP smoke (CPU). Builds local-ai:tests and the
# cpu-vllm backend from the current working tree, then drives a
# head + headless follower via testcontainers-go and asserts a chat
# completion. BuildKit caches both images, so re-runs only rebuild
# what changed. The test lives under tests/e2e/distributed and is
# selected by the VLLMMultinode label so it doesn't run alongside
# test-e2e-distributed.
# the other distributed-suite tests by default.
test-e2e-vllm-multinode: docker-build-e2e extract-backend-vllm protogen-go
@echo 'Running e2e vLLM multi-node DP test'
LOCALAI_IMAGE=local-ai \

View File

@@ -537,15 +537,6 @@ message TTSRequest {
string dst = 3;
string voice = 4;
optional string language = 5;
// instructions is a free-form, per-request style/voice description (maps to
// the OpenAI `instructions` field). Backends that support expressive synthesis
// (e.g. Qwen3-TTS CustomVoice/VoiceDesign) prefer this over the static YAML
// option when set; backends that don't simply ignore it.
optional string instructions = 6;
// params carries optional, backend-specific per-request generation parameters
// (e.g. Chatterbox exaggeration/cfg_weight/temperature). Values are strings and
// coerced by the backend; unset leaves the backend's configured defaults.
map<string, string> params = 7;
}
message VADRequest {

View File

@@ -1,10 +1,10 @@
# ds4 backend Makefile.
#
# Upstream pin lives below as DS4_VERSION?=477c0e82e2699b35a65fd0a1ed6fe66b41087dfe
# Upstream pin lives below as DS4_VERSION?=ba00a8a88c4c5810a3d1fed6b7b8fa2b44b82fdc
# (.github/bump_deps.sh) can find and update it - matches the
# llama-cpp / ik-llama-cpp / turboquant convention.
DS4_VERSION?=477c0e82e2699b35a65fd0a1ed6fe66b41087dfe
DS4_VERSION?=ba00a8a88c4c5810a3d1fed6b7b8fa2b44b82fdc
DS4_REPO?=https://github.com/antirez/ds4
CURRENT_MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=94a220cd6745e6e3f8de62870b66fd5b9bc92700
LLAMA_VERSION?=5dcb71166686799f0d873eab7386234302d05ecf
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# CrispASR version (release tag)
CRISPASR_REPO?=https://github.com/CrispStrobe/CrispASR
CRISPASR_VERSION?=13d54e110e1538e0f0bc3af0680b9ab246cfb48d
CRISPASR_VERSION?=05e60432bcb5bc2113f8c395a41e86497c11504a
SO_TARGET?=libgocrispasr.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -1,6 +1,6 @@
# parakeet-cpp backend Makefile.
#
# Upstream pin lives below as PARAKEET_VERSION?=b11fe5bca78ad8b342dd559a43d76df3984bb447
# Upstream pin lives below as PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543
# (.github/bump_deps.sh) can find and update it - matches the
# whisper.cpp / ds4 / vibevoice-cpp convention.
#
@@ -15,7 +15,7 @@
# That's what the L0 smoke test uses. The default target below does the
# proper clone-at-pin + cmake build so CI doesn't need a side-checkout.
PARAKEET_VERSION?=b11fe5bca78ad8b342dd559a43d76df3984bb447
PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543
PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp
GOCMD?=go

View File

@@ -236,19 +236,10 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
return pb.TranscriptResult{}, errors.New("parakeet-cpp: TranscriptRequest.dst (audio path) is required")
}
// Fallback when the batched C-API is unavailable: transcribe from a file
// path (original behavior, no batching). The C library's audio loader only
// understands 16 kHz mono WAV/PCM, so convert the input first - otherwise
// any non-WAV upload (MP3, etc.) fails with "failed to load audio". This
// mirrors what every other audio backend (whisper, crispasr) does via
// utils.AudioToWav before handing the file to the engine.
// Fallback when the batched C-API is unavailable: transcribe directly from
// the file path (original behavior, no batching).
if p.bat == nil {
converted, cleanup, err := convertToWavMono16k(opts.Dst)
if err != nil {
return pb.TranscriptResult{}, err
}
defer cleanup()
cstr := CppTranscribePathJSON(p.ctxPtr, converted, 0)
cstr := CppTranscribePathJSON(p.ctxPtr, opts.Dst, 0)
if cstr == 0 {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: transcribe_path_json failed: %s", CppLastError(p.ctxPtr))
}
@@ -469,33 +460,17 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
// float samples plus the clip duration in seconds. Mirrors the whisper
// backend: utils.AudioToWav (ffmpeg) normalises rate/channels, go-audio
// decodes the PCM.
// convertToWavMono16k converts an arbitrary audio file to a 16 kHz mono WAV in
// a fresh temp dir and returns the path together with a cleanup func the caller
// must defer. WAV inputs already at 16 kHz/mono/16-bit are passed through by
// utils.AudioToWav (hardlink/copy), everything else is transcoded via ffmpeg.
// Used by the direct (non-batched) transcription path, which hands a file path
// to the C library's WAV-only audio loader.
func convertToWavMono16k(path string) (string, func(), error) {
dir, err := os.MkdirTemp("", "parakeet")
if err != nil {
return "", func() {}, err
}
cleanup := func() { _ = os.RemoveAll(dir) }
converted := filepath.Join(dir, "converted.wav")
if err := utils.AudioToWav(path, converted); err != nil {
cleanup()
return "", func() {}, err
}
return converted, cleanup, nil
}
func decodeWavMono16k(path string) ([]float32, float32, error) {
converted, cleanup, err := convertToWavMono16k(path)
dir, err := os.MkdirTemp("", "parakeet")
if err != nil {
return nil, 0, err
}
defer cleanup()
defer func() { _ = os.RemoveAll(dir) }()
converted := filepath.Join(dir, "converted.wav")
if err := utils.AudioToWav(path, converted); err != nil {
return nil, 0, err
}
fh, err := os.Open(converted)
if err != nil {

View File

@@ -3,14 +3,11 @@ package main
import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"github.com/ebitengine/purego"
"github.com/go-audio/audio"
"github.com/go-audio/wav"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -73,24 +70,6 @@ func fixturesOrSkip() (string, string) {
return modelPath, audioPath
}
// writeMono16kWav writes `samples` frames of 16 kHz mono 16-bit silence to
// path. The result is already in AudioToWav's target format, so the conversion
// helper copies it through without invoking ffmpeg.
func writeMono16kWav(path string, samples int) {
GinkgoHelper()
f, err := os.Create(path)
Expect(err).ToNot(HaveOccurred())
enc := wav.NewEncoder(f, 16000, 16, 1, 1)
buf := &audio.IntBuffer{
Format: &audio.Format{NumChannels: 1, SampleRate: 16000},
SourceBitDepth: 16,
Data: make([]int, samples),
}
Expect(enc.Write(buf)).To(Succeed())
Expect(enc.Close()).To(Succeed())
Expect(f.Close()).To(Succeed())
}
var _ = Describe("ParakeetCpp", func() {
Context("AudioTranscription", func() {
It("transcribes a WAV via the parakeet C-API", func() {
@@ -141,39 +120,6 @@ var _ = Describe("ParakeetCpp", func() {
})
})
Context("convertToWavMono16k", func() {
// The non-batched transcription path hands a file path to the C
// library's WAV-only audio loader, so it must convert first.
// utils.AudioToWav passes an already-16kHz/mono/16-bit WAV through
// without ffmpeg, which lets us exercise the helper (and the
// regression: the direct path used to skip conversion entirely)
// without a model, the C library, or ffmpeg.
It("returns a decodable 16kHz mono WAV copy and cleans it up", func() {
dir := GinkgoT().TempDir()
src := filepath.Join(dir, "input.wav")
writeMono16kWav(src, 16000) // 1s of silence at 16 kHz
converted, cleanup, err := convertToWavMono16k(src)
Expect(err).ToNot(HaveOccurred())
// It must produce a fresh temp file, not return the original path.
Expect(converted).ToNot(Equal(src))
Expect(converted).To(BeAnExistingFile())
pcm, _, err := decodeWavMono16k(converted)
Expect(err).ToNot(HaveOccurred())
Expect(pcm).To(HaveLen(16000), "round-trips the sample count")
cleanup()
Expect(converted).ToNot(BeAnExistingFile(), "cleanup removes the temp dir")
})
It("errors on a non-existent input rather than passing the path through", func() {
_, _, err := convertToWavMono16k(filepath.Join(GinkgoT().TempDir(), "missing.mp3"))
Expect(err).To(HaveOccurred())
})
})
Context("AudioTranscriptionStream", func() {
It("streams deltas and a closing FinalResult from a cache-aware model", func() {
// Streaming needs a cache-aware streaming model (e.g.

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# qwen3-tts.cpp version
QWEN3TTS_REPO?=https://github.com/predict-woo/qwen3-tts.cpp
QWEN3TTS_CPP_VERSION?=136e5d36c17083da0321fd96512dc7b263f94a44
QWEN3TTS_CPP_VERSION?=7a762e2ad4bacc6fdda81d81bf10a09ffb546f29
SO_TARGET?=libgoqwen3ttscpp.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/mudler/LocalAI/pkg/grpc/base"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
@@ -22,43 +21,6 @@ type Qwen3TtsCpp struct {
threads int
}
// languageNameAliases maps common full language names to the canonical
// two-letter code understood by the C++ language_to_id table.
var languageNameAliases = map[string]string{
"english": "en",
"russian": "ru",
"chinese": "zh",
"japanese": "ja",
"korean": "ko",
"german": "de",
"french": "fr",
"spanish": "es",
"italian": "it",
"portuguese": "pt",
}
// normalizeLanguage coerces a caller-supplied language into the canonical code
// the model expects. It lowercases, trims, strips any region/locale suffix
// (en-US, en_US, ja.JP -> en/ja), and resolves common full names (english -> en).
// An empty input stays empty so the C++ side applies its English default; an
// unrecognized value is returned normalized so C++ can log it and default.
func normalizeLanguage(lang string) string {
lang = strings.ToLower(strings.TrimSpace(lang))
if lang == "" {
return ""
}
// Strip region/locale suffix: keep the segment before the first separator.
if i := strings.IndexAny(lang, "-_."); i >= 0 {
lang = lang[:i]
}
if code, ok := languageNameAliases[lang]; ok {
return code
}
return lang
}
func (q *Qwen3TtsCpp) Load(opts *pb.ModelOptions) error {
// ModelFile is the model directory path (containing GGUF files)
modelDir := opts.ModelFile
@@ -92,7 +54,7 @@ func (q *Qwen3TtsCpp) TTS(req *pb.TTSRequest) error {
dst := req.Dst
language := ""
if req.Language != nil {
language = normalizeLanguage(*req.Language)
language = *req.Language
}
// Synthesis parameters with sensible defaults

View File

@@ -1,53 +0,0 @@
package main
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestLanguageNormalization(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "qwen3-tts-cpp language normalization")
}
var _ = Describe("normalizeLanguage", func() {
DescribeTable("maps caller input to the canonical model language code",
func(input, expected string) {
Expect(normalizeLanguage(input)).To(Equal(expected))
},
// Canonical codes pass through unchanged
Entry("canonical en", "en", "en"),
Entry("canonical zh", "zh", "zh"),
Entry("canonical pt", "pt", "pt"),
// Case-insensitive
Entry("uppercase", "EN", "en"),
Entry("mixed case", "Ja", "ja"),
// Surrounding whitespace
Entry("trims whitespace", " en ", "en"),
// Region/locale stripping
Entry("BCP-47 region", "en-US", "en"),
Entry("underscore region", "en_US", "en"),
Entry("dotted locale", "ja.JP", "ja"),
Entry("region + case", "ZH-CN", "zh"),
// Full-name aliases
Entry("english name", "english", "en"),
Entry("chinese name cased", "Chinese", "zh"),
Entry("japanese name", "japanese", "ja"),
Entry("russian name", "russian", "ru"),
Entry("portuguese name", "portuguese", "pt"),
// Empty stays empty (C++ applies the English default)
Entry("empty", "", ""),
Entry("whitespace only", " ", ""),
// Unknown values pass through normalized so C++ can log + default
Entry("unknown code", "klingon", "klingon"),
Entry("unknown with region", "xx-YY", "xx"),
)
})

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=1f9ee88e09c258053fa59d5e05e23dfb10fa0b13
STABLEDIFFUSION_GGML_VERSION?=7948df8ac1070f5f6881b8d34675821893eb97d6
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=610e664ba7cfe3af46125ed1b5a1184fccb51bcd
WHISPER_CPP_VERSION?=23ee03506a91ac3d3f0071b40e66a430eebdfa1d
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -37,20 +37,6 @@ def is_int(s):
except ValueError:
return False
def coerce_param_value(value):
"""Coerce a TTSRequest.params value (string on the wire) to the type the
Chatterbox generate() kwargs expect (float/int/bool), matching how static
YAML options are coerced at load time. Non-string values pass through."""
if not isinstance(value, str):
return value
if is_float(value):
return float(value)
if is_int(value):
return int(value)
if value.lower() in ["true", "false"]:
return value.lower() == "true"
return value
def split_text_at_word_boundary(text, max_length=250):
"""
Split text at word boundaries without truncating words.
@@ -205,14 +191,6 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
# add options to kwargs
kwargs.update(self.options)
# Merge per-request params (TTSRequest.params), overriding the static
# YAML options. This exposes Chatterbox generation knobs (e.g.
# exaggeration, cfg_weight, temperature) per request. Values arrive as
# strings on the wire and are coerced to float/int/bool.
if hasattr(request, "params") and request.params:
for key, value in request.params.items():
kwargs[key] = coerce_param_value(value)
# Check if text exceeds 250 characters
# (chatterbox does not support long text)
# https://github.com/resemble-ai/chatterbox/issues/60

View File

@@ -47,26 +47,6 @@ def is_int(s):
return False
def coerce_param_value(value):
"""Coerce a string param value (from the TTSRequest.params map, which is
string-typed on the wire) into the most specific Python type the model
generation kwargs expect: bool, int, float, else the original string."""
if not isinstance(value, str):
return value
lowered = value.strip().lower()
if lowered in ("true", "false"):
return lowered == "true"
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# If MAX_WORKERS are specified in the environment use it, otherwise default to 1
@@ -342,19 +322,6 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
return backend_pb2.Result(message="Model loaded successfully", success=True)
def _effective_instruct(self, request):
"""Resolve the instruction/style string for this request, preferring the
per-request TTSRequest.instructions value and falling back to the static
YAML `instruct` option. Empty string means "no instruction"."""
req_instruct = (
request.instructions
if hasattr(request, "instructions") and request.instructions
else ""
)
if req_instruct:
return req_instruct
return self.options.get("instruct", "") or ""
def _detect_mode(self, request):
"""Detect which mode to use based on request parameters."""
# Priority: VoiceClone > VoiceDesign > CustomVoice
@@ -371,8 +338,8 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
if self.audio_path or self.voices:
return "VoiceClone"
# VoiceDesign: instruct provided per-request or via YAML option
if self._effective_instruct(request):
# VoiceDesign: instruct option is provided
if "instruct" in self.options and self.options["instruct"]:
return "VoiceDesign"
# Default to CustomVoice
@@ -723,20 +690,10 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
if do_sample is not None:
generation_kwargs["do_sample"] = do_sample
# Prefer the per-request instruction (TTSRequest.instructions) over the
# static YAML `instruct` option. This lets clients set a different style
# (CustomVoice emotion) or designed voice (VoiceDesign) per request.
instruct = self._effective_instruct(request)
instruct = self.options.get("instruct", "")
if instruct is not None and instruct != "":
generation_kwargs["instruct"] = instruct
# Merge any per-request backend-specific params (TTSRequest.params).
# Values arrive as strings on the wire; coerce to int/float/bool so the
# model receives the types it expects. These override YAML-derived kwargs.
if hasattr(request, "params") and request.params:
for key, value in request.params.items():
generation_kwargs[key] = coerce_param_value(value)
# Generate audio based on mode
if mode == "VoiceClone":
# VoiceClone mode

View File

@@ -1,4 +1,4 @@
grpcio==1.81.0
grpcio==1.80.0
protobuf==7.35.0
certifi
setuptools

View File

@@ -1,4 +1,4 @@
grpcio==1.81.0
grpcio==1.80.0
protobuf
certifi
setuptools

View File

@@ -102,12 +102,7 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID)
// Connect to NATS
natsAuth := cfg.Distributed.NatsAuthConfig()
if natsAuth.RequireAuth && (natsAuth.ServiceUserJWT == "" || natsAuth.ServiceUserSeed == "") {
return nil, fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH requires LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED")
}
natsOpts := cfg.Distributed.NatsMessagingOptions("", "")
natsClient, err := messaging.New(cfg.Distributed.NatsURL, natsOpts...)
natsClient, err := messaging.New(cfg.Distributed.NatsURL)
if err != nil {
return nil, fmt.Errorf("connecting to NATS: %w", err)
}

View File

@@ -123,14 +123,14 @@ var _ = Describe("X-LocalAI-Node ctx propagation contract", func() {
})
It("ModelTTS forwards the request context to the SmartRouter", func() {
_, _, err := backend.ModelTTS(reqCtx, "hello", "", "", "", nil, loader, appCfg, modelCfg)
_, _, err := backend.ModelTTS(reqCtx, "hello", "", "", loader, appCfg, modelCfg)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("router short-circuit (test)"))
stampViaRouterCtx()
})
It("ModelTTSStream forwards the request context to the SmartRouter", func() {
err := backend.ModelTTSStream(reqCtx, "hello", "", "", "", nil, loader, appCfg, modelCfg, func([]byte) error { return nil })
err := backend.ModelTTSStream(reqCtx, "hello", "", "", loader, appCfg, modelCfg, func([]byte) error { return nil })
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("router short-circuit (test)"))
stampViaRouterCtx()

View File

@@ -20,32 +20,11 @@ import (
"github.com/mudler/LocalAI/pkg/utils"
)
// newTTSRequest assembles the gRPC TTSRequest from the per-request inputs. The
// optional instructions string is only attached when non-empty so backends can
// distinguish "no per-request instruction" (fall back to YAML) from an explicit
// empty one. params is forwarded as-is (nil when unset).
func newTTSRequest(text, modelPath, voice, dst, language, instructions string, params map[string]string) *proto.TTSRequest {
req := &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Dst: dst,
Language: &language,
Params: params,
}
if instructions != "" {
req.Instructions = &instructions
}
return req
}
func ModelTTS(
ctx context.Context,
text,
voice,
language,
instructions string,
params map[string]string,
language string,
loader *model.ModelLoader,
appConfig *config.ApplicationConfig,
modelConfig config.ModelConfig,
@@ -95,9 +74,13 @@ func ModelTTS(
startTime = time.Now()
}
ttsRequest := newTTSRequest(text, modelPath, voice, filePath, language, instructions, params)
res, err := ttsModel.TTS(ctx, ttsRequest)
res, err := ttsModel.TTS(ctx, &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Dst: filePath,
Language: &language,
})
if appConfig.EnableTracing {
errStr := ""
@@ -145,9 +128,7 @@ func ModelTTSStream(
ctx context.Context,
text,
voice,
language,
instructions string,
params map[string]string,
language string,
loader *model.ModelLoader,
appConfig *config.ApplicationConfig,
modelConfig config.ModelConfig,
@@ -196,10 +177,12 @@ func ModelTTSStream(
var totalPCMBytes int
snippetCapped := false
// Streaming TTS writes to the HTTP response, not a file, so dst is empty.
ttsRequest := newTTSRequest(text, modelPath, voice, "", language, instructions, params)
err = ttsModel.TTSStream(ctx, ttsRequest, func(reply *proto.Reply) {
err = ttsModel.TTSStream(ctx, &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Language: &language,
}, func(reply *proto.Reply) {
// First message contains sample rate info
if !headerSent && len(reply.Message) > 0 {
var info map[string]any

View File

@@ -1,42 +0,0 @@
package backend
// Specs for the TTSRequest assembly that carries the per-request
// instructions/params from the OpenAI `instructions` field (and the LocalAI
// `params` extension) through to the gRPC boundary. Before this plumbing the
// instruction value was dropped before reaching the backend; these specs pin
// that it now survives, and that the empty case stays backward compatible.
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("newTTSRequest", func() {
It("attaches the instructions when a per-request value is set", func() {
req := newTTSRequest("hi", "/m", "alloy", "/out.wav", "en", "cheerful narrator", nil)
Expect(req.Instructions).ToNot(BeNil())
Expect(req.GetInstructions()).To(Equal("cheerful narrator"))
Expect(req.GetText()).To(Equal("hi"))
Expect(req.GetVoice()).To(Equal("alloy"))
Expect(req.GetDst()).To(Equal("/out.wav"))
Expect(req.GetLanguage()).To(Equal("en"))
})
It("leaves instructions unset when empty so backends fall back to YAML", func() {
req := newTTSRequest("hi", "/m", "", "/out.wav", "", "", nil)
Expect(req.Instructions).To(BeNil())
Expect(req.GetInstructions()).To(Equal(""))
})
It("forwards per-request params through to the backend", func() {
params := map[string]string{"exaggeration": "0.7", "cfg_weight": "0.3"}
req := newTTSRequest("hi", "/m", "", "/out.wav", "", "", params)
Expect(req.GetParams()).To(HaveKeyWithValue("exaggeration", "0.7"))
Expect(req.GetParams()).To(HaveKeyWithValue("cfg_weight", "0.3"))
})
It("leaves params nil when none are supplied", func() {
req := newTTSRequest("hi", "/m", "", "/out.wav", "", "", nil)
Expect(req.GetParams()).To(BeNil())
})
})

View File

@@ -52,15 +52,6 @@ type AgentWorkerCMD struct {
Subject string `env:"LOCALAI_AGENT_SUBJECT" default:"agent.execute" help:"NATS subject for agent execution" group:"distributed"`
Queue string `env:"LOCALAI_AGENT_QUEUE" default:"agent-workers" help:"NATS queue group name" group:"distributed"`
NatsJWT string `env:"LOCALAI_NATS_JWT" help:"NATS user JWT override (defaults to nats_jwt from registration)" group:"distributed"`
NatsUserSeed string `env:"LOCALAI_NATS_USER_SEED" help:"NATS user seed override (defaults to nats_user_seed from registration)" group:"distributed"`
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"Fallback NATS service JWT when registration does not mint agent JWT" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"Fallback NATS service seed paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT+seed to connect" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI)" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
// Timeouts
MCPCIJobTimeout string `env:"LOCALAI_MCP_CI_JOB_TIMEOUT" default:"10m" help:"Timeout for MCP CI job execution" group:"distributed"`
}
@@ -90,30 +81,15 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
registrationBody["token"] = cmd.RegistrationToken
}
// Context cancelled on shutdown — used by registration waits, heartbeat, and
// other background goroutines.
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()
// Acquire credentials via (re)registration. When the bus requires auth and no
// static fallback is configured, wait through admin approval until the
// frontend mints credentials rather than starting unauthenticated.
credMgr := workerregistry.NewNATSCredentialManager(
func(ctx context.Context) (*workerregistry.RegisterResponse, error) {
return regClient.RegisterFull(ctx, registrationBody)
},
cmd.NatsRequireAuth && cmd.NatsJWT == "" && cmd.NatsServiceJWT == "",
)
res, err := credMgr.Acquire(shutdownCtx)
nodeID, apiToken, err := regClient.RegisterWithRetry(context.Background(), registrationBody, 10)
if err != nil {
return fmt.Errorf("registration failed: %w", err)
}
nodeID := res.ID
xlog.Info("Registered with frontend", "nodeID", nodeID, "frontend", cmd.RegisterTo)
// Use provisioned API token if none was set
if cmd.APIToken == "" {
cmd.APIToken = res.APIToken
cmd.APIToken = apiToken
}
// Start heartbeat
@@ -122,40 +98,14 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Warn("invalid heartbeat interval, using default 10s", "input", cmd.HeartbeatInterval, "error", err)
}
heartbeatInterval = cmp.Or(heartbeatInterval, 10*time.Second)
// Context cancelled on shutdown — used by heartbeat and other background goroutines
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()
go regClient.HeartbeatLoop(shutdownCtx, nodeID, heartbeatInterval, func() map[string]any { return map[string]any{} })
// Resolve NATS credentials with precedence: explicit env override, then
// frontend-minted (auto-refreshed before expiry), then service fallback.
// Each static source must supply JWT and seed together.
natsTLS := messaging.TLSFiles{CA: cmd.NatsTLSCA, Cert: cmd.NatsTLSCert, Key: cmd.NatsTLSKey}
var natsOpts []messaging.Option
switch {
case cmd.NatsJWT != "" || cmd.NatsUserSeed != "":
if (cmd.NatsJWT == "") != (cmd.NatsUserSeed == "") {
return fmt.Errorf("LOCALAI_NATS_JWT and LOCALAI_NATS_USER_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsJWT, cmd.NatsUserSeed))
case credMgr.HasCredentials():
natsOpts = append(natsOpts, messaging.WithUserJWTProvider(credMgr.Provider()))
go func() {
if err := credMgr.RefreshLoop(shutdownCtx); err != nil {
xlog.Error("NATS credential refresh permanently failed; shutting down agent worker", "error", err)
shutdownCancel()
}
}()
case cmd.NatsServiceJWT != "" || cmd.NatsServiceSeed != "":
if (cmd.NatsServiceJWT == "") != (cmd.NatsServiceSeed == "") {
return fmt.Errorf("LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsServiceJWT, cmd.NatsServiceSeed))
case cmd.NatsRequireAuth:
return fmt.Errorf("NATS JWT+seed required: enable frontend minting or set LOCALAI_NATS_* env vars")
}
if natsTLS.Enabled() {
natsOpts = append(natsOpts, messaging.WithTLS(natsTLS))
}
natsClient, err := messaging.New(cmd.NatsURL, natsOpts...)
// Connect to NATS
natsClient, err := messaging.New(cmd.NatsURL)
if err != nil {
return fmt.Errorf("connecting to NATS: %w", err)
}
@@ -233,25 +183,17 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Info("Agent worker ready, waiting for jobs", "subject", cmd.Subject, "queue", cmd.Queue)
// Wait for an OS signal or an internal fatal condition (e.g. NATS
// credentials became unrenewable), so the worker restarts and re-acquires
// rather than lingering unable to serve.
// Wait for shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
var runErr error
select {
case <-sigCh:
case <-shutdownCtx.Done():
runErr = fmt.Errorf("agent worker shutting down: NATS credentials unavailable")
xlog.Error("Internal shutdown requested", "error", runErr)
}
<-sigCh
xlog.Info("Shutting down agent worker")
shutdownCancel() // stop heartbeat loop immediately
dispatcher.Stop()
mcpTools.CloseAllMCPSessions()
regClient.GracefulDeregister(nodeID)
return runErr
return nil
}
// handleMCPToolRequest handles a NATS request-reply for MCP tool execution.

View File

@@ -159,14 +159,6 @@ type RunCMD struct {
DistributedPrefixCacheTTL string `env:"LOCALAI_DISTRIBUTED_PREFIX_CACHE_TTL" help:"Idle-timeout for prefix-cache index entries; also drives the background eviction cadence (every TTL/2). Default 5m." group:"distributed"`
BackendInstallTimeout string `env:"LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT" help:"NATS round-trip timeout for backend.install requests sent to worker nodes (default 15m). Increase for slow links pulling multi-GB images." group:"distributed"`
BackendUpgradeTimeout string `env:"LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT" help:"NATS round-trip timeout for backend.upgrade requests (default 15m)." group:"distributed"`
NatsAccountSeed string `env:"LOCALAI_NATS_ACCOUNT_SEED" help:"NATS account signing seed (SU...) used to mint per-node worker JWTs at registration" group:"distributed"`
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"NATS user JWT for the frontend (and agent workers) to publish control-plane messages" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"NATS user signing seed (SU...) paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsWorkerJWTTTL string `env:"LOCALAI_NATS_WORKER_JWT_TTL" help:"Lifetime of minted per-node NATS JWTs (e.g. 24h, default 24h)" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT credentials (service JWT + account seed) when distributed mode is enabled" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI); use with tls:// in --nats-url" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
ExposeNodeHeader bool `env:"LOCALAI_EXPOSE_NODE_HEADER" default:"false" help:"Set the X-LocalAI-Node response header on inference responses (OpenAI chat/completions/embeddings, Anthropic /v1/messages, Ollama /api/chat,/api/generate,/api/embed) with the ID of the worker that served the request. Disabled by default: the node ID reveals internal topology and should not be exposed on a public endpoint. Best-effort: under heavy concurrency the header may reflect a recent routing decision rather than this exact request's." group:"distributed"`
Version bool
@@ -291,34 +283,6 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}
if r.NatsAccountSeed != "" {
opts = append(opts, config.WithNatsAccountSeed(r.NatsAccountSeed))
}
if r.NatsServiceJWT != "" {
opts = append(opts, config.WithNatsServiceJWT(r.NatsServiceJWT))
}
if r.NatsServiceSeed != "" {
opts = append(opts, config.WithNatsServiceSeed(r.NatsServiceSeed))
}
if r.NatsWorkerJWTTTL != "" {
d, err := time.ParseDuration(r.NatsWorkerJWTTTL)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_WORKER_JWT_TTL %q: %w", r.NatsWorkerJWTTTL, err)
}
opts = append(opts, config.WithNatsWorkerJWTTTL(d))
}
if r.NatsRequireAuth {
opts = append(opts, config.EnableNatsRequireAuth)
}
if r.NatsTLSCA != "" {
opts = append(opts, config.WithNatsTLSCA(r.NatsTLSCA))
}
if r.NatsTLSCert != "" {
opts = append(opts, config.WithNatsTLSCert(r.NatsTLSCert))
}
if r.NatsTLSKey != "" {
opts = append(opts, config.WithNatsTLSKey(r.NatsTLSKey))
}
if r.AutoApproveNodes {
opts = append(opts, config.EnableAutoApproveNodes)
}

View File

@@ -62,7 +62,7 @@ func (t *TTSCMD) Run(ctx *cliContext.Context) error {
options.Backend = t.Backend
options.Model = t.Model
filePath, _, err := backend.ModelTTS(context.Background(), text, t.Voice, t.Language, "", nil, ml, opts, options)
filePath, _, err := backend.ModelTTS(context.Background(), text, t.Voice, t.Language, ml, opts, options)
if err != nil {
return err
}

View File

@@ -96,7 +96,7 @@ func (r *VLLMDistributed) Run(ctx *cliContext.Context) error {
FrontendURL: r.RegisterTo,
RegistrationToken: r.RegistrationToken,
}
nodeID, _, _, _, regErr := regClient.RegisterWithRetry(context.Background(), r.registrationBody(), 10)
nodeID, _, regErr := regClient.RegisterWithRetry(context.Background(), r.registrationBody(), 10)
if regErr != nil {
return fmt.Errorf("registering with frontend: %w", regErr)
}

View File

@@ -58,77 +58,65 @@ func (c *RegistrationClient) setAuth(req *http.Request) {
// RegisterResponse is the JSON body returned by /api/node/register.
type RegisterResponse struct {
ID string `json:"id"`
Status string `json:"status,omitempty"` // "pending" until an admin approves the node
APIToken string `json:"api_token,omitempty"`
NatsJWT string `json:"nats_jwt,omitempty"`
NatsUserSeed string `json:"nats_user_seed,omitempty"`
ID string `json:"id"`
APIToken string `json:"api_token,omitempty"`
}
// RegisterFull sends a single registration request and returns the full
// response (node ID, approval status, and optional API token / NATS creds).
// Re-registration is idempotent: the frontend preserves the node row and mints
// a fresh NATS JWT each call, so this doubles as the credential-refresh call.
func (c *RegistrationClient) RegisterFull(ctx context.Context, body map[string]any) (*RegisterResponse, error) {
// Register sends a single registration request and returns the node ID and
// (optionally) an auto-provisioned API token.
func (c *RegistrationClient) Register(ctx context.Context, body map[string]any) (string, string, error) {
jsonBody, _ := json.Marshal(body)
url := c.baseURL() + "/api/node/register"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
return "", "", fmt.Errorf("creating request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
c.setAuth(req)
resp, err := c.httpClient().Do(req)
if err != nil {
return nil, fmt.Errorf("posting to %s: %w", url, err)
return "", "", fmt.Errorf("posting to %s: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("registration failed with status %d", resp.StatusCode)
return "", "", fmt.Errorf("registration failed with status %d", resp.StatusCode)
}
var result RegisterResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decoding response: %w", err)
return "", "", fmt.Errorf("decoding response: %w", err)
}
return &result, nil
}
// Register sends a single registration request and returns the node ID and
// optional credentials (API token for agent workers, NATS JWT when configured).
func (c *RegistrationClient) Register(ctx context.Context, body map[string]any) (nodeID, apiToken, natsJWT, natsSeed string, err error) {
res, err := c.RegisterFull(ctx, body)
if err != nil {
return "", "", "", "", err
}
return res.ID, res.APIToken, res.NatsJWT, res.NatsUserSeed, nil
return result.ID, result.APIToken, nil
}
// RegisterWithRetry retries registration with exponential backoff.
func (c *RegistrationClient) RegisterWithRetry(ctx context.Context, body map[string]any, maxRetries int) (nodeID, apiToken, natsJWT, natsSeed string, err error) {
func (c *RegistrationClient) RegisterWithRetry(ctx context.Context, body map[string]any, maxRetries int) (string, string, error) {
backoff := 2 * time.Second
maxBackoff := 30 * time.Second
var nodeID, apiToken string
var err error
for attempt := 1; attempt <= maxRetries; attempt++ {
nodeID, apiToken, natsJWT, natsSeed, err = c.Register(ctx, body)
nodeID, apiToken, err = c.Register(ctx, body)
if err == nil {
return nodeID, apiToken, natsJWT, natsSeed, nil
return nodeID, apiToken, nil
}
if attempt == maxRetries {
return "", "", "", "", fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
return "", "", fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}
xlog.Warn("Registration failed, retrying", "attempt", attempt, "next_retry", backoff, "error", err)
select {
case <-ctx.Done():
return "", "", "", "", ctx.Err()
return "", "", ctx.Err()
case <-time.After(backoff):
}
backoff = min(backoff*2, maxBackoff)
}
return nodeID, apiToken, natsJWT, natsSeed, err
return nodeID, apiToken, err
}
// Heartbeat sends a single heartbeat POST with the given body.

View File

@@ -1,200 +0,0 @@
package workerregistry
import (
"context"
"fmt"
"sync"
"time"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/mudler/xlog"
)
// statusPending mirrors nodes.StatusPending. It is duplicated rather than
// imported so the lightweight registration client does not pull in the nodes
// package (and its gorm/DB dependencies).
const statusPending = "pending"
// defaultMaxAttempts bounds how many times Acquire registers (and how many
// consecutive times RefreshLoop may fail) before giving up. It is high enough
// to ride out a slow admin approval or a transient frontend outage, but finite
// so an unauthorized/unapprovable worker exits and surfaces the problem (via a
// non-zero exit and the resulting restart) rather than waiting forever.
const defaultMaxAttempts = 100
// RegisterFunc performs one idempotent registration round-trip.
type RegisterFunc func(ctx context.Context) (*RegisterResponse, error)
// NATSCredentialManager acquires NATS credentials at startup — waiting through
// admin approval when required — and refreshes them before the minted JWT
// expires, by re-registering (which mints a fresh JWT). The live NATS
// connection adopts a refreshed JWT on its next reconnect via Provider. Safe
// for concurrent use.
//
// It addresses two failure modes: a worker that needs credentials but registers
// while still pending approval (it would otherwise give up and never connect),
// and a long-running worker whose 24h JWT expires with no way to renew it.
type NATSCredentialManager struct {
register RegisterFunc
requireCreds bool // block until credentials are present (frontend minting in use)
// Tunables; defaults set by NewNATSCredentialManager, overridable in tests.
initialBackoff time.Duration
maxBackoff time.Duration
maxAttempts int // bound on Acquire attempts / consecutive refresh failures (<=0 = unlimited)
refreshLead float64 // refresh once this fraction of the JWT lifetime has elapsed
refreshRetry time.Duration
expiryOf func(jwt string) (time.Time, bool)
mu sync.RWMutex
jwt string
seed string
nodeID string
}
// NewNATSCredentialManager builds a manager over register. When requireCreds is
// true, Acquire blocks until the node is approved and credentials are minted.
func NewNATSCredentialManager(register RegisterFunc, requireCreds bool) *NATSCredentialManager {
return &NATSCredentialManager{
register: register,
requireCreds: requireCreds,
initialBackoff: 2 * time.Second,
maxBackoff: 30 * time.Second,
maxAttempts: defaultMaxAttempts,
refreshLead: 0.75,
refreshRetry: 30 * time.Second,
expiryOf: jwtExpiry,
}
}
// jwtExpiry decodes the expiry of a minted user JWT. ok is false when the token
// is empty/undecodable or carries no expiry (e.g. a non-expiring service JWT).
func jwtExpiry(token string) (time.Time, bool) {
if token == "" {
return time.Time{}, false
}
uc, err := natsauth.DecodeUserClaims(token)
if err != nil || uc.Expires == 0 {
return time.Time{}, false
}
return time.Unix(uc.Expires, 0), true
}
func (m *NATSCredentialManager) store(res *RegisterResponse) {
m.mu.Lock()
defer m.mu.Unlock()
m.nodeID = res.ID
if res.NatsJWT != "" && res.NatsUserSeed != "" {
m.jwt, m.seed = res.NatsJWT, res.NatsUserSeed
}
}
// Current returns the latest NATS credentials (both empty until acquired).
func (m *NATSCredentialManager) Current() (jwt, seed string) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.jwt, m.seed
}
// NodeID returns the node ID from the most recent registration.
func (m *NATSCredentialManager) NodeID() string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.nodeID
}
// Provider returns a callback compatible with messaging.WithUserJWTProvider,
// supplying the current credentials on each (re)connect.
func (m *NATSCredentialManager) Provider() func() (string, string) {
return m.Current
}
// HasCredentials reports whether complete NATS credentials have been obtained.
func (m *NATSCredentialManager) HasCredentials() bool {
jwt, seed := m.Current()
return jwt != "" && seed != ""
}
// Acquire registers and, when requireCreds is set, keeps re-registering with
// exponential backoff until the node is approved (status != pending) and
// credentials are minted. Without requireCreds it returns the first successful
// response (the historical one-shot behavior, preserved for anonymous NATS).
func (m *NATSCredentialManager) Acquire(ctx context.Context) (*RegisterResponse, error) {
backoff := m.initialBackoff
var lastReason error
for attempt := 1; m.maxAttempts <= 0 || attempt <= m.maxAttempts; attempt++ {
res, err := m.register(ctx)
switch {
case err != nil:
lastReason = err
xlog.Warn("Registration failed, retrying", "attempt", attempt, "next_retry", backoff, "error", err)
case !m.requireCreds:
m.store(res)
return res, nil
case res.Status == statusPending:
lastReason = fmt.Errorf("node %s still pending admin approval", res.ID)
xlog.Info("Node pending admin approval; waiting", "node", res.ID, "attempt", attempt, "next_retry", backoff)
case res.NatsJWT == "" || res.NatsUserSeed == "":
lastReason = fmt.Errorf("node %s approved but NATS credentials not minted", res.ID)
xlog.Info("Node approved but NATS credentials not yet minted; waiting", "node", res.ID, "attempt", attempt, "next_retry", backoff)
default:
m.store(res)
return res, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(backoff):
}
backoff = min(backoff*2, m.maxBackoff)
}
return nil, fmt.Errorf("giving up acquiring NATS credentials after %d attempts: %w", m.maxAttempts, lastReason)
}
// RefreshLoop re-registers to mint a fresh JWT before the current one expires,
// updating the credentials returned by Current/Provider so the NATS connection
// adopts them on its next reconnect. It returns nil when ctx is cancelled or
// when the current credential has no expiry (nothing to refresh), and a non-nil
// error after maxAttempts consecutive refresh failures — letting the caller
// exit the worker so it restarts and re-acquires (or surfaces the outage)
// rather than silently drifting toward an expired, unrenewable JWT.
func (m *NATSCredentialManager) RefreshLoop(ctx context.Context) error {
failures := 0
for {
jwt, _ := m.Current()
exp, ok := m.expiryOf(jwt)
if !ok {
xlog.Debug("NATS credential has no expiry; refresh loop exiting")
return nil
}
wait := max(time.Duration(float64(time.Until(exp))*m.refreshLead), 0)
select {
case <-ctx.Done():
return nil
case <-time.After(wait):
}
res, err := m.register(ctx)
if err == nil && res.NatsJWT != "" && res.NatsUserSeed != "" {
m.store(res)
failures = 0
xlog.Info("Refreshed NATS credentials", "node", res.ID)
continue
}
failures++
if err != nil {
xlog.Warn("NATS credential refresh failed; will retry", "attempt", failures, "error", err)
} else {
xlog.Warn("NATS credential refresh returned no credentials; will retry", "attempt", failures)
}
if m.maxAttempts > 0 && failures >= m.maxAttempts {
return fmt.Errorf("NATS credential refresh failed %d times in a row", failures)
}
// Back off before retrying so a persistent failure near expiry does not spin.
select {
case <-ctx.Done():
return nil
case <-time.After(m.refreshRetry):
}
}
}

View File

@@ -1,198 +0,0 @@
package workerregistry
import (
"context"
"sync"
"testing"
"time"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestWorkerRegistry(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "WorkerRegistry")
}
// fakeRegister returns a sequence of canned responses/errors, one per call, and
// records how many times it was invoked. The last entry repeats once exhausted.
type fakeRegister struct {
mu sync.Mutex
steps []step
calls int
}
type step struct {
res *RegisterResponse
err error
}
func (f *fakeRegister) fn() RegisterFunc {
return func(context.Context) (*RegisterResponse, error) {
f.mu.Lock()
defer f.mu.Unlock()
i := f.calls
f.calls++
if i >= len(f.steps) {
i = len(f.steps) - 1
}
return f.steps[i].res, f.steps[i].err
}
}
func (f *fakeRegister) count() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.calls
}
var _ = Describe("NATSCredentialManager", func() {
approved := func(jwt, seed string) *RegisterResponse {
return &RegisterResponse{ID: "node-1", Status: "healthy", NatsJWT: jwt, NatsUserSeed: seed}
}
pending := &RegisterResponse{ID: "node-1", Status: "pending"}
Describe("Acquire (#4 — wait through admin approval)", func() {
It("keeps re-registering until the node is approved and credentials are minted", func() {
f := &fakeRegister{steps: []step{
{res: pending}, // not approved yet
{res: approved("", "")}, // approved but JWT not minted yet
{res: approved("jwt-1", "seed-1")}, // finally minted
}}
m := NewNATSCredentialManager(f.fn(), true /* requireCreds */)
m.initialBackoff = time.Millisecond
m.maxBackoff = time.Millisecond
res, err := m.Acquire(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(res.ID).To(Equal("node-1"))
Expect(f.count()).To(Equal(3))
jwt, seed := m.Current()
Expect(jwt).To(Equal("jwt-1"))
Expect(seed).To(Equal("seed-1"))
Expect(m.HasCredentials()).To(BeTrue())
Expect(m.NodeID()).To(Equal("node-1"))
})
It("returns immediately on the first success when credentials are not required (anonymous NATS)", func() {
f := &fakeRegister{steps: []step{{res: pending}}}
m := NewNATSCredentialManager(f.fn(), false /* requireCreds */)
res, err := m.Acquire(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(res.Status).To(Equal("pending"))
Expect(f.count()).To(Equal(1))
Expect(m.HasCredentials()).To(BeFalse())
})
It("aborts when the context is cancelled while waiting for approval", func() {
f := &fakeRegister{steps: []step{{res: pending}}}
m := NewNATSCredentialManager(f.fn(), true)
m.initialBackoff = 10 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := m.Acquire(ctx)
Expect(err).To(MatchError(context.Canceled))
})
It("gives up after a bounded number of attempts so the worker exits and alerts", func() {
f := &fakeRegister{steps: []step{{res: pending}}} // never approved
m := NewNATSCredentialManager(f.fn(), true)
m.initialBackoff = time.Millisecond
m.maxBackoff = time.Millisecond
m.maxAttempts = 5
_, err := m.Acquire(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("after 5 attempts"))
Expect(err.Error()).To(ContainSubstring("pending admin approval"))
Expect(f.count()).To(Equal(5))
})
})
Describe("RefreshLoop (#5 — renew before the JWT expires)", func() {
It("re-registers before expiry and updates the credentials served to new connections", func() {
f := &fakeRegister{steps: []step{{res: approved("jwt-2", "seed-2")}}}
m := NewNATSCredentialManager(f.fn(), true)
m.refreshLead = 0.5
m.refreshRetry = time.Millisecond
// jwt-1 expires soon; jwt-2 is long-lived so the loop then idles.
m.expiryOf = func(jwt string) (time.Time, bool) {
switch jwt {
case "jwt-1":
return time.Now().Add(40 * time.Millisecond), true
case "jwt-2":
return time.Now().Add(time.Hour), true
default:
return time.Time{}, false
}
}
m.store(approved("jwt-1", "seed-1"))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = m.RefreshLoop(ctx) }()
Eventually(func() string {
jwt, _ := m.Current()
return jwt
}, "2s", "10ms").Should(Equal("jwt-2"))
})
It("returns an error after the bounded number of consecutive failures so the caller can exit", func() {
f := &fakeRegister{steps: []step{{err: context.DeadlineExceeded}}} // refresh always fails
m := NewNATSCredentialManager(f.fn(), true)
m.refreshLead = 0.5
m.refreshRetry = time.Millisecond
m.maxAttempts = 3
m.expiryOf = func(string) (time.Time, bool) { return time.Now().Add(time.Millisecond), true }
m.store(approved("jwt-1", "seed-1"))
errCh := make(chan error, 1)
go func() { errCh <- m.RefreshLoop(context.Background()) }()
Eventually(errCh, "2s").Should(Receive(MatchError(ContainSubstring("3 times in a row"))))
})
It("exits promptly when the current credential has no expiry (nothing to refresh)", func() {
f := &fakeRegister{steps: []step{{res: approved("x", "y")}}}
m := NewNATSCredentialManager(f.fn(), true)
m.expiryOf = func(string) (time.Time, bool) { return time.Time{}, false }
m.store(approved("static", "seed"))
done := make(chan struct{})
go func() { _ = m.RefreshLoop(context.Background()); close(done) }()
Eventually(done, "1s").Should(BeClosed())
Expect(f.count()).To(Equal(0)) // never tried to re-register
})
})
Describe("jwtExpiry default", func() {
It("decodes the expiry of a real minted worker JWT", func() {
akp, err := nkeys.CreateAccount()
Expect(err).ToNot(HaveOccurred())
seed, err := akp.Seed()
Expect(err).ToNot(HaveOccurred())
cfg := natsauth.Config{AccountSeed: string(seed), WorkerJWTTTL: time.Hour}
token, _, err := cfg.MintWorkerJWT("node-1", "backend")
Expect(err).ToNot(HaveOccurred())
exp, ok := jwtExpiry(token)
Expect(ok).To(BeTrue())
Expect(exp).To(BeTemporally("~", time.Now().Add(time.Hour), 2*time.Minute))
})
It("reports no expiry for an empty or undecodable token", func() {
_, ok := jwtExpiry("")
Expect(ok).To(BeFalse())
_, ok = jwtExpiry("not-a-jwt")
Expect(ok).To(BeFalse())
})
})
})

View File

@@ -22,11 +22,9 @@ const (
UsecaseRerank = "rerank"
UsecaseDetection = "detection"
UsecaseVAD = "vad"
UsecaseAudioTransform = "audio_transform"
UsecaseDiarization = "diarization"
UsecaseRealtimeAudio = "realtime_audio"
UsecaseFaceRecognition = "face_recognition"
UsecaseSpeakerRecognition = "speaker_recognition"
UsecaseAudioTransform = "audio_transform"
UsecaseDiarization = "diarization"
UsecaseRealtimeAudio = "realtime_audio"
)
// GRPCMethod identifies a Backend service RPC from backend.proto.
@@ -49,11 +47,6 @@ const (
MethodAudioTransform GRPCMethod = "AudioTransform"
MethodDiarize GRPCMethod = "Diarize"
MethodAudioToAudioStream GRPCMethod = "AudioToAudioStream"
MethodFaceVerify GRPCMethod = "FaceVerify"
MethodFaceAnalyze GRPCMethod = "FaceAnalyze"
MethodVoiceVerify GRPCMethod = "VoiceVerify"
MethodVoiceEmbed GRPCMethod = "VoiceEmbed"
MethodVoiceAnalyze GRPCMethod = "VoiceAnalyze"
)
// UsecaseInfo describes a single known_usecase value and how it maps
@@ -161,16 +154,6 @@ var UsecaseInfoMap = map[string]UsecaseInfo{
GRPCMethod: MethodAudioToAudioStream,
Description: "Self-contained any-to-any audio model for the Realtime API — accepts microphone audio and emits speech + transcript (+ optional function calls) from a single backend via the AudioToAudioStream RPC.",
},
UsecaseFaceRecognition: {
Flag: FLAG_FACE_RECOGNITION,
GRPCMethod: MethodFaceVerify,
Description: "Face recognition — verify identity, analyze attributes (age/gender/emotion) via FaceVerify and FaceAnalyze RPCs.",
},
UsecaseSpeakerRecognition: {
Flag: FLAG_SPEAKER_RECOGNITION,
GRPCMethod: MethodVoiceVerify,
Description: "Speaker recognition — verify identity, embed and analyze voice via VoiceVerify, VoiceEmbed and VoiceAnalyze RPCs.",
},
}
// BackendCapability describes which gRPC methods and usecases a backend supports.
@@ -488,21 +471,6 @@ var BackendCapabilities = map[string]BackendCapability{
DefaultUsecases: []string{UsecaseDetection},
Description: "RF-DETR C++ object detection",
},
// --- Face and speaker recognition backends ---
"insightface": {
GRPCMethods: []GRPCMethod{MethodEmbedding, MethodDetect, MethodFaceVerify, MethodFaceAnalyze},
PossibleUsecases: []string{UsecaseEmbeddings, UsecaseDetection, UsecaseFaceRecognition},
DefaultUsecases: []string{UsecaseFaceRecognition},
AcceptsImages: true,
Description: "InsightFace — face detection, embedding, verification and attribute analysis",
},
"speaker-recognition": {
GRPCMethods: []GRPCMethod{MethodVoiceVerify, MethodVoiceEmbed, MethodVoiceAnalyze},
PossibleUsecases: []string{UsecaseSpeakerRecognition},
DefaultUsecases: []string{UsecaseSpeakerRecognition},
Description: "Speaker recognition — voice identity verification and analysis",
},
"silero-vad": {
GRPCMethods: []GRPCMethod{MethodVAD},
PossibleUsecases: []string{UsecaseVAD},

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/mudler/xlog"
)
@@ -20,16 +18,6 @@ type DistributedConfig struct {
RegistrationToken string // --registration-token / LOCALAI_REGISTRATION_TOKEN (required token for node registration)
AutoApproveNodes bool // --auto-approve-nodes / LOCALAI_AUTO_APPROVE_NODES (skip admin approval for new workers)
// NATS JWT auth (optional; see pkg/natsauth and docs/features/distributed-mode.md)
NatsAccountSeed string // LOCALAI_NATS_ACCOUNT_SEED — account signing seed to mint per-node worker JWTs
NatsServiceJWT string // LOCALAI_NATS_SERVICE_JWT — user JWT for frontends / agent workers
NatsServiceSeed string // LOCALAI_NATS_SERVICE_SEED — signing seed paired with service JWT
NatsWorkerJWTTTL time.Duration // LOCALAI_NATS_WORKER_JWT_TTL — minted worker JWT lifetime (default 24h)
NatsRequireAuth bool // LOCALAI_NATS_REQUIRE_AUTH — fail startup if NATS credentials are missing
NatsTLSCA string // LOCALAI_NATS_TLS_CA — PEM file for private CA (server verify)
NatsTLSCert string // LOCALAI_NATS_TLS_CERT — client cert for NATS mTLS
NatsTLSKey string // LOCALAI_NATS_TLS_KEY — client key paired with NatsTLSCert
// S3 configuration (used when StorageURL is set)
StorageBucket string // --storage-bucket / LOCALAI_STORAGE_BUCKET
StorageRegion string // --storage-region / LOCALAI_STORAGE_REGION
@@ -92,13 +80,6 @@ func (c DistributedConfig) Validate() error {
if c.RegistrationToken == "" {
xlog.Warn("distributed mode running without registration token — node endpoints are unprotected")
}
if err := c.NatsAuthConfig().Validate(); err != nil {
return err
}
if err := c.NatsTLSFiles().Validate(); err != nil {
return err
}
c.NatsAuthConfig().WarnIfInsecure(true)
// Check for negative durations
for name, d := range map[string]time.Duration{
FlagMCPToolTimeout: c.MCPToolTimeout,
@@ -142,52 +123,6 @@ func WithRegistrationToken(token string) AppOption {
}
}
func WithNatsAccountSeed(seed string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsAccountSeed = seed
}
}
func WithNatsServiceJWT(jwt string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsServiceJWT = jwt
}
}
func WithNatsServiceSeed(seed string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsServiceSeed = seed
}
}
func WithNatsWorkerJWTTTL(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsWorkerJWTTTL = d
}
}
var EnableNatsRequireAuth = func(o *ApplicationConfig) {
o.Distributed.NatsRequireAuth = true
}
func WithNatsTLSCA(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSCA = path
}
}
func WithNatsTLSCert(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSCert = path
}
}
func WithNatsTLSKey(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSKey = path
}
}
func WithStorageURL(url string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.StorageURL = url
@@ -282,44 +217,6 @@ const (
// DefaultMaxUploadSize is the default maximum upload body size (50 GB).
const DefaultMaxUploadSize int64 = 50 << 30
// NatsTLSFiles returns NATS TLS/mTLS PEM paths for the messaging client.
func (c DistributedConfig) NatsTLSFiles() messaging.TLSFiles {
return messaging.TLSFiles{
CA: c.NatsTLSCA,
Cert: c.NatsTLSCert,
Key: c.NatsTLSKey,
}
}
// NatsMessagingOptions builds messaging client options (JWT + TLS) for distributed components.
// Pass explicit userJWT/userSeed when set (e.g. worker overrides); empty uses service JWT from config.
func (c DistributedConfig) NatsMessagingOptions(userJWT, userSeed string) []messaging.Option {
var opts []messaging.Option
jwt, seed := userJWT, userSeed
if jwt == "" && seed == "" {
auth := c.NatsAuthConfig()
jwt, seed = auth.ServiceUserJWT, auth.ServiceUserSeed
}
if jwt != "" && seed != "" {
opts = append(opts, messaging.WithUserJWT(jwt, seed))
}
if tls := c.NatsTLSFiles(); tls.Enabled() {
opts = append(opts, messaging.WithTLS(tls))
}
return opts
}
// NatsAuthConfig builds pkg/natsauth settings from distributed configuration.
func (c DistributedConfig) NatsAuthConfig() natsauth.Config {
return natsauth.Config{
AccountSeed: c.NatsAccountSeed,
ServiceUserJWT: c.NatsServiceJWT,
ServiceUserSeed: c.NatsServiceSeed,
WorkerJWTTTL: c.NatsWorkerJWTTTL,
RequireAuth: c.NatsRequireAuth,
}
}
// BackendInstallTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendInstallTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendInstallTimeout, DefaultBackendInstallTimeout)

View File

@@ -277,34 +277,6 @@ func DefaultRegistry() map[string]FieldMetaOverride {
AutocompleteProvider: ProviderModelsVAD,
Order: 63,
},
"pipeline.disable_thinking": {
Section: "pipeline",
Label: "Disable Thinking",
Description: "Suppress reasoning/thinking output from the pipeline LLM (sets enable_thinking=false on the underlying model). Use for models that emit <think> blocks you don't want spoken or streamed back to the realtime client.",
Component: "toggle",
Order: 64,
},
"pipeline.streaming.llm": {
Section: "pipeline",
Label: "Stream LLM",
Description: "Stream LLM tokens to the realtime client as they are generated instead of waiting for the full response. Emits incremental response.output_audio_transcript.delta / text deltas.",
Component: "toggle",
Order: 65,
},
"pipeline.streaming.tts": {
Section: "pipeline",
Label: "Stream TTS",
Description: "Stream synthesized audio chunks to the realtime client as they are produced (requires a TTS backend that implements TTSStream). Falls back to unary synthesis otherwise.",
Component: "toggle",
Order: 66,
},
"pipeline.streaming.transcription": {
Section: "pipeline",
Label: "Stream Transcription",
Description: "Stream partial transcription text to the realtime client as the STT backend produces it (requires a transcription backend that implements AudioTranscriptionStream). Falls back to unary transcription otherwise.",
Component: "toggle",
Order: 67,
},
// --- Functions ---
"function.grammar.parallel_calls": {

View File

@@ -487,39 +487,6 @@ type Pipeline struct {
LLM string `yaml:"llm,omitempty" json:"llm,omitempty"`
Transcription string `yaml:"transcription,omitempty" json:"transcription,omitempty"`
VAD string `yaml:"vad,omitempty" json:"vad,omitempty"`
// Streaming opts each pipeline stage into incremental delivery (LLM tokens,
// TTS audio chunks, transcription text). Unset stages keep the blocking
// unary path, so existing configs are unaffected.
Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"`
// DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps
// to enable_thinking=false backend metadata) without editing the underlying
// LLM model config. Unset leaves the LLM model config in charge.
DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"`
}
// @Description PipelineStreaming toggles incremental delivery per realtime stage.
type PipelineStreaming struct {
LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"`
TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"`
Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"`
}
// StreamLLM reports whether LLM tokens should be streamed for this pipeline.
func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM }
// StreamTTS reports whether TTS audio should be streamed for this pipeline.
func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS }
// StreamTranscription reports whether transcription text should be streamed.
func (p Pipeline) StreamTranscription() bool {
return p.Streaming.Transcription != nil && *p.Streaming.Transcription
}
// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off.
func (p Pipeline) ThinkingDisabled() bool {
return p.DisableThinking != nil && *p.DisableThinking
}
// @Description File configuration for model downloads

View File

@@ -1,54 +0,0 @@
package config
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gopkg.in/yaml.v3"
)
// The realtime pipeline can stream each stage (LLM tokens, TTS audio,
// transcription text) and can disable model "thinking" for the LLM. These are
// opt-in per pipeline; everything defaults to off so existing configs keep the
// unary behaviour.
var _ = Describe("Pipeline streaming config", func() {
It("defaults every streaming + thinking helper to false when unset", func() {
var p Pipeline
Expect(p.StreamLLM()).To(BeFalse())
Expect(p.StreamTTS()).To(BeFalse())
Expect(p.StreamTranscription()).To(BeFalse())
Expect(p.ThinkingDisabled()).To(BeFalse())
})
It("parses the nested streaming block and disable_thinking from YAML", func() {
var c ModelConfig
err := yaml.Unmarshal([]byte(`
name: gpt-realtime
pipeline:
llm: my-llm
tts: my-tts
transcription: my-stt
streaming:
llm: true
tts: true
transcription: true
disable_thinking: true
`), &c)
Expect(err).ToNot(HaveOccurred())
Expect(c.Pipeline.StreamLLM()).To(BeTrue())
Expect(c.Pipeline.StreamTTS()).To(BeTrue())
Expect(c.Pipeline.StreamTranscription()).To(BeTrue())
Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue())
})
It("treats an explicit false in the streaming block as disabled", func() {
var c ModelConfig
err := yaml.Unmarshal([]byte(`
name: gpt-realtime
pipeline:
streaming:
tts: false
`), &c)
Expect(err).ToNot(HaveOccurred())
Expect(c.Pipeline.StreamTTS()).To(BeFalse())
})
})

View File

@@ -420,9 +420,8 @@ func API(application *application.Application) (*echo.Echo, error) {
remoteUnloader = d.Router.Unloader()
}
}
natsCfg := distCfg.NatsAuthConfig()
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, natsCfg)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken, natsCfg)
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
// Distributed SSE routes (job progress + agent events via NATS)
if d := application.Distributed(); d != nil {

View File

@@ -37,7 +37,7 @@ func TTSEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig
xlog.Debug("elevenlabs TTS request received", "modelName", input.ModelID)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Text, voiceID, input.LanguageCode, "", nil, ml, appConfig, *cfg)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Text, voiceID, input.LanguageCode, ml, appConfig, *cfg)
if err != nil {
return err
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/LocalAI/pkg/httpclient"
"github.com/mudler/LocalAI/pkg/natsauth"
)
// nodeError builds a schema.ErrorResponse for node endpoints.
@@ -90,7 +89,7 @@ type RegisterNodeRequest struct {
// RegisterNodeEndpoint registers a new backend node.
// expectedToken is the registration token configured on the frontend (may be empty to disable auth).
// autoApprove controls whether new nodes go directly to "healthy" or require admin approval.
func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string, natsCfg natsauth.Config) echo.HandlerFunc {
func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string) echo.HandlerFunc {
return func(c echo.Context) error {
var req RegisterNodeRequest
if err := c.Bind(&req); err != nil {
@@ -218,15 +217,13 @@ func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, au
}
}
attachNatsJWT(response, node, natsCfg)
return c.JSON(http.StatusCreated, response)
}
}
// ApproveNodeEndpoint approves a pending node, setting its status to healthy.
// For agent workers, it also provisions an API key so they can call the inference API.
func ApproveNodeEndpoint(registry *nodes.NodeRegistry, authDB *gorm.DB, hmacSecret string, natsCfg natsauth.Config) echo.HandlerFunc {
func ApproveNodeEndpoint(registry *nodes.NodeRegistry, authDB *gorm.DB, hmacSecret string) echo.HandlerFunc {
return func(c echo.Context) error {
ctx := c.Request().Context()
id := c.Param("id")
@@ -256,26 +253,10 @@ func ApproveNodeEndpoint(registry *nodes.NodeRegistry, authDB *gorm.DB, hmacSecr
}
}
attachNatsJWT(response, node, natsCfg)
return c.JSON(http.StatusOK, response)
}
}
// attachNatsJWT adds a per-node NATS user JWT to a register/approve response when minting is enabled.
func attachNatsJWT(response map[string]any, node *nodes.BackendNode, natsCfg natsauth.Config) {
if !natsCfg.CanMintWorkers() || node == nil || node.Status == nodes.StatusPending {
return
}
jwt, seed, err := natsCfg.MintWorkerJWT(node.ID, node.NodeType)
if err != nil {
xlog.Warn("Failed to mint NATS JWT for node", "node", node.Name, "id", node.ID, "error", err)
return
}
response["nats_jwt"] = jwt
response["nats_user_seed"] = seed
}
// provisionAgentWorkerKey creates a dedicated user and API key for an agent worker node.
// Returns the plaintext API key on success.
func provisionAgentWorkerKey(ctx context.Context, authDB *gorm.DB, registry *nodes.NodeRegistry, node *nodes.BackendNode, hmacSecret string) (string, error) {

View File

@@ -12,8 +12,6 @@ import (
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/testutil"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -65,7 +63,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusCreated))
@@ -76,29 +74,6 @@ var _ = Describe("Node HTTP handlers", func() {
Expect(resp["status"]).To(Equal(nodes.StatusHealthy))
})
It("returns nats_jwt when account seed is configured", func() {
akp, err := nkeys.CreateAccount()
Expect(err).ToNot(HaveOccurred())
seed, err := akp.Seed()
Expect(err).ToNot(HaveOccurred())
e := echo.New()
body := `{"name":"worker-nats","address":"10.0.0.2:50051"}`
req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
natsCfg := natsauth.Config{AccountSeed: string(seed)}
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusCreated))
var resp map[string]any
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp["nats_jwt"]).ToNot(BeEmpty())
})
It("returns 400 when name is missing", func() {
e := echo.New()
body := `{"address":"10.0.0.1:50051"}`
@@ -107,7 +82,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -127,7 +102,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -146,7 +121,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -165,7 +140,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
@@ -184,7 +159,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "correct-token", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "correct-token", true, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusUnauthorized))
})
@@ -197,7 +172,7 @@ var _ = Describe("Node HTTP handlers", func() {
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
handler := RegisterNodeEndpoint(registry, "", false, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", false, nil, "")
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusCreated))
@@ -220,7 +195,7 @@ var _ = Describe("Node HTTP handlers", func() {
req1 := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body1))
req1.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
rec1 := httptest.NewRecorder()
handler := RegisterNodeEndpoint(registry, "", true, nil, "", natsauth.Config{})
handler := RegisterNodeEndpoint(registry, "", true, nil, "")
Expect(handler(e.NewContext(req1, rec1))).To(Succeed())
Expect(rec1.Code).To(Equal(http.StatusCreated))

View File

@@ -59,7 +59,7 @@ func TTSEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig
c.Response().Header().Set("Connection", "keep-alive")
// Stream audio chunks as they're generated
err := backend.ModelTTSStream(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, input.Instructions, input.Params, ml, appConfig, *cfg, func(audioChunk []byte) error {
err := backend.ModelTTSStream(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, ml, appConfig, *cfg, func(audioChunk []byte) error {
_, writeErr := c.Response().Write(audioChunk)
if writeErr != nil {
return writeErr
@@ -75,7 +75,7 @@ func TTSEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig
}
// Non-streaming TTS (existing behavior)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, input.Instructions, input.Params, ml, appConfig, *cfg)
filePath, _, err := backend.ModelTTS(c.Request().Context(), input.Input, cfg.Voice, cfg.Language, ml, appConfig, *cfg)
if err != nil {
return err
}

View File

@@ -235,12 +235,6 @@ type Model interface {
Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error)
Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error)
TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error)
// TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM
// chunks (and the backend sample rate) as they are produced.
TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error
// TranscribeStream transcribes audio incrementally, invoking onDelta for each
// transcript text fragment and returning the final aggregated result.
TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error)
PredictConfig() *config.ModelConfig
}
@@ -1260,15 +1254,27 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co
// TODO: If we have a real any-to-any model then transcription is optional
var transcript string
if session.InputAudioTranscription != nil {
// emitTranscription streams transcript deltas when
// pipeline.streaming.transcription is set, otherwise emits a single
// completed event; either way it returns the final transcript text.
var err error
transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name())
tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt)
if err != nil {
sendError(t, "transcription_failed", err.Error(), "", "event_TODO")
return
} else if tr == nil {
sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO")
return
}
transcript = tr.Text
sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},
ItemID: generateItemID(),
// ResponseID: "resp_TODO", // Not needed for transcription completed event
// OutputIndex: 0,
ContentIndex: 0,
Transcript: transcript,
})
} else {
sendNotImplemented(t, "any-to-any models")
return
@@ -1496,26 +1502,6 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
},
})
// Streamed LLM path: when the pipeline opts into LLM streaming, stream the
// transcript to the client as it is generated and synthesize the buffered
// message once. Tool turns are supported only when the model uses its
// tokenizer template: the C++ autoparser then delivers content and tool
// calls via ChatDeltas (clearing the text stream), so the spoken transcript
// never leaks tool-call tokens. Grammar-based function calling emits the
// call as JSON in the token stream, so those turns keep the buffered path.
if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() {
canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate
var respMods []types.Modality
if overrides != nil {
respMods = overrides.OutputModalities
}
if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) {
if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) {
return
}
}
}
predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil)
if err != nil {
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here
@@ -1593,7 +1579,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
// ExtractReasoningWithConfig is a no-op when no tag pair matches,
// so it's safe to apply unconditionally in the no-reasoning branch.
if deltaReasoning == "" && deltaContent != "" {
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, config.ReasoningConfig)
}
reasoningText = deltaReasoning
responseWithoutReasoning = deltaContent
@@ -1601,7 +1587,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
cleanedResponse = deltaContent
toolCalls = deltaToolCalls
} else {
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, config.ReasoningConfig)
textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig)
cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig)
toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig)
@@ -1727,7 +1713,64 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
return
}
// Transcript of the spoken reply (the audio's text).
audioFilePath, res, err := session.ModelInterface.TTS(ctx, finalSpeech, session.Voice, session.InputAudioTranscription.Language)
if err != nil {
if ctx.Err() != nil {
xlog.Debug("TTS cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("TTS failed", "error", err)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return
}
if !res.Success {
xlog.Error("TTS failed", "message", res.Message)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %s", res.Message), "", item.Assistant.ID)
return
}
defer func() { _ = os.Remove(audioFilePath) }()
audioBytes, err := os.ReadFile(audioFilePath)
if err != nil {
xlog.Error("failed to read TTS file", "error", err)
sendError(t, "tts_error", fmt.Sprintf("Failed to read TTS audio: %v", err), "", item.Assistant.ID)
return
}
// Parse WAV header to get raw PCM and the actual sample rate from the TTS backend.
pcmData, ttsSampleRate := laudio.ParseWAV(audioBytes)
if ttsSampleRate == 0 {
ttsSampleRate = localSampleRate
}
xlog.Debug("TTS audio parsed", "raw_bytes", len(audioBytes), "pcm_bytes", len(pcmData), "sample_rate", ttsSampleRate)
// SendAudio (WebRTC) passes PCM at the TTS sample rate directly to the
// Opus encoder, which resamples to 48kHz internally. This avoids a
// lossy intermediate resample through 16kHz.
// XXX: This is a noop in websocket mode; it's included in the JSON instead
if err := t.SendAudio(ctx, pcmData, ttsSampleRate); err != nil {
if ctx.Err() != nil {
xlog.Debug("Audio playback cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("failed to send audio via transport", "error", err)
}
// For WebSocket clients, resample to the session's output rate and
// deliver audio as base64 in JSON events. WebRTC clients already
// received audio over the RTP track, so skip the base64 payload.
if !isWebRTC {
wsPCM := pcmData
if ttsSampleRate != session.OutputSampleRate {
samples := sound.BytesToInt16sLE(pcmData)
resampled := sound.ResampleInt16(samples, ttsSampleRate, session.OutputSampleRate)
wsPCM = sound.Int16toBytesLE(resampled)
}
audioString = base64.StdEncoding.EncodeToString(wsPCM)
}
sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
@@ -1745,26 +1788,15 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
Transcript: finalSpeech,
})
// Synthesize and send the audio. With pipeline.streaming.tts enabled
// emitSpeech forwards a response.output_audio.delta per backend PCM
// chunk as it's produced; otherwise it sends the whole utterance as a
// single delta. The returned PCM is stored (base64) on the item below.
pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech)
if err != nil {
if ctx.Err() != nil {
xlog.Debug("TTS cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("TTS failed", "error", err)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return
}
if !isWebRTC {
audioString = base64.StdEncoding.EncodeToString(pcmAudio)
}
if !isWebRTC {
sendEvent(t, types.ResponseOutputAudioDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Delta: audioString,
})
sendEvent(t, types.ResponseOutputAudioDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
@@ -1817,27 +1849,17 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
})
}
// Emit the parsed tool calls, the terminal response.done, and (for
// server-side assistant tools) the follow-up response. Shared with the
// streamed path so both finalize tool calls identically.
emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn)
}
// emitToolCallItems emits the realtime function_call items for the parsed tool
// calls, the terminal response.done, and — for server-side LocalAI Assistant
// tools — re-triggers a follow-up response so the model can speak the result.
// hasContent shifts the tool-call output index past the assistant content item
// when the same turn also produced spoken/text content. Two tool paths:
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side;
// we append both the call and its output to conv.Items and re-trigger. The
// client only sees observability events.
// - All other tools follow the standard OpenAI flow: emit
// function_call_arguments.done and wait for the client to send
// conversation.item.create back.
func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) {
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls))
// Handle Tool Calls. Two paths:
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run
// server-side; we append both the call and its output to conv.Items
// and re-trigger a follow-up response so the model can speak the
// result. The client only sees observability events.
// - All other tools follow the standard OpenAI flow: emit
// function_call_arguments.done and wait for the client to send
// conversation.item.create back.
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls))
executedAssistantTool := false
for i, tc := range toolCalls {
for i, tc := range finalToolCalls {
toolCallID := generateItemID()
callID := "call_" + generateUniqueID() // OpenAI uses call_xyz
@@ -1857,7 +1879,7 @@ func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation
conv.Lock.Unlock()
outputIndex := i
if hasContent {
if finalSpeech != "" {
outputIndex++
}

View File

@@ -1,138 +0,0 @@
package openai
import (
"context"
"strings"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/grpc/proto"
)
// fakeTransport records the server events and audio sent to a realtime client
// so streaming behaviour can be asserted without a real WebSocket/WebRTC peer.
// It is not a *WebRTCTransport, so handler code takes the WebSocket path.
type fakeTransport struct {
events []types.ServerEvent
audio []fakeAudioChunk
}
type fakeAudioChunk struct {
pcm []byte
sampleRate int
}
func (f *fakeTransport) SendEvent(e types.ServerEvent) error {
f.events = append(f.events, e)
return nil
}
func (f *fakeTransport) ReadEvent() ([]byte, error) { return nil, nil }
func (f *fakeTransport) SendAudio(_ context.Context, pcm []byte, sampleRate int) error {
f.audio = append(f.audio, fakeAudioChunk{pcm: pcm, sampleRate: sampleRate})
return nil
}
func (f *fakeTransport) Close() error { return nil }
// countEvents returns how many recorded events have the given type.
func (f *fakeTransport) countEvents(et types.ServerEventType) int {
n := 0
for _, e := range f.events {
if e.ServerEventType() == et {
n++
}
}
return n
}
// transcriptDeltaText concatenates the Delta of every recorded transcript
// delta event — i.e. the text streamed to the client as it is generated.
func (f *fakeTransport) transcriptDeltaText() string {
var b strings.Builder
for _, e := range f.events {
if d, ok := e.(types.ResponseOutputAudioTranscriptDeltaEvent); ok {
b.WriteString(d.Delta)
}
}
return b.String()
}
// fakeModel is a configurable Model double. TTSStream replays ttsStreamChunks
// and TranscribeStream replays transcribeDeltas, so the handler's streaming
// paths can be driven deterministically.
type fakeModel struct {
cfg *config.ModelConfig
ttsFile string
ttsStreamChunks [][]byte
ttsStreamRate int
ttsStreamErr error
transcribeDeltas []string
transcribeFinal *schema.TranscriptionResult
// Predict streaming: predictTokens are replayed through the token callback
// (simulating streamed LLM output); predictResp/predictErr are returned by
// the deferred predict function. predictChunkDeltas, when set, are delivered
// per-token via TokenUsage.ChatDeltas to exercise the autoparser path.
predictTokens []string
predictChunkDeltas [][]*proto.ChatDelta
predictResp backend.LLMResponse
predictErr error
}
func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) {
return nil, nil
}
func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, string) (*schema.TranscriptionResult, error) {
return m.transcribeFinal, nil
}
func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) {
if m.predictErr != nil {
return nil, m.predictErr
}
return func() (backend.LLMResponse, error) {
for i, tok := range m.predictTokens {
if cb == nil {
continue
}
usage := backend.TokenUsage{}
if i < len(m.predictChunkDeltas) {
usage.ChatDeltas = m.predictChunkDeltas[i]
}
cb(tok, usage)
}
return m.predictResp, nil
}, nil
}
func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) {
return m.ttsFile, &proto.Result{Success: true}, nil
}
func (m *fakeModel) TTSStream(_ context.Context, _, _, _ string, onAudio func(pcm []byte, sampleRate int) error) error {
if m.ttsStreamErr != nil {
return m.ttsStreamErr
}
for _, c := range m.ttsStreamChunks {
if err := onAudio(c, m.ttsStreamRate); err != nil {
return err
}
}
return nil
}
func (m *fakeModel) TranscribeStream(_ context.Context, _, _ string, _, _ bool, _ string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
for _, d := range m.transcribeDeltas {
onDelta(d)
}
return m.transcribeFinal, nil
}
func (m *fakeModel) PredictConfig() *config.ModelConfig { return m.cfg }

View File

@@ -3,7 +3,6 @@ package openai
import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
@@ -45,10 +44,10 @@ type wrappedModel struct {
// deps in. nil-safe: with classifierRegistry == nil the per-turn
// routing block in Predict is skipped, preserving today's "one LLM
// for the whole session" behaviour.
routerDeps *middleware.ClassifierDeps
routerStore router.DecisionStore
routerSessionID string
routerUserID string
routerDeps *middleware.ClassifierDeps
routerStore router.DecisionStore
routerSessionID string
routerUserID string
}
// anyToAnyModel represent a model which supports Any-to-Any operations
@@ -88,14 +87,6 @@ func (m *transcriptOnlyModel) TTS(ctx context.Context, text, voice, language str
return "", nil, fmt.Errorf("TTS not supported in transcript-only mode")
}
func (m *transcriptOnlyModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
return fmt.Errorf("TTS not supported in transcript-only mode")
}
func (m *transcriptOnlyModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta)
}
func (m *transcriptOnlyModel) PredictConfig() *config.ModelConfig {
return nil
}
@@ -322,78 +313,13 @@ func newRealtimeDecisionID() string {
}
func (m *wrappedModel) TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error) {
return backend.ModelTTS(ctx, text, voice, language, "", nil, m.modelLoader, m.appConfig, *m.TTSConfig)
}
func (m *wrappedModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
return ttsStream(ctx, m.modelLoader, m.appConfig, *m.TTSConfig, text, voice, language, onAudio)
}
func (m *wrappedModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta)
return backend.ModelTTS(ctx, text, voice, language, m.modelLoader, m.appConfig, *m.TTSConfig)
}
func (m *wrappedModel) PredictConfig() *config.ModelConfig {
return m.LLMConfig
}
// wavStreamHeaderBytes is the size of the WAV header that backend.ModelTTSStream
// emits as its first audio callback; the sample rate lives at byte offset 24.
const wavStreamHeaderBytes = 44
// ttsStream adapts backend.ModelTTSStream (which emits a WAV stream: a 44-byte
// header carrying the sample rate, then raw PCM) to the realtime onAudio
// callback, which wants raw PCM plus the sample rate. The header is buffered
// until complete, the sample rate is read from it, and subsequent bytes are
// forwarded as PCM.
func ttsStream(ctx context.Context, ml *model.ModelLoader, appConfig *config.ApplicationConfig, ttsConfig config.ModelConfig, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
var header []byte
headerDone := false
sampleRate := 0
return backend.ModelTTSStream(ctx, text, voice, language, "", nil, ml, appConfig, ttsConfig, func(b []byte) error {
if headerDone {
if len(b) == 0 {
return nil
}
return onAudio(b, sampleRate)
}
header = append(header, b...)
if len(header) < wavStreamHeaderBytes {
return nil
}
sampleRate = int(binary.LittleEndian.Uint32(header[24:28]))
headerDone = true
if len(header) > wavStreamHeaderBytes {
return onAudio(header[wavStreamHeaderBytes:], sampleRate)
}
return nil
})
}
// transcribeStream adapts backend.ModelTranscriptionStream to the realtime
// onDelta callback, returning the final aggregated transcription result.
func transcribeStream(ctx context.Context, ml *model.ModelLoader, transcriptionConfig config.ModelConfig, appConfig *config.ApplicationConfig, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
var final *schema.TranscriptionResult
err := backend.ModelTranscriptionStream(ctx, backend.TranscriptionRequest{
Audio: audio,
Language: language,
Translate: translate,
Diarize: diarize,
Prompt: prompt,
}, ml, transcriptionConfig, appConfig, func(chunk backend.TranscriptionStreamChunk) {
if chunk.Delta != "" {
onDelta(chunk.Delta)
}
if chunk.Final != nil {
final = chunk.Final
}
})
if err != nil {
return nil, err
}
return final, nil
}
func newTranscriptionOnlyModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) (Model, *config.ModelConfig, error) {
cfgVAD, err := cl.LoadModelConfigFileByName(pipeline.VAD, ml.ModelPath)
if err != nil {
@@ -523,9 +449,6 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model
return nil, fmt.Errorf("failed to validate config: %w", err)
}
// Let the pipeline force the LLM's thinking off (cfgLLM is a per-session copy).
applyPipelineThinking(cfgLLM, *pipeline)
cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath)
if err != nil {

View File

@@ -1,102 +0,0 @@
package openai
import (
"context"
"encoding/base64"
"fmt"
"os"
"path/filepath"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
laudio "github.com/mudler/LocalAI/pkg/audio"
"github.com/mudler/LocalAI/pkg/sound"
)
// emitSpeech synthesizes text and sends the audio to the client. When the
// pipeline opts into TTS streaming it forwards each PCM chunk as its own
// response.output_audio.delta as soon as the backend produces it; otherwise it
// synthesizes the whole utterance and sends it as a single delta.
//
// It deliberately does NOT emit transcript or audio-done events: the caller owns
// those so a streamed reply can be split into several spoken segments that share
// one response/item.
//
// It returns the PCM audio (at the session output rate) accumulated across all
// chunks, which the caller base64-encodes onto the conversation item. For WebRTC
// the audio goes over the RTP track instead, so the returned slice is empty.
func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) ([]byte, error) {
if text == "" {
return nil, nil
}
_, isWebRTC := t.(*WebRTCTransport)
var wsAudio []byte // PCM at the session output rate, accumulated for the item record
// sendChunk hands one PCM buffer to the transport: WebRTC consumes the raw
// PCM directly (it resamples internally); WebSocket gets base64 PCM at the
// session output rate via a JSON delta event.
sendChunk := func(pcm []byte, sampleRate int) error {
if len(pcm) == 0 {
return nil
}
if err := t.SendAudio(ctx, pcm, sampleRate); err != nil {
return err
}
if isWebRTC {
return nil
}
wsPCM := pcm
if sampleRate != 0 && sampleRate != session.OutputSampleRate {
samples := sound.BytesToInt16sLE(pcm)
resampled := sound.ResampleInt16(samples, sampleRate, session.OutputSampleRate)
wsPCM = sound.Int16toBytesLE(resampled)
}
wsAudio = append(wsAudio, wsPCM...)
return t.SendEvent(types.ResponseOutputAudioDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: itemID,
OutputIndex: 0,
ContentIndex: 0,
Delta: base64.StdEncoding.EncodeToString(wsPCM),
})
}
language := ""
if session.InputAudioTranscription != nil {
language = session.InputAudioTranscription.Language
}
if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() {
if err := session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk); err != nil {
return nil, err
}
return wsAudio, nil
}
// Unary fallback: synthesize the whole utterance to a file, then emit once.
audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language)
if err != nil {
return nil, err
}
if res != nil && !res.Success {
return nil, fmt.Errorf("tts generation failed: %s", res.Message)
}
defer func() { _ = os.Remove(audioFilePath) }()
// filepath.Clean normalizes the backend-produced temp path before reading
// (also keeps gosec G304 quiet — the path is backend-controlled, not user input).
audioBytes, err := os.ReadFile(filepath.Clean(audioFilePath))
if err != nil {
return nil, fmt.Errorf("read tts audio: %w", err)
}
pcm, sampleRate := laudio.ParseWAV(audioBytes)
if sampleRate == 0 {
sampleRate = session.OutputSampleRate
}
if err := sendChunk(pcm, sampleRate); err != nil {
return nil, err
}
return wsAudio, nil
}

View File

@@ -1,70 +0,0 @@
package openai
import (
"context"
"os"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
laudio "github.com/mudler/LocalAI/pkg/audio"
)
// emitSpeech synthesizes a piece of text and forwards the audio to the client,
// streaming a delta per TTS chunk when the pipeline opts in, or sending the
// whole utterance as one delta otherwise.
var _ = Describe("emitSpeech", func() {
ttsOn := true
streamingSession := func(m Model) *Session {
return &Session{
OutputSampleRate: 24000,
ModelInterface: m,
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &ttsOn}},
},
}
}
It("streams one output_audio.delta per TTS chunk when streaming is enabled", func() {
m := &fakeModel{
ttsStreamChunks: [][]byte{{1, 2}, {3, 4}, {5, 6}},
ttsStreamRate: 24000,
}
t := &fakeTransport{}
audio, err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.")
Expect(err).ToNot(HaveOccurred())
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3))
// The returned audio is all chunks concatenated (session output rate).
Expect(audio).To(Equal([]byte{1, 2, 3, 4, 5, 6}))
})
It("sends a single output_audio.delta in unary mode", func() {
// A minimal real WAV file for the unary TTS path to read + parse.
f, err := os.CreateTemp("", "emit-*.wav")
Expect(err).ToNot(HaveOccurred())
defer func() { _ = os.Remove(f.Name()) }()
pcm := make([]byte, 320) // 160 samples of silence
hdr := laudio.NewWAVHeader(uint32(len(pcm)))
Expect(hdr.Write(f)).To(Succeed())
_, err = f.Write(pcm)
Expect(err).ToNot(HaveOccurred())
Expect(f.Close()).To(Succeed())
session := &Session{
OutputSampleRate: 24000,
ModelInterface: &fakeModel{ttsFile: f.Name()},
ModelConfig: &config.ModelConfig{}, // streaming off
}
t := &fakeTransport{}
_, err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.")
Expect(err).ToNot(HaveOccurred())
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
})
})

View File

@@ -1,253 +0,0 @@
package openai
import (
"context"
"encoding/base64"
"fmt"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/functions"
"github.com/mudler/LocalAI/pkg/reasoning"
)
// transcriptStreamer turns streamed LLM tokens into the assistant's spoken
// transcript: it strips reasoning incrementally and sends one
// response.output_audio_transcript.delta per content fragment. It does NOT
// synthesize audio — the caller buffers the full message and synthesizes it
// once (streaming the audio chunks when the TTS backend supports TTSStream),
// which works uniformly for streaming and non-streaming TTS and for languages
// without sentence or word boundaries.
type transcriptStreamer struct {
ctx context.Context
t Transport
responseID string
itemID string
extractor *reasoning.ReasoningExtractor
// announce, if set, is invoked once just before the first transcript delta.
// It lets the caller create the assistant item lazily, so a content-less
// tool-call turn never emits a spurious empty assistant item.
announce func()
announced bool
}
func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer {
return &transcriptStreamer{
ctx: ctx,
t: t,
responseID: responseID,
itemID: itemID,
extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)),
}
}
// onToken handles one streamed unit of model output, sending a transcript delta
// for the new content (reasoning stripped). For plain-content models the unit is
// the raw text token; for autoparser tool turns the backend clears the text and
// delivers content via ChatDeltas, so the caller passes that content here.
func (s *transcriptStreamer) onToken(token string) {
_, content := s.extractor.ProcessToken(token)
if content == "" {
return
}
if !s.announced {
s.announced = true
if s.announce != nil {
s.announce()
}
}
_ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: s.responseID,
ItemID: s.itemID,
OutputIndex: 0,
ContentIndex: 0,
Delta: content,
})
}
// content returns the full transcript so far with reasoning stripped.
func (s *transcriptStreamer) content() string {
return s.extractor.CleanedContent()
}
// streamLLMResponse drives a streamed realtime reply. It streams the assistant
// transcript as the LLM generates, then synthesizes the whole buffered message
// once (streaming the audio chunks when the TTS backend supports it, otherwise a
// single unary delta). Tool calls parsed from the autoparser ChatDeltas are
// emitted after the spoken content. The assistant content item is created lazily
// on the first content delta, so a content-less tool-call turn emits only the
// tool calls. It returns true when it has fully handled the response so the
// caller can return; callers must only invoke it for an audio modality, and with
// tools only when the model uses its tokenizer template (see triggerResponseAtTurn).
func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, toolTurn int) bool {
itemID := generateItemID()
item := types.MessageItemUnion{
Assistant: &types.MessageItemAssistant{
ID: itemID,
Status: types.ItemStatusInProgress,
Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}},
},
}
// announce creates the assistant content item lazily, just before the first
// transcript delta — a tool-only turn never produces content, so it stays out
// of the conversation and the client sees only the tool calls.
announced := false
announce := func() {
announced = true
conv.Lock.Lock()
conv.Items = append(conv.Items, &item)
conv.Lock.Unlock()
sendEvent(t, types.ResponseOutputItemAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: 0,
Item: item,
})
sendEvent(t, types.ResponseContentPartAddedEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: itemID,
OutputIndex: 0,
ContentIndex: 0,
Part: item.Assistant.Content[0],
})
}
cancel := func() {
if announced {
conv.Lock.Lock()
for i := len(conv.Items) - 1; i >= 0; i-- {
if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == itemID {
conv.Items = append(conv.Items[:i], conv.Items[i+1:]...)
break
}
}
conv.Lock.Unlock()
}
sendEvent(t, types.ResponseDoneEvent{
ServerEventBase: types.ServerEventBase{},
Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled},
})
}
var template string
if llmCfg.TemplateConfig.UseTokenizerTemplate {
template = llmCfg.GetModelTemplate()
} else {
template = llmCfg.TemplateConfig.Chat
}
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)
streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, llmCfg.ReasoningConfig)
streamer.announce = announce
cb := func(token string, usage backend.TokenUsage) bool {
if ctx.Err() != nil {
return false
}
// Plain-content models stream text via the token; autoparser tool turns
// clear the text and deliver content via ChatDeltas, so prefer the latter
// when present. Either way only content reaches the transcript — tool-call
// deltas are parsed from the final response below.
text := token
if len(usage.ChatDeltas) > 0 {
text = functions.ContentFromChatDeltas(usage.ChatDeltas)
}
streamer.onToken(text)
return true
}
predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, tools, toolChoice, nil, nil, nil)
if err != nil {
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
return true
}
pred, err := predFunc()
if err != nil {
if ctx.Err() != nil {
cancel()
return true
}
sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
return true
}
if ctx.Err() != nil {
cancel()
return true
}
content := streamer.content()
toolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas)
// Finalize the spoken content item only when the turn produced content. A
// tool-only turn skips this entirely (no empty assistant item).
if content != "" {
if !announced {
announce()
}
// Buffer the whole message, then synthesize it once. emitSpeech streams
// the audio chunks when the TTS backend supports TTSStream, otherwise it
// sends a single unary delta — no per-sentence segmentation either way.
audio, err := emitSpeech(ctx, t, session, responseID, itemID, content)
if err != nil {
if ctx.Err() != nil {
cancel()
return true
}
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", itemID)
return true
}
_, isWebRTC := t.(*WebRTCTransport)
sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: itemID,
OutputIndex: 0,
ContentIndex: 0,
Transcript: content,
})
if !isWebRTC {
sendEvent(t, types.ResponseOutputAudioDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: itemID,
OutputIndex: 0,
ContentIndex: 0,
})
}
conv.Lock.Lock()
item.Assistant.Status = types.ItemStatusCompleted
item.Assistant.Content[0].Transcript = content
if !isWebRTC {
item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio)
}
conv.Lock.Unlock()
sendEvent(t, types.ResponseContentPartDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: itemID,
OutputIndex: 0,
ContentIndex: 0,
Part: item.Assistant.Content[0],
})
sendEvent(t, types.ResponseOutputItemDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
OutputIndex: 0,
Item: item,
})
}
// Emit any tool calls, the terminal response.done, and (for server-side
// assistant tools) the follow-up turn — shared with the buffered path.
emitToolCallItems(ctx, session, conv, t, responseID, toolCalls, content != "", toolTurn)
return true
}

View File

@@ -1,150 +0,0 @@
package openai
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/reasoning"
)
// transcriptStreamer turns streamed LLM tokens into incremental transcript
// deltas, stripping reasoning. Audio is synthesized once from the full message
// by the caller, so there is no per-sentence segmentation.
var _ = Describe("transcriptStreamer", func() {
It("emits one transcript delta per content token", func() {
t := &fakeTransport{}
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{})
for _, tok := range []string{"Hello", " world.", " Bye"} {
s.onToken(tok)
}
Expect(s.content()).To(Equal("Hello world. Bye"))
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
Expect(t.transcriptDeltaText()).To(Equal("Hello world. Bye"))
})
It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() {
// disable_thinking maps to DisableReasoning=true (enable_thinking=false to
// the backend). If the model emits thinking anyway, the transcript must
// still not leak it: stripping always runs for spoken output.
disable := true
t := &fakeTransport{}
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "",
reasoning.Config{DisableReasoning: &disable})
s.onToken("<think>secret plan</think>")
s.onToken("The answer is 42.")
Expect(s.content()).To(Equal("The answer is 42."))
Expect(s.content()).ToNot(ContainSubstring("secret plan"))
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan"))
})
})
// streamLLMResponse drives a full streamed realtime turn: live transcript
// deltas while the LLM generates, then the whole message is synthesized once.
var _ = Describe("streamLLMResponse", func() {
It("streams transcript deltas then synthesizes the whole message once", func() {
on := true
m := &fakeModel{
predictTokens: []string{"Hello", " world.", " How are you?"},
predictResp: backend.LLMResponse{Response: "Hello world. How are you?"},
ttsStreamChunks: [][]byte{{9}},
ttsStreamRate: 24000,
}
session := &Session{
OutputSampleRate: 24000,
ModelInterface: m,
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
},
}
conv := &Conversation{}
t := &fakeTransport{}
llmCfg := &config.ModelConfig{}
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
Expect(handled).To(BeTrue())
// One live transcript delta per streamed token.
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
// The whole message is synthesized ONCE (not per sentence): a single
// emitSpeech replays the one TTS stream chunk.
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
})
It("streams content deltas and emits tool-call items (autoparser tool turn)", func() {
on := true
// Autoparser path: reply.Message is empty; content + tool calls arrive via
// ChatDeltas. Chunk 1 carries content, chunk 2 carries the tool call.
contentDelta := []*proto.ChatDelta{{Content: "Let me check."}}
toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Paris"}`}}}}
m := &fakeModel{
predictTokens: []string{"", ""},
predictChunkDeltas: [][]*proto.ChatDelta{contentDelta, toolDelta},
predictResp: backend.LLMResponse{ChatDeltas: append(append([]*proto.ChatDelta{}, contentDelta...), toolDelta...)},
ttsStreamChunks: [][]byte{{9}},
ttsStreamRate: 24000,
}
session := &Session{
OutputSampleRate: 24000,
ModelInterface: m,
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
},
}
conv := &Conversation{}
t := &fakeTransport{}
llmCfg := &config.ModelConfig{}
llmCfg.TemplateConfig.UseTokenizerTemplate = true
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
Expect(handled).To(BeTrue())
// The spoken content was streamed live.
Expect(t.transcriptDeltaText()).To(Equal("Let me check."))
// The tool call is emitted as a function_call item.
Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1))
// Exactly one terminal response.done.
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
})
It("emits only tool-call items for a content-less tool turn (no empty assistant item)", func() {
on := true
toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Rome"}`}}}}
m := &fakeModel{
predictTokens: []string{""},
predictChunkDeltas: [][]*proto.ChatDelta{toolDelta},
predictResp: backend.LLMResponse{ChatDeltas: toolDelta},
}
session := &Session{
OutputSampleRate: 24000,
ModelInterface: m,
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
},
}
conv := &Conversation{}
t := &fakeTransport{}
llmCfg := &config.ModelConfig{}
llmCfg.TemplateConfig.UseTokenizerTemplate = true
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
Expect(handled).To(BeTrue())
// No content → no transcript deltas and no spurious assistant content item.
Expect(t.transcriptDeltaText()).To(Equal(""))
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(0))
// The tool call is still emitted.
Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1))
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
})
})

View File

@@ -1,33 +0,0 @@
package openai
import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/pkg/reasoning"
)
// applyPipelineThinking forces the LLM's reasoning/thinking off when the realtime
// pipeline sets disable_thinking, mapping to the enable_thinking=false backend
// metadata via ReasoningConfig.DisableReasoning. The LLM config passed in is the
// per-session copy returned by the config loader, so this does not affect other
// users of the same model. When the pipeline does not set disable_thinking the
// LLM config is left untouched.
func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) {
if llm == nil || !pipeline.ThinkingDisabled() {
return
}
disable := true
llm.ReasoningConfig.DisableReasoning = &disable
}
// spokenReasoningConfig adapts a model's reasoning config for stripping reasoning
// OUT of realtime spoken output. ReasoningConfig.DisableReasoning is overloaded:
// the backend reads it as the "enable_thinking=false" hint (which pipeline
// disable_thinking sets via applyPipelineThinking), but the reasoning extractor
// reads it as "skip stripping, assume there is no reasoning". Honouring the latter
// when extracting for speech would leak raw <think>…</think> whenever the model
// ignores the suppression hint. Spoken output must never contain reasoning, so we
// always strip: clear DisableReasoning while keeping custom tokens/tag pairs.
func spokenReasoningConfig(cfg reasoning.Config) reasoning.Config {
cfg.DisableReasoning = nil
return cfg
}

View File

@@ -1,50 +0,0 @@
package openai
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/pkg/reasoning"
)
// applyPipelineThinking lets a realtime pipeline force the LLM's thinking off
// (enable_thinking=false metadata) without editing the LLM model config.
var _ = Describe("applyPipelineThinking", func() {
It("disables reasoning on the LLM config when the pipeline disables thinking", func() {
disable := true
llm := &config.ModelConfig{}
applyPipelineThinking(llm, config.Pipeline{DisableThinking: &disable})
Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil())
Expect(*llm.ReasoningConfig.DisableReasoning).To(BeTrue())
})
It("leaves the LLM config untouched when the pipeline does not set disable_thinking", func() {
llm := &config.ModelConfig{}
applyPipelineThinking(llm, config.Pipeline{})
Expect(llm.ReasoningConfig.DisableReasoning).To(BeNil())
})
})
// spokenReasoningConfig clears DisableReasoning so realtime spoken output always
// strips reasoning, even though disable_thinking sets DisableReasoning=true on the
// LLM config (which the backend reads as enable_thinking=false).
var _ = Describe("spokenReasoningConfig", func() {
It("clears DisableReasoning so the extractor still strips leaked reasoning", func() {
disable := true
out := spokenReasoningConfig(reasoning.Config{DisableReasoning: &disable})
Expect(out.DisableReasoning).To(BeNil())
})
It("preserves the other reasoning settings", func() {
disable := true
out := spokenReasoningConfig(reasoning.Config{
DisableReasoning: &disable,
ThinkingStartTokens: []string{"<reason>"},
TagPairs: []reasoning.TagPair{{Start: "<reason>", End: "</reason>"}},
})
Expect(out.ThinkingStartTokens).To(Equal([]string{"<reason>"}))
Expect(out.TagPairs).To(HaveLen(1))
Expect(out.TagPairs[0].Start).To(Equal("<reason>"))
})
})

View File

@@ -1,63 +0,0 @@
package openai
import (
"context"
"fmt"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
)
// emitTranscription transcribes a committed utterance and emits the transcription
// events for it, returning the final transcript text. With
// pipeline.streaming.transcription enabled it streams each transcript fragment as
// a conversation.item.input_audio_transcription.delta as the backend produces it,
// then a completed event; otherwise it transcribes the whole utterance and emits
// a single completed event. delta and completed events share itemID.
func emitTranscription(ctx context.Context, t Transport, session *Session, itemID, audioPath string) (string, error) {
cfg := session.InputAudioTranscription
if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTranscription() {
final, err := session.ModelInterface.TranscribeStream(ctx, audioPath, cfg.Language, false, false, cfg.Prompt, func(delta string) {
_ = t.SendEvent(types.ConversationItemInputAudioTranscriptionDeltaEvent{
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
ItemID: itemID,
ContentIndex: 0,
Delta: delta,
})
})
if err != nil {
return "", err
}
transcript := ""
if final != nil {
transcript = final.Text
}
if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
ItemID: itemID,
ContentIndex: 0,
Transcript: transcript,
}); err != nil {
return "", err
}
return transcript, nil
}
// Unary fallback: transcribe the whole utterance, emit one completed event.
tr, err := session.ModelInterface.Transcribe(ctx, audioPath, cfg.Language, false, false, cfg.Prompt)
if err != nil {
return "", err
}
if tr == nil {
return "", fmt.Errorf("transcribe result is nil")
}
if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
ItemID: itemID,
ContentIndex: 0,
Transcript: tr.Text,
}); err != nil {
return "", err
}
return tr.Text, nil
}

View File

@@ -1,54 +0,0 @@
package openai
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
"github.com/mudler/LocalAI/core/schema"
)
// emitTranscription transcribes a committed utterance, streaming transcript text
// deltas when the pipeline opts in, and returns the final transcript text.
var _ = Describe("emitTranscription", func() {
It("streams transcription deltas then a completed event when streaming is enabled", func() {
on := true
session := &Session{
InputAudioTranscription: &types.AudioTranscription{},
ModelConfig: &config.ModelConfig{
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{Transcription: &on}},
},
ModelInterface: &fakeModel{
transcribeDeltas: []string{"Hel", "lo", " world"},
transcribeFinal: &schema.TranscriptionResult{Text: "Hello world"},
},
}
t := &fakeTransport{}
transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav")
Expect(err).ToNot(HaveOccurred())
Expect(transcript).To(Equal("Hello world"))
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(3))
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1))
})
It("emits a single completed event with no deltas in unary mode", func() {
session := &Session{
InputAudioTranscription: &types.AudioTranscription{},
ModelConfig: &config.ModelConfig{}, // streaming off
ModelInterface: &fakeModel{transcribeFinal: &schema.TranscriptionResult{Text: "Hi"}},
}
t := &fakeTransport{}
transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav")
Expect(err).ToNot(HaveOccurred())
Expect(transcript).To(Equal("Hi"))
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(0))
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1))
})
})

View File

@@ -10,7 +10,6 @@ import (
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/pkg/natsauth"
"gorm.io/gorm"
)
@@ -36,7 +35,7 @@ func nodeReadyMiddleware(registry *nodes.NodeRegistry) echo.MiddlewareFunc {
// token but do not verify per-node identity. A compromised worker can heartbeat/drain/
// deregister other nodes. Future: issue per-node JWT at registration, validate node
// identity on subsequent requests (compare :id param with token subject).
func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, registrationToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string, natsCfg natsauth.Config) {
func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, registrationToken string, autoApprove bool, authDB *gorm.DB, hmacSecret string) {
if registry == nil {
return
}
@@ -45,7 +44,7 @@ func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, r
tokenAuthMw := nodeTokenAuth(registrationToken)
node := e.Group("/api/node", readyMw, tokenAuthMw)
node.POST("/register", localai.RegisterNodeEndpoint(registry, registrationToken, autoApprove, authDB, hmacSecret, natsCfg))
node.POST("/register", localai.RegisterNodeEndpoint(registry, registrationToken, autoApprove, authDB, hmacSecret))
node.POST("/:id/heartbeat", localai.HeartbeatEndpoint(registry))
node.POST("/:id/drain", localai.DrainNodeEndpoint(registry))
node.POST("/:id/resume", localai.ResumeNodeEndpoint(registry))
@@ -61,7 +60,7 @@ func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, r
// backend install path (POST /:id/backends/install). That handler enqueues a
// ManagementOp on the gallery channel rather than blocking on a NATS reply, so
// the browser gets HTTP 202 + jobID immediately instead of waiting up to 3 minutes.
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string, natsCfg natsauth.Config) {
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string) {
if registry == nil {
return
}
@@ -82,7 +81,7 @@ func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloade
admin.DELETE("/:id", localai.DeregisterNodeEndpoint(registry))
admin.POST("/:id/drain", localai.DrainNodeEndpoint(registry))
admin.POST("/:id/resume", localai.ResumeNodeEndpoint(registry))
admin.POST("/:id/approve", localai.ApproveNodeEndpoint(registry, authDB, hmacSecret, natsCfg))
admin.POST("/:id/approve", localai.ApproveNodeEndpoint(registry, authDB, hmacSecret))
// Backend management on workers
admin.GET("/:id/backends", localai.ListBackendsOnNodeEndpoint(unloader))

View File

@@ -60,14 +60,6 @@ type TTSRequest struct {
Format string `json:"response_format,omitempty" yaml:"response_format,omitempty"` // (optional) output format
Stream bool `json:"stream,omitempty" yaml:"stream,omitempty"` // (optional) enable streaming TTS
SampleRate int `json:"sample_rate,omitempty" yaml:"sample_rate,omitempty"` // (optional) desired output sample rate
// Instructions is a free-form, per-request style/voice description. It maps to
// the OpenAI `instructions` field and is forwarded to the backend so expressive
// TTS models (e.g. Qwen3-TTS CustomVoice/VoiceDesign) can vary tone or designed
// voice per request instead of only via the static YAML option.
Instructions string `json:"instructions,omitempty" yaml:"instructions,omitempty"`
// Params carries optional, backend-specific per-request generation parameters
// (LocalAI extension, e.g. Chatterbox exaggeration/cfg_weight/temperature).
Params map[string]string `json:"params,omitempty" yaml:"params,omitempty"`
}
// @Description VAD request body

View File

@@ -2,22 +2,15 @@ package messaging
import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/mudler/LocalAI/pkg/sanitize"
"github.com/mudler/xlog"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
)
// subscribeConfirmTimeout bounds the server round-trip used to detect whether a
// subscription was rejected (e.g. by JWT permissions) before returning to the caller.
const subscribeConfirmTimeout = 5 * time.Second
// Client wraps a NATS connection and provides helpers for pub/sub and queue subscriptions.
type Client struct {
conn *nats.Conn
@@ -25,13 +18,8 @@ type Client struct {
}
// New creates a new NATS client with auto-reconnect.
func New(url string, opts ...Option) (*Client, error) {
var cfg connectConfig
for _, o := range opts {
o(&cfg)
}
natsOpts := []nats.Option{
func New(url string) (*Client, error) {
nc, err := nats.Connect(url,
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(-1),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
@@ -45,60 +33,7 @@ func New(url string, opts ...Option) (*Client, error) {
nats.ClosedHandler(func(_ *nats.Conn) {
xlog.Info("NATS connection closed")
}),
// Surface async errors (notably permission violations) that NATS would
// otherwise deliver silently. A subscription the server rejects for a
// JWT permission means the worker never receives those messages, so make
// it loud rather than letting the feature fail invisibly.
nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) {
subject := ""
if sub != nil {
subject = sub.Subject
}
if errors.Is(err, nats.ErrPermissionViolation) {
xlog.Error("NATS permission violation — check JWT pub/sub allow lists", "subject", subject, "error", err)
return
}
xlog.Warn("NATS async error", "subject", subject, "error", err)
}),
}
switch {
case cfg.jwtProvider != nil:
// Fetch creds on every (re)connect so a refresh loop can rotate the JWT
// before expiry; the server expiring the old JWT triggers a reconnect
// that transparently picks up the new one.
natsOpts = append(natsOpts, nats.UserJWT(
func() (string, error) {
jwt, _ := cfg.jwtProvider()
if jwt == "" {
return "", fmt.Errorf("no NATS user JWT available")
}
return jwt, nil
},
func(nonce []byte) ([]byte, error) {
_, seed := cfg.jwtProvider()
kp, err := nkeys.FromSeed([]byte(seed))
if err != nil {
return nil, fmt.Errorf("loading NATS user seed: %w", err)
}
defer kp.Wipe()
return kp.Sign(nonce)
},
))
case cfg.userJWT != "" && cfg.userSeed != "":
natsOpts = append(natsOpts, nats.UserJWTAndSeed(cfg.userJWT, cfg.userSeed))
}
if cfg.tls.Enabled() {
if err := cfg.tls.Validate(); err != nil {
return nil, err
}
tlsOpts, err := cfg.tls.natsOptions()
if err != nil {
return nil, err
}
natsOpts = append(natsOpts, tlsOpts...)
}
nc, err := nats.Connect(url, natsOpts...)
)
if err != nil {
return nil, fmt.Errorf("connecting to NATS at %s: %w", sanitize.URL(url), err)
}
@@ -119,65 +54,21 @@ func (c *Client) Publish(subject string, data any) error {
// Subscribe creates a subscription on the given subject. All subscribers receive every message.
func (c *Client) Subscribe(subject string, handler func([]byte)) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
}
// QueueSubscribe creates a queue subscription. Within the same queue group,
// only one subscriber receives each message (load-balanced).
func (c *Client) QueueSubscribe(subject, queue string, handler func([]byte)) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data)
})
})
}
// confirmSubscription creates a subscription via mk and forces a server
// round-trip so that a permissions violation — which NATS otherwise reports
// only asynchronously — is returned to the caller synchronously. The server
// emits the "-ERR Permissions Violation" for a rejected SUB before the PONG
// that satisfies the flush, so by the time FlushTimeout returns the violation
// is recorded as the connection's last error. Without this, a worker whose JWT
// lacks a subject gets a non-nil subscription that never receives a message,
// turning a permission misconfiguration into a silent failure.
func (c *Client) confirmSubscription(subject string, mk func(*nats.Conn) (*nats.Subscription, error)) (Subscription, error) {
c.mu.RLock()
conn := c.conn
c.mu.RUnlock()
if conn == nil {
return nil, fmt.Errorf("subscribe to %s: nil NATS connection", subject)
}
sub, err := mk(conn)
if err != nil {
return nil, err
}
// A failed flush here means we could not round-trip to the server (not yet
// connected, reconnecting, slow link). RetryOnFailedConnect intentionally
// buffers subscriptions across that gap, so do NOT fail — keep the
// subscription and let it replay on (re)connect; a later permission
// violation is still logged by the async error handler in New.
if err := conn.FlushTimeout(subscribeConfirmTimeout); err != nil {
xlog.Debug("Could not confirm NATS subscription (will replay on connect)", "subject", subject, "error", err)
return sub, nil
}
// Flush succeeded, so any permission violation for this SUB has already been
// recorded as the connection's last error (the server emits it before the
// PONG). LastError is per-connection; match the exact quoted subject the
// server echoes ("Subscription to \"<subject>\"") so a stale violation for
// another subject can't be mis-attributed here.
if lerr := conn.LastError(); lerr != nil &&
errors.Is(lerr, nats.ErrPermissionViolation) &&
strings.Contains(lerr.Error(), `Subscription to "`+subject+`"`) {
_ = sub.Unsubscribe()
return nil, fmt.Errorf("subscription to %s denied by NATS server (check JWT sub allow list): %w", subject, lerr)
}
return sub, nil
defer c.mu.RUnlock()
return c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data)
})
}
// Request sends a request and waits for a reply (request-reply pattern).
@@ -195,15 +86,15 @@ func (c *Client) Request(subject string, data []byte, timeout time.Duration) ([]
// SubscribeReply creates a subscription that supports replying to requests.
// The handler receives the raw request data and the reply subject.
func (c *Client) SubscribeReply(subject string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
})
}
})
})
}
@@ -211,15 +102,15 @@ func (c *Client) SubscribeReply(subject string, handler func(data []byte, reply
// QueueSubscribeReply creates a queue subscription that supports replying to requests.
// Load-balanced across subscribers in the same queue group, with request-reply support.
func (c *Client) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Data, func(replyData []byte) {
if msg.Reply != "" {
if err := msg.Respond(replyData); err != nil {
xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
}
})
}
})
})
}

View File

@@ -1,34 +0,0 @@
package messaging
// Option configures NATS client connection behavior.
type Option func(*connectConfig)
// CredentialProvider returns the NATS user JWT and signing seed to use for the
// next (re)connect. It is consulted on every connection attempt, so a refresh
// loop can rotate credentials before they expire and the connection picks them
// up automatically when the server expires the old JWT and triggers a reconnect.
type CredentialProvider func() (jwt, seed string)
type connectConfig struct {
userJWT string
userSeed string
jwtProvider CredentialProvider
tls TLSFiles
}
// WithUserJWT connects using a static NATS user JWT and signing seed (UserJWTAndSeed).
func WithUserJWT(jwt, seed string) Option {
return func(c *connectConfig) {
c.userJWT = jwt
c.userSeed = seed
}
}
// WithUserJWTProvider connects using credentials fetched from provider on each
// (re)connect, enabling JWT rotation without dropping the client. Takes
// precedence over WithUserJWT when both are set.
func WithUserJWTProvider(provider CredentialProvider) Option {
return func(c *connectConfig) {
c.jwtProvider = provider
}
}

View File

@@ -1,68 +0,0 @@
package messaging
import (
"fmt"
"os"
"github.com/nats-io/nats.go"
)
// TLSFiles holds PEM paths for NATS TLS / mTLS. Cert and key must be set together.
// Use tls:// in LOCALAI_NATS_URL; CA and client cert paths are optional extras.
type TLSFiles struct {
CA string // LOCALAI_NATS_TLS_CA — private CA for server verification
Cert string // LOCALAI_NATS_TLS_CERT — client certificate (mTLS)
Key string // LOCALAI_NATS_TLS_KEY — client private key
}
// Enabled reports whether any TLS file path is configured.
func (f TLSFiles) Enabled() bool {
return f.CA != "" || f.Cert != "" || f.Key != ""
}
// Validate checks path pairing and that files exist.
func (f TLSFiles) Validate() error {
if f.Cert != "" && f.Key == "" {
return fmt.Errorf("LOCALAI_NATS_TLS_KEY is required when LOCALAI_NATS_TLS_CERT is set")
}
if f.Key != "" && f.Cert == "" {
return fmt.Errorf("LOCALAI_NATS_TLS_CERT is required when LOCALAI_NATS_TLS_KEY is set")
}
for _, path := range []struct {
name, path string
}{
{"LOCALAI_NATS_TLS_CA", f.CA},
{"LOCALAI_NATS_TLS_CERT", f.Cert},
{"LOCALAI_NATS_TLS_KEY", f.Key},
} {
if path.path == "" {
continue
}
if _, err := os.Stat(path.path); err != nil {
return fmt.Errorf("%s: %w", path.name, err)
}
}
return nil
}
// natsOptions builds nats-go TLS options. Call Validate first.
func (f TLSFiles) natsOptions() ([]nats.Option, error) {
if !f.Enabled() {
return nil, nil
}
opts := []nats.Option{nats.Secure()}
if f.CA != "" {
opts = append(opts, nats.RootCAs(f.CA))
}
if f.Cert != "" {
opts = append(opts, nats.ClientCert(f.Cert, f.Key))
}
return opts, nil
}
// WithTLS configures CA and/or client certificate paths for the NATS connection.
func WithTLS(files TLSFiles) Option {
return func(c *connectConfig) {
c.tls = files
}
}

View File

@@ -1,25 +0,0 @@
package messaging_test
import (
"os"
"path/filepath"
"github.com/mudler/LocalAI/core/services/messaging"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("TLSFiles", func() {
It("requires cert and key together", func() {
Expect((messaging.TLSFiles{Cert: "/tmp/c.pem"}).Validate()).To(HaveOccurred())
Expect((messaging.TLSFiles{Key: "/tmp/k.pem"}).Validate()).To(HaveOccurred())
})
It("validates files exist", func() {
dir := GinkgoT().TempDir()
ca := filepath.Join(dir, "ca.pem")
Expect(os.WriteFile(ca, []byte("x"), 0600)).To(Succeed())
Expect((messaging.TLSFiles{CA: ca}).Validate()).To(Succeed())
})
})

View File

@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
@@ -933,12 +932,13 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op
{"AudioPath", &opts.AudioPath},
}
// Count stageable files for progress tracking. Directory models expand to
// the number of files they contain, matching what stageDirectory uploads.
// Count stageable files for progress tracking
totalFiles := 0
for _, f := range fields {
if *f.val != "" {
totalFiles += countStageableFiles(*f.val)
if _, err := os.Stat(*f.val); err == nil {
totalFiles++
}
}
}
for _, adapter := range opts.LoraAdapters {
@@ -969,33 +969,8 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op
*f.val = ""
continue
}
localPath := *f.val
// Directory models (e.g. qwen3-tts-cpp ships its weights and tokenizer
// ggufs under one directory) can't be uploaded as a single file — the
// stager would open the directory and read its fd, failing with
// "is a directory" (EISDIR). Expand the directory and stage each
// contained file, then rewrite the field to the remote directory.
if fi, statErr := os.Stat(localPath); statErr == nil && fi.IsDir() {
remoteDir, dirErr := r.stageDirectory(ctx, node, trackingKey, localPath, keyMapper, &fileIdx, totalFiles)
if dirErr != nil {
if f.name == "ModelFile" {
xlog.Error("Failed to stage model directory for remote node", "node", node.Name, "field", f.name, "path", localPath, "error", dirErr)
return nil, fmt.Errorf("staging model file: %w", dirErr)
}
xlog.Warn("Failed to stage model directory, clearing field", "field", f.name, "path", localPath, "error", dirErr)
*f.val = ""
continue
}
*f.val = remoteDir
if f.name == "ModelFile" && opts.Model != "" {
opts.ModelPath = DeriveRemoteModelPath(remoteDir, opts.Model)
xlog.Debug("Derived remote ModelPath", "modelPath", opts.ModelPath)
}
continue
}
fileIdx++
localPath := *f.val
key := keyMapper.Key(localPath)
// Attach progress callback to context for byte-level tracking
@@ -1099,77 +1074,6 @@ func (r *SmartRouter) withStagingCallback(ctx context.Context, trackingKey, file
})
}
// countStageableFiles returns the number of regular files a model path expands
// to for staging: 1 for a regular file, the contained file count for a
// directory, and 0 if the path does not exist.
func countStageableFiles(path string) int {
fi, err := os.Stat(path)
if err != nil {
return 0
}
if !fi.IsDir() {
return 1
}
n := 0
_ = filepath.WalkDir(path, func(_ string, d fs.DirEntry, walkErr error) error {
if walkErr != nil {
return nil
}
if !d.IsDir() {
n++
}
return nil
})
return n
}
// stageDirectory stages every file under a directory-based model (e.g.
// qwen3-tts-cpp, whose weights and tokenizer ggufs live in one directory).
// Each file is uploaded individually with a structure-preserving key; the
// returned path is the remote directory that contained them, suitable for the
// backend's ModelFile/ModelPath. fileIdx is advanced per staged file so the
// staging progress tracker stays accurate.
func (r *SmartRouter) stageDirectory(ctx context.Context, node *BackendNode, trackingKey, dir string, keyMapper *StagingKeyMapper, fileIdx *int, totalFiles int) (string, error) {
var remoteDir string
err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, walkErr error) error {
if walkErr != nil {
return walkErr
}
if d.IsDir() {
return nil
}
*fileIdx++
fileName := filepath.Base(path)
stageCtx := r.withStagingCallback(ctx, trackingKey, fileName, *fileIdx, totalFiles)
xlog.Info("Staging file", "model", trackingKey, "node", node.Name, "field", "ModelDir", "file", fileName, "fileIndex", *fileIdx, "totalFiles", totalFiles)
remoteFile, err := r.fileStager.EnsureRemote(stageCtx, node.ID, path, keyMapper.Key(path))
if err != nil {
return fmt.Errorf("staging %s: %w", path, err)
}
r.stagingTracker.FileComplete(trackingKey, *fileIdx, totalFiles)
// Every file under dir shares the same remote parent directory; derive
// it from this file's staged path and its path relative to dir.
rel, relErr := filepath.Rel(dir, path)
if relErr != nil {
return relErr
}
remoteDir = DeriveRemoteModelPath(remoteFile, rel)
r.stageCompanionFiles(ctx, node, path, keyMapper.Key)
return nil
})
if err != nil {
return "", err
}
if remoteDir == "" {
return "", fmt.Errorf("model directory %s contains no files", dir)
}
return remoteDir, nil
}
// stageCompanionFiles stages known companion files that exist alongside
// localPath. For example, piper TTS implicitly loads ".onnx.json" next to
// the ".onnx" model file. Errors are logged but not propagated.

View File

@@ -1,64 +0,0 @@
package nodes
import (
"context"
"os"
"path/filepath"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
)
// These tests cover staging of "directory models" — models whose ModelFile is a
// directory containing multiple files (e.g. qwen3-tts-cpp ships weights +
// tokenizer ggufs under one directory). The HTTP file stager uploads a single
// regular file per path, so a directory ModelFile must be expanded into its
// constituent files; otherwise the upload reads a directory fd and fails with
// "is a directory" (EISDIR) on remote NATS worker nodes.
var _ = Describe("stageModelFiles directory models", func() {
var (
stager *fakeFileStager
router *SmartRouter
node *BackendNode
tmp string
modelID = "qwen3-tts-cpp"
)
BeforeEach(func() {
stager = &fakeFileStager{}
router = &SmartRouter{
fileStager: stager,
stagingTracker: NewStagingTracker(),
}
node = &BackendNode{ID: "node-1", Name: "node-1", Address: "10.0.0.1:50051"}
tmp = GinkgoT().TempDir()
})
It("stages every file inside a directory ModelFile instead of the directory path", func() {
modelDir := filepath.Join(tmp, "models", modelID)
Expect(os.MkdirAll(modelDir, 0o755)).To(Succeed())
weights := filepath.Join(modelDir, "qwen3-tts-0.6b-f16.gguf")
tokenizer := filepath.Join(modelDir, "qwen3-tts-tokenizer-f16.gguf")
Expect(os.WriteFile(weights, []byte("weights"), 0o644)).To(Succeed())
Expect(os.WriteFile(tokenizer, []byte("tokenizer"), 0o644)).To(Succeed())
opts := &pb.ModelOptions{
Model: modelID,
ModelFile: modelDir,
}
_, err := router.stageModelFiles(context.Background(), node, opts, "track-key")
Expect(err).ToNot(HaveOccurred())
staged := make([]string, 0, len(stager.ensureCalls))
for _, c := range stager.ensureCalls {
staged = append(staged, c.localPath)
}
// Each contained file is staged individually; the directory path itself
// is never handed to the stager (which would read a directory fd).
Expect(staged).To(ConsistOf(weights, tokenizer))
Expect(staged).ToNot(ContainElement(modelDir))
})
})

View File

@@ -60,13 +60,7 @@ type Config struct {
MaxReplicasPerModel int `env:"LOCALAI_MAX_REPLICAS_PER_MODEL" default:"1" help:"Max replicas of any single model on this worker. Default 1 preserves single-replica behavior; set higher to allow stacking replicas on a fat node." group:"registration"`
// NATS (required)
NatsURL string `env:"LOCALAI_NATS_URL" required:"" help:"NATS server URL" group:"distributed"`
NatsJWT string `env:"LOCALAI_NATS_JWT" help:"NATS user JWT override (normally from registration nats_jwt)" group:"distributed"`
NatsUserSeed string `env:"LOCALAI_NATS_USER_SEED" help:"NATS user signing seed override (normally from registration nats_user_seed)" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT+seed from registration or env" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI)" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" required:"" help:"NATS server URL" group:"distributed"`
// S3 storage for distributed file transfer
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3 endpoint URL" group:"distributed"`

View File

@@ -1,33 +0,0 @@
package worker
import (
"fmt"
"github.com/mudler/LocalAI/core/services/messaging"
)
// connectNATS opens a NATS client using JWT+seed from env or registration (env wins).
func connectNATS(url, envJWT, envSeed, registerJWT, registerSeed string, requireAuth bool, tls messaging.TLSFiles) (*messaging.Client, error) {
// Env credentials take precedence, but only fall back to registration when
// the env supplied neither half — otherwise a JWT set without its seed (or
// vice-versa) would be silently completed from a different source.
jwt, seed := envJWT, envSeed
if jwt == "" && seed == "" {
jwt, seed = registerJWT, registerSeed
}
// A JWT without its paired seed (or vice-versa) is a misconfiguration: refuse
// rather than silently connecting anonymously, which would look authenticated.
if (jwt == "") != (seed == "") {
return nil, fmt.Errorf("NATS JWT and seed must be provided together (got JWT set=%t, seed set=%t)", jwt != "", seed != "")
}
var opts []messaging.Option
if jwt != "" && seed != "" {
opts = append(opts, messaging.WithUserJWT(jwt, seed))
} else if requireAuth {
return nil, fmt.Errorf("NATS JWT+seed required: set LOCALAI_NATS_JWT/LOCALAI_NATS_USER_SEED or enable frontend minting")
}
if tls.Enabled() {
opts = append(opts, messaging.WithTLS(tls))
}
return messaging.New(url, opts...)
}

View File

@@ -1,29 +0,0 @@
package worker
import (
"github.com/mudler/LocalAI/core/services/messaging"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("connectNATS", func() {
It("requires JWT when requireAuth is set and no credentials are provided", func() {
_, err := connectNATS("nats://127.0.0.1:4222", "", "", "", "", true, messaging.TLSFiles{})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("NATS JWT+seed required"))
})
// A JWT supplied without its paired seed (or vice-versa) is an operator
// misconfiguration. Today connectNATS silently drops the unpaired credential
// and connects anonymously, so the operator believes the link is
// authenticated when it is not. It should refuse instead.
It("rejects a JWT supplied without a seed instead of connecting anonymously", func() {
client, err := connectNATS("nats://127.0.0.1:4222", "jwt-without-seed", "", "", "", false, messaging.TLSFiles{})
if client != nil {
client.Close()
}
Expect(err).To(HaveOccurred(),
"connectNATS should reject an unpaired JWT rather than silently connecting anonymously")
})
})

View File

@@ -15,7 +15,6 @@ import (
"github.com/mudler/LocalAI/core/cli/workerregistry"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
grpc "github.com/mudler/LocalAI/pkg/grpc"
@@ -68,63 +67,10 @@ func Run(ctx *cliContext.Context, cfg *Config) error {
RegistrationToken: cfg.RegistrationToken,
}
// Context cancelled on shutdown — used by registration waits, heartbeat, and
// other background goroutines.
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()
registrationBody := cfg.registrationBody()
natsTLS := messaging.TLSFiles{CA: cfg.NatsTLSCA, Cert: cfg.NatsTLSCert, Key: cfg.NatsTLSKey}
// Resolve how to connect to NATS. Static env credentials cannot be re-minted,
// so register once and use them directly. Otherwise the credential manager
// (re)registers to obtain credentials — waiting through admin approval — and
// refreshes them before the minted JWT expires, so the connection survives
// expiry via a transparent reconnect.
var (
nodeID string
connectNats func() (*messaging.Client, error)
)
if cfg.NatsJWT != "" || cfg.NatsUserSeed != "" {
nid, _, _, _, regErr := regClient.RegisterWithRetry(shutdownCtx, registrationBody, 10)
if regErr != nil {
return fmt.Errorf("failed to register with frontend: %w", regErr)
}
nodeID = nid
connectNats = func() (*messaging.Client, error) {
return connectNATS(cfg.NatsURL, cfg.NatsJWT, cfg.NatsUserSeed, "", "", cfg.NatsRequireAuth, natsTLS)
}
} else {
credMgr := workerregistry.NewNATSCredentialManager(
func(ctx context.Context) (*workerregistry.RegisterResponse, error) {
return regClient.RegisterFull(ctx, registrationBody)
},
cfg.NatsRequireAuth,
)
res, regErr := credMgr.Acquire(shutdownCtx)
if regErr != nil {
return fmt.Errorf("failed to register with frontend: %w", regErr)
}
nodeID = res.ID
connectNats = func() (*messaging.Client, error) {
var opts []messaging.Option
if credMgr.HasCredentials() {
opts = append(opts, messaging.WithUserJWTProvider(credMgr.Provider()))
}
if natsTLS.Enabled() {
opts = append(opts, messaging.WithTLS(natsTLS))
}
client, cerr := messaging.New(cfg.NatsURL, opts...)
if cerr == nil && credMgr.HasCredentials() {
go func() {
if err := credMgr.RefreshLoop(shutdownCtx); err != nil {
xlog.Error("NATS credential refresh permanently failed; shutting down worker", "error", err)
shutdownCancel()
}
}()
}
return client, cerr
}
nodeID, _, err := regClient.RegisterWithRetry(context.Background(), registrationBody, 10)
if err != nil {
return fmt.Errorf("failed to register with frontend: %w", err)
}
xlog.Info("Registered with frontend", "nodeID", nodeID, "frontend", cfg.RegisterTo)
@@ -133,6 +79,9 @@ func Run(ctx *cliContext.Context, cfg *Config) error {
xlog.Warn("invalid heartbeat interval, using default 10s", "input", cfg.HeartbeatInterval, "error", err)
}
heartbeatInterval = cmp.Or(heartbeatInterval, 10*time.Second)
// Context cancelled on shutdown — used by heartbeat and other background goroutines
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
defer shutdownCancel()
// Start HTTP file transfer server
httpAddr := cfg.resolveHTTPAddr()
@@ -145,7 +94,7 @@ func Run(ctx *cliContext.Context, cfg *Config) error {
// Connect to NATS
xlog.Info("Connecting to NATS", "url", sanitize.URL(cfg.NatsURL))
natsClient, err := connectNats()
natsClient, err := messaging.New(cfg.NatsURL)
if err != nil {
nodes.ShutdownFileTransferServer(httpServer)
return fmt.Errorf("connecting to NATS: %w", err)
@@ -205,21 +154,12 @@ func Run(ctx *cliContext.Context, cfg *Config) error {
}
xlog.Info("Worker ready, waiting for backend.install events")
// Exit on an OS signal or on an internal fatal condition (e.g. NATS
// credentials became unrenewable), so the worker restarts and re-acquires
// rather than lingering unable to serve.
var runErr error
select {
case <-sigCh:
case <-shutdownCtx.Done():
runErr = fmt.Errorf("worker shutting down: NATS credentials unavailable")
xlog.Error("Internal shutdown requested", "error", runErr)
}
<-sigCh
xlog.Info("Shutting down worker")
shutdownCancel() // stop heartbeat loop immediately
regClient.GracefulDeregister(nodeID)
supervisor.stopAllBackends()
nodes.ShutdownFileTransferServer(httpServer)
return runErr
return nil
}

View File

@@ -71,50 +71,6 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These
| `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | Same as the install timeout, applied to backend upgrades (force-reinstall). |
| `--expose-node-header` | `LOCALAI_EXPOSE_NODE_HEADER` | `false` | When enabled, inference responses carry an `X-LocalAI-Node` header with the ID of the worker node that served the request. Coverage spans the OpenAI-compatible endpoints (chat completions, completions, embeddings, audio transcriptions, audio speech / TTS, image generations, image inpainting), the Jina rerank endpoint (`/v1/rerank`), the VAD endpoints (`/v1/vad`, `/vad`), and the Anthropic Messages (`/v1/messages`) and Ollama (`/api/chat`, `/api/generate`, `/api/embed`) shims. Useful for debugging, observability and load-balancer attribution. Off by default: the node ID reveals internal cluster topology and should not be exposed on a public endpoint. Best-effort: under heavy concurrency for the same model across multiple replicas, the header may reflect a recent routing decision rather than this exact request's. Acceptable for observability and debugging. |
### NATS JWT authentication (recommended for production)
By default, NATS connections are anonymous: any client that can reach port `4222` may publish control-plane subjects such as `nodes.<id>.backend.install`. Enable JWT auth to scope workers to their own node subjects and give the frontend a dedicated service credential.
| Flag | Env Var | Description |
|------|---------|-------------|
| `--nats-account-seed` | `LOCALAI_NATS_ACCOUNT_SEED` | Account signing seed (`SU...`). The frontend mints a per-node user JWT at registration (`nats_jwt` in the register response). |
| `--nats-service-jwt` | `LOCALAI_NATS_SERVICE_JWT` | User JWT for the frontend (and optional fallback for agent workers) to publish install/upgrade and related subjects. |
| `--nats-service-seed` | `LOCALAI_NATS_SERVICE_SEED` | User signing seed (`SU...`) paired with the service JWT. |
| `--nats-worker-jwt-ttl` | `LOCALAI_NATS_WORKER_JWT_TTL` | Lifetime of minted worker JWTs (default `24h`). |
| `--nats-require-auth` | `LOCALAI_NATS_REQUIRE_AUTH` | Fail startup if JWT credentials are missing when distributed mode is enabled. |
### NATS TLS / mTLS (optional)
Use `tls://` in `--nats-url` / `LOCALAI_NATS_URL` for encrypted transport. When the server uses a private CA or requires client certificates, set:
| Flag | Env Var | Description |
|------|---------|-------------|
| `--nats-tls-ca` | `LOCALAI_NATS_TLS_CA` | PEM file to verify the NATS server (private CA) |
| `--nats-tls-cert` | `LOCALAI_NATS_TLS_CERT` | Client certificate for NATS mTLS |
| `--nats-tls-key` | `LOCALAI_NATS_TLS_KEY` | Client private key (required with `--nats-tls-cert`) |
The same env vars apply to backend workers and `local-ai agent-worker`. If the server cert is already trusted by the OS, `tls://` alone is enough.
**Worker register response** (when minting is enabled and the node is approved):
```json
{
"id": "…",
"nats_jwt": "eyJ…",
"nats_user_seed": "SU…"
}
```
Workers connect with that JWT and seed automatically (shown once; store securely). Override with `LOCALAI_NATS_JWT` / `LOCALAI_NATS_USER_SEED` if needed. Set `LOCALAI_NATS_REQUIRE_AUTH=true` on workers when the bus requires credentials.
When `LOCALAI_NATS_REQUIRE_AUTH=true` and no static credentials are provided, a worker that registers while still **pending admin approval** keeps re-registering (with backoff) until an admin approves it and the frontend mints its JWT — it does not start unauthenticated. This retry is **bounded**: if the node is never approved (or no credentials are minted) after a large number of attempts, the worker exits non-zero so the failure is visible (a crash-looping or failed worker) rather than hanging silently. Minted worker JWTs are also **refreshed automatically** before they expire (the worker re-registers at ~75% of the JWT lifetime), so long-running workers survive past `LOCALAI_NATS_WORKER_JWT_TTL`; the NATS connection picks up the new JWT on its next reconnect. If refresh fails persistently, the worker exits (to restart and re-acquire) rather than drifting toward an expired, unrenewable JWT. Statically configured (`LOCALAI_NATS_JWT`) and service (`LOCALAI_NATS_SERVICE_JWT`) credentials are used as-is and not refreshed.
Generate operator/account material with [`scripts/nats-auth-setup.sh`](https://github.com/mudler/LocalAI/blob/master/scripts/nats-auth-setup.sh) (requires [nsc](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/nsc)). Configure the NATS server with account resolver JWTs before enabling `LOCALAI_NATS_REQUIRE_AUTH`.
{{% notice note %}}
`LOCALAI_AUTH` (HTTP users/sessions) and NATS JWTs are separate: end-user API keys do not connect to NATS. HTTP registration still uses `LOCALAI_REGISTRATION_TOKEN`.
{{% /notice %}}
### Optional: S3 Object Storage
For multi-host deployments where workers don't share a filesystem, S3-compatible storage enables distributed file transfer (model files, configs):
@@ -178,12 +134,6 @@ local-ai worker \
| `--registration-token` | `LOCALAI_REGISTRATION_TOKEN` | *(empty)* | Token to authenticate with the frontend |
| `--heartbeat-interval` | `LOCALAI_HEARTBEAT_INTERVAL` | `10s` | Interval between heartbeat pings |
| `--nats-url` | `LOCALAI_NATS_URL` | *(required)* | NATS URL for backend installation and file staging |
| `--nats-jwt` | `LOCALAI_NATS_JWT` | *(empty)* | Optional override for the `nats_jwt` returned at registration |
| `--nats-user-seed` | `LOCALAI_NATS_USER_SEED` | *(empty)* | Optional override for `nats_user_seed` from registration |
| `--nats-require-auth` | `LOCALAI_NATS_REQUIRE_AUTH` | `false` | Require NATS JWT+seed (from registration or env) |
| `--nats-tls-ca` | `LOCALAI_NATS_TLS_CA` | *(empty)* | PEM file for NATS server CA |
| `--nats-tls-cert` | `LOCALAI_NATS_TLS_CERT` | *(empty)* | Client certificate for NATS mTLS |
| `--nats-tls-key` | `LOCALAI_NATS_TLS_KEY` | *(empty)* | Client private key for NATS mTLS |
| `--backends-path` | `LOCALAI_BACKENDS_PATH` | `./backends` | Path to backend binaries |
| `--models-path` | `LOCALAI_MODELS_PATH` | `./models` | Path to model files |

View File

@@ -31,41 +31,6 @@ This configuration links the following components:
Make sure all referenced models (`silero-vad-ggml`, `whisper-large-turbo`, `qwen3-4b`, `tts-1`) are also installed or defined in your LocalAI instance.
### Streaming the pipeline
By default each stage runs to completion before the next begins: the whole utterance is transcribed, the full LLM reply is generated, then it is synthesized. Each stage can instead be streamed incrementally, which lowers the time-to-first-audio of a turn:
```yaml
name: gpt-realtime
pipeline:
vad: silero-vad-ggml
transcription: whisper-large-turbo
llm: qwen3-4b
tts: tts-1
streaming:
llm: true # stream LLM tokens as transcript deltas
tts: true # emit audio deltas per synthesized chunk
transcription: true # stream transcript text deltas of the user's speech
```
- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces (requires a backend that supports streaming synthesis), instead of one delta for the whole utterance. Falls back to a single unary delta otherwise.
- **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming).
- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events. The full reply is buffered and synthesized once it is complete — streamed as audio chunks when `streaming.tts` is enabled (and the TTS backend supports it), otherwise as a single unary delta. Reasoning/thinking is always stripped from the spoken transcript. Tool calls are supported while streaming when the LLM uses its tokenizer template (`use_tokenizer_template: true`): the backend's autoparser then delivers content and tool calls separately, so the spoken transcript never leaks tool-call tokens. Grammar-based function calling keeps the buffered path.
All streaming flags are off by default, so existing pipelines are unaffected.
### Disabling thinking
For reasoning models, you can force the pipeline LLM's thinking off without editing the LLM model config:
```yaml
pipeline:
llm: qwen3-4b
disable_thinking: true # maps to enable_thinking=false for the realtime LLM
```
This is applied only to the realtime session's copy of the LLM config, so it does not affect other users of the same model. Leave it unset to use the LLM model config's own reasoning settings.
## Transports
The Realtime API supports two transports: **WebSocket** and **WebRTC**.

View File

@@ -296,28 +296,6 @@ curl http://localhost:8080/tts -H "Content-Type: application/json" -d '{
}' | aplay
```
#### Language
You can hint the synthesis language with the `language` request field:
```
curl http://localhost:8080/tts -H "Content-Type: application/json" -d '{
"model": "qwen-tts",
"input": "Bonjour le monde.",
"language": "fr"
}' | aplay
```
Supported languages: `en` (English), `zh` (Chinese), `ru` (Russian), `ja` (Japanese), `ko` (Korean), `de` (German), `fr` (French), `es` (Spanish), `it` (Italian), `pt` (Portuguese).
The value is matched case-insensitively and accepts a few forms for convenience:
- the two-letter code (`fr`, `FR`)
- a locale/region form, whose region is ignored (`fr-FR`, `pt_BR`, `zh-Hans``fr`/`pt`/`zh`)
- the English full name (`french`, `Portuguese`)
If the field is omitted or the value isn't one of the supported languages, the backend defaults to English.
#### Custom Voice Mode
Qwen3-TTS supports predefined speakers. You can specify a speaker using the `voice` parameter:
@@ -359,37 +337,6 @@ curl http://localhost:8080/tts -H "Content-Type: application/json" -d '{
}' | aplay
```
#### Per-request instructions
Instead of (or in addition to) the static YAML `instruct` option, you can pass an
`instructions` string per request. It maps to the OpenAI
[`instructions`](https://platform.openai.com/docs/api-reference/audio/createSpeech) field
and takes precedence over the YAML option when set, falling back to it when empty. This lets
a single model config serve a different emotion (CustomVoice) or a different designed voice
(VoiceDesign) on every request - useful for roleplay/narration clients that need many voices:
```
curl http://localhost:8080/v1/audio/speech -H "Content-Type: application/json" -d '{
"model": "qwen-tts-design",
"input": "Hello world, this is a test.",
"instructions": "A calm, low-pitched elderly storyteller with a warm tone."
}' | aplay
```
Backends that do not support style/voice instructions simply ignore the field.
You can also pass backend-specific generation parameters per request via the LocalAI
`params` extension (a string-to-string map; values are coerced to the backend's expected
types). For example, with the Chatterbox backend:
```
curl http://localhost:8080/v1/audio/speech -H "Content-Type: application/json" -d '{
"model": "chatterbox",
"input": "Hello world, this is a test.",
"params": { "exaggeration": "0.7", "cfg_weight": "0.3", "temperature": "0.8" }
}' | aplay
```
#### Voice Clone Mode
Voice Clone allows you to clone a voice from reference audio. Configure the model with an `AudioPath` and optional `ref_text`:

View File

@@ -1,58 +1,4 @@
---
- name: "step-3.7-flash"
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
- https://huggingface.co/unsloth/Step-3.7-Flash-GGUF
description: |
**[ModelPage]**: https://static.stepfun.com/blog/step-3.7-flash/
## 1. Introduction
Step 3.7 Flash is a 198B-parameter sparse Mixture-of-Experts (MoE) vision-language model that combines a 196B-parameter language backbone with a 1.8B-parameter vision encoder for native image understanding. Engineered for high-frequency production workloads, it activates approximately 11B parameters per token and delivers a throughput of up to 400 tokens per second. Step 3.7 Flash supports a 256k context window and offers three selectable reasoning levels (low, medium, and high) so developers can easily balance speed, cost, and cognitive depth.
We built Step 3.7 Flash for developers who need to scale agentic workflows that combine perception, search, and reasoning. It is designed to handle intensive tasks such as parsing massive financial reports in one pass, running multi-step search loops with cross-source verification, or operating concurrent coding agents in high-throughput pipelines.
## 2. Capabilities & Performance
### Multimodal Perception and Verification
...
license: "apache-2.0"
tags:
- llm
- gguf
icon: https://example.com/photo.jpg
overrides:
backend: llama-cpp
function:
automatic_tool_parsing_fallback: true
grammar:
disable: true
known_usecases:
- chat
mmproj: llama-cpp/mmproj/Step-3.7-Flash-GGUF/mmproj-F32.gguf
options:
- use_jinja:true
parameters:
model: llama-cpp/models/Step-3.7-Flash-GGUF/Step-3.7-Flash-UD-Q4_K_M-00001-of-00004.gguf
template:
use_tokenizer_template: true
files:
- filename: llama-cpp/models/Step-3.7-Flash-GGUF/Step-3.7-Flash-UD-Q4_K_M-00001-of-00004.gguf
sha256: 3ace7518df03a818243c55076e8c5b422961aa3cefe4fa8f120d4456dd2edde7
uri: https://huggingface.co/unsloth/Step-3.7-Flash-GGUF/resolve/main/UD-Q4_K_M/Step-3.7-Flash-UD-Q4_K_M-00001-of-00004.gguf
- filename: llama-cpp/models/Step-3.7-Flash-GGUF/Step-3.7-Flash-UD-Q4_K_M-00002-of-00004.gguf
sha256: 1ff05ea5a4518c488548219ec944aadec6a1a075140a3f81ae258ec51b755a75
uri: https://huggingface.co/unsloth/Step-3.7-Flash-GGUF/resolve/main/UD-Q4_K_M/Step-3.7-Flash-UD-Q4_K_M-00002-of-00004.gguf
- filename: llama-cpp/models/Step-3.7-Flash-GGUF/Step-3.7-Flash-UD-Q4_K_M-00003-of-00004.gguf
sha256: 47c1b36d9e6df9fcd6e05873bdaa101a54b85e56bcd775ce0a199453387c339d
uri: https://huggingface.co/unsloth/Step-3.7-Flash-GGUF/resolve/main/UD-Q4_K_M/Step-3.7-Flash-UD-Q4_K_M-00003-of-00004.gguf
- filename: llama-cpp/models/Step-3.7-Flash-GGUF/Step-3.7-Flash-UD-Q4_K_M-00004-of-00004.gguf
sha256: 1cc54c0a491b63b86ef0ddc631950c2b881ed701de9ffb1903338d3cbf088262
uri: https://huggingface.co/unsloth/Step-3.7-Flash-GGUF/resolve/main/UD-Q4_K_M/Step-3.7-Flash-UD-Q4_K_M-00004-of-00004.gguf
- filename: llama-cpp/mmproj/Step-3.7-Flash-GGUF/mmproj-F32.gguf
sha256: 2fab13dcd32e4b3dc4410297df80f4d82627308e725dedac802940ceca7dff13
uri: https://huggingface.co/unsloth/Step-3.7-Flash-GGUF/resolve/main/mmproj-F32.gguf
- name: "lfm2.5-8b-a1b"
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
@@ -31909,7 +31855,6 @@
files:
- filename: parakeet-tdt-0.6b-v3-q4_k.gguf
uri: huggingface://cstr/parakeet-tdt-0.6b-v3-GGUF/parakeet-tdt-0.6b-v3-q4_k.gguf
sha256: 1a60f6e53e5781240dde6e69a47a47a8a71995a3a106517b009225afcc514457
- name: parakeet-v2-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -31932,7 +31877,6 @@
files:
- filename: parakeet-tdt-0.6b-v2-q4_k.gguf
uri: huggingface://cstr/parakeet-tdt-0.6b-v2-GGUF/parakeet-tdt-0.6b-v2-q4_k.gguf
sha256: f392cee3c2ba81b397b021e151e4588ded7fc985f8115cfaeb405ea42fc518a9
- name: parakeet-ja-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -31955,7 +31899,6 @@
files:
- filename: parakeet-tdt-0.6b-ja.gguf
uri: huggingface://cstr/parakeet-tdt-0.6b-ja-GGUF/parakeet-tdt-0.6b-ja.gguf
sha256: a9c43116b180b8a2ada2771ac829cf751b9e73adcbe69b7c8379593f9e5da31e
- name: parakeet-tdt-1.1b-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -31978,7 +31921,6 @@
files:
- filename: parakeet-tdt-1.1b-q4_k.gguf
uri: huggingface://cstr/parakeet-tdt-1.1b-GGUF/parakeet-tdt-1.1b-q4_k.gguf
sha256: db64b442d02430b76e664fa1fd5facc7866d2bdc071d64028dad55772cde252c
- name: parakeet-tdt_ctc-110m-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -32001,7 +31943,6 @@
files:
- filename: parakeet-tdt_ctc-110m-q4_k.gguf
uri: huggingface://cstr/parakeet-tdt_ctc-110m-GGUF/parakeet-tdt_ctc-110m-q4_k.gguf
sha256: c57f84d0826b6a10172c0b9696da472efb5e4c604987ef0d023214b29f38e929
- name: parakeet-tdt_ctc-1.1b-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -32024,7 +31965,6 @@
files:
- filename: parakeet-tdt_ctc-1.1b-q4_k.gguf
uri: huggingface://cstr/parakeet-tdt_ctc-1.1b-GGUF/parakeet-tdt_ctc-1.1b-q4_k.gguf
sha256: 52784c0ac7321a6e1d915a96837f6f508fc5bff240b37f5e58dea39feb302edd
- name: parakeet-rnnt-0.6b-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -32047,7 +31987,6 @@
files:
- filename: parakeet-rnnt-0.6b-q4_k.gguf
uri: huggingface://cstr/parakeet-rnnt-0.6b-GGUF/parakeet-rnnt-0.6b-q4_k.gguf
sha256: 84de2c556e30e87ef1fe5b0ac035b581c233ec017afe517082543b19eba8c73d
- name: parakeet-rnnt-1.1b-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -32070,7 +32009,6 @@
files:
- filename: parakeet-rnnt-1.1b-q4_k.gguf
uri: huggingface://cstr/parakeet-rnnt-1.1b-GGUF/parakeet-rnnt-1.1b-q4_k.gguf
sha256: 9e6d6e5aba6dbe15853f93ad317b8017fe21df78fd854d334ca0c4144aefce08
- name: fastconformer-ctc-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -32093,7 +32031,6 @@
files:
- filename: stt-en-fastconformer-ctc-large-q4_k.gguf
uri: huggingface://cstr/stt-en-fastconformer-ctc-large-GGUF/stt-en-fastconformer-ctc-large-q4_k.gguf
sha256: 5529d6762d1799a58b4fb806f766c2ce893f59d4d38d948d1177fcd3bfa28920
- name: canary-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
@@ -32116,7 +32053,6 @@
files:
- filename: canary-1b-v2-q4_k.gguf
uri: huggingface://cstr/canary-1b-v2-GGUF/canary-1b-v2-q4_k.gguf
sha256: 187668f4b7bb7faee0c02de55664c7cb13c792dd54e47da888e05815420e16f1
- name: voxtral-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:

68
go.mod
View File

@@ -22,7 +22,7 @@ require (
github.com/go-audio/wav v1.1.0
github.com/go-skynet/go-llama.cpp v0.0.0-20240314183750-6a8041ef6b46
github.com/gofrs/flock v0.13.0
github.com/google/go-containerregistry v0.21.6
github.com/google/go-containerregistry v0.21.5
github.com/google/uuid v1.6.0
github.com/gpustack/gguf-parser-go v0.24.0
github.com/hpcloud/tail v1.0.0
@@ -37,16 +37,14 @@ require (
github.com/microcosm-cc/bluemonday v1.0.27
github.com/modelcontextprotocol/go-sdk v1.5.0
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b
github.com/mudler/edgevpn v0.34.0
github.com/mudler/edgevpn v0.32.2
github.com/mudler/go-processmanager v0.1.1
github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8
github.com/mudler/xlog v0.0.6
github.com/nats-io/jwt/v2 v2.7.4
github.com/nats-io/nats.go v1.52.0
github.com/nats-io/nkeys v0.4.15
github.com/ollama/ollama v0.20.4
github.com/onsi/ginkgo/v2 v2.29.0
github.com/onsi/gomega v1.41.0
github.com/onsi/gomega v1.40.0
github.com/openai/openai-go/v3 v3.26.0
github.com/otiai10/copy v1.14.1
github.com/otiai10/openaigo v1.7.0
@@ -65,10 +63,10 @@ require (
github.com/testcontainers/testcontainers-go/modules/nats v0.42.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0
github.com/timbutler/zxcvbn v1.0.4
go.opentelemetry.io/otel v1.44.0
go.opentelemetry.io/otel/exporters/prometheus v0.66.0
go.opentelemetry.io/otel/metric v1.44.0
go.opentelemetry.io/otel/sdk/metric v1.44.0
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/exporters/prometheus v0.65.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
@@ -125,7 +123,7 @@ require (
github.com/go-openapi/validate v0.25.1 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/google/certificate-transparency-go v1.3.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/in-toto/attestation v1.1.2 // indirect
github.com/in-toto/in-toto-golang v0.9.0 // indirect
github.com/invopop/jsonschema v0.13.0 // indirect
@@ -134,8 +132,9 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2 // indirect
github.com/klippa-app/go-pdfium v1.19.2 // indirect
github.com/mattn/go-sqlite3 v1.14.28 // indirect
github.com/moby/moby/api v1.54.2 // indirect
github.com/moby/moby/client v0.4.1 // indirect
github.com/moby/moby/api v1.54.1 // indirect
github.com/moby/moby/client v0.4.0 // indirect
github.com/nats-io/nkeys v0.4.15 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.9.1 // indirect
@@ -156,7 +155,7 @@ require (
github.com/transparency-dev/merkle v0.0.2 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
go.mongodb.org/mongo-driver v1.17.6 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)
@@ -326,14 +325,14 @@ require (
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 // indirect
go.uber.org/mock v0.5.2 // indirect
go.yaml.in/yaml/v2 v2.4.4
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/image v0.38.0 // indirect
golang.org/x/net v0.54.0 // indirect (for websocket)
golang.org/x/net v0.53.0 // indirect (for websocket)
golang.org/x/oauth2 v0.36.0
golang.org/x/telemetry v0.0.0-20260508192327-42602be52be6 // indirect
golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect
golang.org/x/time v0.14.0 // indirect
)
@@ -352,20 +351,21 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/c-robinson/iplib v1.0.8 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/containerd/continuity v0.4.4 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.18.2 // indirect
github.com/creachadair/otp v0.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.1 // indirect
github.com/dlclark/regexp2 v1.11.5 // indirect
github.com/docker/cli v29.4.3+incompatible // indirect
github.com/docker/cli v29.4.0+incompatible // indirect
github.com/docker/docker v28.5.2+incompatible
github.com/docker/docker-credential-helpers v0.9.3 // indirect
github.com/docker/go-connections v0.7.0 // indirect
github.com/docker/go-connections v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/flynn/noise v1.1.0 // indirect
@@ -392,24 +392,24 @@ require (
github.com/henvic/httpretty v0.1.4 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/boxo v0.39.0 // indirect
github.com/ipfs/boxo v0.37.0 // indirect
github.com/ipfs/go-cid v0.6.1 // indirect
github.com/ipfs/go-datastore v0.9.1 // indirect
github.com/ipfs/go-log/v2 v2.9.2 // indirect
github.com/ipfs/go-log/v2 v2.9.1 // indirect
github.com/ipld/go-ipld-prime v0.23.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jaypipes/pcidb v1.1.1 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.18.6
github.com/klauspost/compress v1.18.5
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/koron/go-ssdp v0.0.6 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.40.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.39.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.16.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.15.0 // indirect
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
@@ -421,7 +421,7 @@ require (
github.com/mailru/easyjson v0.9.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.22 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.17 // indirect
github.com/miekg/dns v1.1.72 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
@@ -429,6 +429,7 @@ require (
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/sys/sequential v0.6.0 // indirect
github.com/moby/term v0.5.2 // indirect
@@ -477,6 +478,7 @@ require (
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/ulikunitz/xz v0.5.14 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vbatts/tar-split v0.12.2 // indirect
github.com/vishvananda/netlink v1.3.1 // indirect
github.com/vishvananda/netns v0.0.5 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
@@ -485,25 +487,25 @@ require (
github.com/yuin/goldmark-emoji v1.0.6 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.44.0 // indirect
go.opentelemetry.io/otel/trace v1.44.0 // indirect
go.opentelemetry.io/otel/sdk v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
go.uber.org/dig v1.19.0 // indirect
go.uber.org/fx v1.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.28.0 // indirect
go.uber.org/zap v1.27.1 // indirect
golang.org/x/crypto v0.51.0
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect
golang.org/x/mod v0.36.0 // indirect
golang.org/x/mod v0.35.0 // indirect
golang.org/x/sync v0.20.0
golang.org/x/sys v0.45.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/term v0.43.0
golang.org/x/text v0.37.0 // indirect
golang.org/x/tools v0.45.0 // indirect
golang.org/x/tools v0.44.0 // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb // indirect
golang.zx2c4.com/wireguard/windows v0.6.1 // indirect
golang.zx2c4.com/wireguard/windows v0.5.3 // indirect
gonum.org/v1/gonum v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
howett.net/plist v1.0.2-0.20250314012144-ee69052608d9 // indirect

132
go.sum
View File

@@ -296,6 +296,8 @@ github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw=
github.com/containerd/stargz-snapshotter/estargz v0.18.2 h1:yXkZFYIzz3eoLwlTUZKz2iQ4MrckBxJjkmD16ynUTrw=
github.com/containerd/stargz-snapshotter/estargz v0.18.2/go.mod h1:XyVU5tcJ3PRpkA9XS2T5us6Eg35yM0214Y+wvrZTBrY=
github.com/coreos/go-oidc/v3 v3.18.0 h1:V9orjXynvu5wiC9SemFTWnG4F45v403aIcjWo0d41+A=
github.com/coreos/go-oidc/v3 v3.18.0/go.mod h1:DYCf24+ncYi+XkIH97GY1+dqoRlbaSI26KVTCI9SrY4=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -340,14 +342,14 @@ github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZ
github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/docker/cli v29.4.3+incompatible h1:u+UliYm2J/rYrIh2FqHQg32neRG8GjbvNuwQRTzGspU=
github.com/docker/cli v29.4.3+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/cli v29.4.0+incompatible h1:+IjXULMetlvWJiuSI0Nbor36lcJ5BTcVpUmB21KBoVM=
github.com/docker/cli v29.4.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v28.5.2+incompatible h1:DBX0Y0zAjZbSrm1uzOkdr1onVghKaftjlSWt4AFexzM=
github.com/docker/docker v28.5.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker-credential-helpers v0.9.3 h1:gAm/VtF9wgqJMoxzT3Gj5p4AqIjCBS4wrsOh9yRqcz8=
github.com/docker/docker-credential-helpers v0.9.3/go.mod h1:x+4Gbw9aGmChi3qTLZj8Dfn0TD20M/fuWy0E5+WDeCo=
github.com/docker/go-connections v0.7.0 h1:6SsRfJddP22WMrCkj19x9WKjEDTB+ahsdiGYf0mN39c=
github.com/docker/go-connections v0.7.0/go.mod h1:no1qkHdjq7kLMGUXYAduOhYPSJxxvgWBh7ogVvptn3Q=
github.com/docker/go-connections v0.6.0 h1:LlMG9azAe1TqfR7sO+NJttz1gy6KO7VJBh+pMmjSD94=
github.com/docker/go-connections v0.6.0/go.mod h1:AahvXYshr6JgfUJGdDCs2b5EZG/vmaMAntpSFH5BFKE=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 h1:iFaUwBSo5Svw6L7HYpRu/0lE3e0BaElwnNO1qkNQxBY=
@@ -593,8 +595,8 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-containerregistry v0.21.6 h1:T+yqQIlJXKrM98Om4DlW3GoWQAmhZuLMwoDOvVrtiUM=
github.com/google/go-containerregistry v0.21.6/go.mod h1:U7MMSBIJynke2MVQrQk19NP9k/uQsGz/h0amIFSHMbo=
github.com/google/go-containerregistry v0.21.5 h1:KTJG9Pn/jC0VdZR6ctV3/jcN+q6/Iqlx0sTVz3ywZlM=
github.com/google/go-containerregistry v0.21.5/go.mod h1:ySvMuiWg+dOsRW0Hw8GYwfMwBlNRTmpYBFJPlkco5zU=
github.com/google/go-github/v69 v69.2.0 h1:wR+Wi/fN2zdUx9YxSmYE0ktiX9IAR/BeePzeaUUbEHE=
github.com/google/go-github/v69 v69.2.0/go.mod h1:xne4jymxLR6Uj9b7J7PyTpkMYstEMMwGZa0Aehh1azM=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
@@ -647,8 +649,8 @@ github.com/gpustack/gguf-parser-go v0.24.0/go.mod h1:y4TwTtDqFWTK+xvprOjRUh+dowg
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
github.com/hack-pad/go-indexeddb v0.3.2 h1:DTqeJJYc1usa45Q5r52t01KhvlSN02+Oq+tQbSBI91A=
github.com/hack-pad/go-indexeddb v0.3.2/go.mod h1:QvfTevpDVlkfomY498LhstjwbPW6QC4VC/lxYb0Kom0=
github.com/hack-pad/safejs v0.1.0 h1:qPS6vjreAqh2amUqj4WNG1zIw7qlRQJ9K10eDKMCnE8=
@@ -720,8 +722,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E=
github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
github.com/ipfs/boxo v0.39.0 h1:u9jLf5pLx5SWROXjHtj8VMvv+iDlMbiTyZ/vVTQ4VhI=
github.com/ipfs/boxo v0.39.0/go.mod h1:k9YCvMjytFguMHndEiGdCGMMj4b7CkdOT44vtgAxOdk=
github.com/ipfs/boxo v0.37.0 h1:2E3mZvydMI2t5IkAgtkmZ3sGsld0oS7o3I+xyzDk6uI=
github.com/ipfs/boxo v0.37.0/go.mod h1:8yyiRn54F2CsW13n0zwXEPrVsZix/gFj9SYIRYMZ6KE=
github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk=
github.com/ipfs/go-block-format v0.2.3/go.mod h1:WJaQmPAKhD3LspLixqlqNFxiZ3BZ3xgqxxoSR/76pnA=
github.com/ipfs/go-cid v0.6.1 h1:T5TnNb08+ueovG76Z5gx1L4Y7QOaGTXHg1F6raWFxIc=
@@ -733,10 +735,10 @@ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46U
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.9.2 h1:O/5BB0elpkRILvT24rCJ5976wWd7u0nJ436T3rdYdc4=
github.com/ipfs/go-log/v2 v2.9.2/go.mod h1:RziRwwXWhndlk8L75RnEe0zeAYaq2heKtEMc3jqUov0=
github.com/ipfs/go-test v0.3.0 h1:0Y4Uve3tp9HI+2lIJjfOliOrOgv/YpXg/l1y3P4DEYE=
github.com/ipfs/go-test v0.3.0/go.mod h1:JK+U8pRpATZb7lsYNSJlCj3WYB3cFfWIbI6nWRM/GFk=
github.com/ipfs/go-log/v2 v2.9.1 h1:3JXwHWU31dsCpvQ+7asz6/QsFJHqFr4gLgQ0FWteujk=
github.com/ipfs/go-log/v2 v2.9.1/go.mod h1:evFx7sBiohUN3AG12mXlZBw5hacBQld3ZPHrowlJYoo=
github.com/ipfs/go-test v0.2.3 h1:Z/jXNAReQFtCYyn7bsv/ZqUwS6E7iIcSpJ2CuzCvnrc=
github.com/ipfs/go-test v0.2.3/go.mod h1:QW8vSKkwYvWFwIZQLGQXdkt9Ud76eQXRQ9Ao2H+cA1o=
github.com/ipld/go-ipld-prime v0.23.0 h1:csqdPZH60BsTC+AZrv7fpa27v+09I/oTqyHYYYE27eE=
github.com/ipld/go-ipld-prime v0.23.0/go.mod h1:46YCFSFNFBJHPjB0pfMuv7Ly7df2eChpkpyPo5SE0bA=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
@@ -796,8 +798,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
@@ -837,12 +839,12 @@ github.com/libp2p/go-libp2p v0.48.0 h1:h2BrLAgrj7X8bEN05K7qmrjpNHYA+6tnsGRdprjTn
github.com/libp2p/go-libp2p v0.48.0/go.mod h1:Q1fBZNdmC2Hf82husCTfkKJVfHm2we5zk+NWmOGEmWk=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-kad-dht v0.40.0 h1:as8U7Y1RX9CTKCBiFBHWKZ6tSS+rE+6WNz+H1+M+wbo=
github.com/libp2p/go-libp2p-kad-dht v0.40.0/go.mod h1:iLUjII47u3/HjxyhucI2lhsl29lrzlAs/ym16+H40jE=
github.com/libp2p/go-libp2p-kad-dht v0.39.0 h1:mww38eBYiUvdsu+Xl/GLlBC0Aa8M+5HAwvafkFOygAM=
github.com/libp2p/go-libp2p-kad-dht v0.39.0/go.mod h1:Po2JugFEkDq9Vig/JXtc153ntOi0q58o4j7IuITCOVs=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
github.com/libp2p/go-libp2p-pubsub v0.16.0 h1:j7G2C8kJwkcAQqYR7Wmq3d75d3Sgw/N0Hhiv0dVx7OY=
github.com/libp2p/go-libp2p-pubsub v0.16.0/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
github.com/libp2p/go-libp2p-pubsub v0.15.0 h1:cG7Cng2BT82WttmPFMi50gDNV+58K626m/wR00vGL1o=
github.com/libp2p/go-libp2p-pubsub v0.15.0/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=
github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E=
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI=
@@ -883,8 +885,8 @@ github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stg
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4=
github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.17 h1:78v8ZlW0bP43XfmAfPsdXcoNCelfMHsDmd/pkENfrjQ=
@@ -935,10 +937,10 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/go-archive v0.2.0 h1:zg5QDUM2mi0JIM9fdQZWC7U8+2ZfixfTYoHL7rWUcP8=
github.com/moby/go-archive v0.2.0/go.mod h1:mNeivT14o8xU+5q1YnNrkQVpK+dnNe/K6fHqnTg4qPU=
github.com/moby/moby/api v1.54.2 h1:wiat9QAhnDQjA7wk1kh/TqHz2I1uUA7M7t9SAl/JNXg=
github.com/moby/moby/api v1.54.2/go.mod h1:+RQ6wluLwtYaTd1WnPLykIDPekkuyD/ROWQClE83pzs=
github.com/moby/moby/client v0.4.1 h1:DMQgisVoMkmMs7fp3ROSdiBnoAu8+vo3GggFl06M/wY=
github.com/moby/moby/client v0.4.1/go.mod h1:z52C9O2POPOsnxZAy//WtKcQ32P+jT/NGeXu/7nfjGQ=
github.com/moby/moby/api v1.54.1 h1:TqVzuJkOLsgLDDwNLmYqACUuTehOHRGKiPhvH8V3Nn4=
github.com/moby/moby/api v1.54.1/go.mod h1:+RQ6wluLwtYaTd1WnPLykIDPekkuyD/ROWQClE83pzs=
github.com/moby/moby/client v0.4.0 h1:S+2XegzHQrrvTCvF6s5HFzcrywWQmuVnhOXe2kiWjIw=
github.com/moby/moby/client v0.4.0/go.mod h1:QWPbvWchQbxBNdaLSpoKpCdf5E+WxFAgNHogCWDoa7g=
github.com/moby/patternmatcher v0.6.1 h1:qlhtafmr6kgMIJjKJMDmMWq7WLkKIo23hsrpR3x084U=
github.com/moby/patternmatcher v0.6.1/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
github.com/moby/sys/mountinfo v0.7.2 h1:1shs6aH5s4o5H2zQLn796ADW1wMrIwHsyJ2v9KouLrg=
@@ -970,8 +972,8 @@ github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87 h1:az+2umaD/sT1rRv
github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87/go.mod h1:x77p9W1zKZr+W+UcEwg8/qdp00p4XXOI69wE7WlXZc0=
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU=
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
github.com/mudler/edgevpn v0.34.0 h1:qDrD/rCPFY/FdURbXudIZWihVKY4VOX3nMn3CcbeQEU=
github.com/mudler/edgevpn v0.34.0/go.mod h1:yki7uMi5LR9gSMrw8PdPieuxsrk8BLV2Ui7VBEmbbIA=
github.com/mudler/edgevpn v0.32.2 h1:umTPyyZgkom/A81Bk4HbP0p1ZSEU5EFPW3Bg+YPxI8A=
github.com/mudler/edgevpn v0.32.2/go.mod h1:UaMc8MORbcRsAjuO5gVJj9Bn3Nq2AP5U9NTb6epVyv8=
github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX28EkpdAyL4yQVnJMrbquKospjHyA=
github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc/go.mod h1:O7SwdSWMilAWhBZMK9N9Y/oBDyMMzshE3ju8Xkexwig=
github.com/mudler/go-processmanager v0.1.1 h1:c/1NRZOZpW8HuFv9RhBG57nQu1oDMRomEHedwBFMlrw=
@@ -1016,8 +1018,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/natefinch/atomic v1.0.1 h1:ZPYKxkqQOx3KZ+RsbnP/YsgvxWQPGxjC0oBt2AhwV0A=
github.com/natefinch/atomic v1.0.1/go.mod h1:N/D/ELrljoqDyT3rZrsUmtsuzvHkeB/wWjHV22AZRbM=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats.go v1.52.0 h1:n3avV4VBsCgsdwh71TppsTwtv+QdPs7ntSKM8qJLGsc=
github.com/nats-io/nats.go v1.52.0/go.mod h1:26HypzazeOkyO3/mqd1zZd53STJN0EjCYF9Uy2ZOBno=
github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
@@ -1044,8 +1044,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.29.0 h1:rfh+ZFjgJhYWRoIqVf3Uwx/W20yLrcrE2h2GmYVRaag=
github.com/onsi/ginkgo/v2 v2.29.0/go.mod h1:+aXOY+vzZ5mu2iI2HpTZUPmM//oQfsNFX6gU9kNcA44=
github.com/onsi/gomega v1.41.0 h1:OwKp4pXNgVxf6sCplzYo794OFNuoL2q2SBMU5NSWOjA=
github.com/onsi/gomega v1.41.0/go.mod h1:M/Uqpu/8qTjtzCLUA2zJHX9Iilrau25x1PdoSRbWh5A=
github.com/onsi/gomega v1.40.0 h1:Vtol0e1MghCD2ZVIilPDIg44XSL9l2QAn8ZNaljWcJc=
github.com/onsi/gomega v1.40.0/go.mod h1:M/Uqpu/8qTjtzCLUA2zJHX9Iilrau25x1PdoSRbWh5A=
github.com/openai/openai-go/v3 v3.26.0 h1:bRt6H/ozMNt/dDkN4gobnLqaEGrRGBzmbVs0xxJEnQE=
github.com/openai/openai-go/v3 v3.26.0/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
@@ -1357,6 +1357,8 @@ github.com/valyala/fasthttp v1.68.0 h1:v12Nx16iepr8r9ySOwqI+5RBJ/DqTxhOy1HrHoDFn
github.com/valyala/fasthttp v1.68.0/go.mod h1:5EXiRfYQAoiO/khu4oU9VISC/eVY6JqmSpPJoHCKsz4=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/vbatts/tar-split v0.12.2 h1:w/Y6tjxpeiFMR47yzZPlPj/FcPLpXbTUi/9H7d3CPa4=
github.com/vbatts/tar-split v0.12.2/go.mod h1:eF6B6i6ftWQcDqEn3/iGFRFRo8cBIMSJVOpnNdfTMFA=
github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW6bV0=
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=
@@ -1415,22 +1417,20 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg=
go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU=
go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc=
go.opentelemetry.io/otel/exporters/prometheus v0.66.0 h1:vkrK8PAznv2NKt2r+kdu252ccGzkEqLc2aSXbQIALYQ=
go.opentelemetry.io/otel/exporters/prometheus v0.66.0/go.mod h1:V/UB6D3vMF/UBOL5igAsAYnk1nG/bzYYTzvsB16cy7o=
go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc=
go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo=
go.opentelemetry.io/otel/metric/x v0.66.0 h1:YkCrx1zLOChi9ZcZ6euupOcsgzbVlec7D/xoEU1+cTA=
go.opentelemetry.io/otel/metric/x v0.66.0/go.mod h1:d1+BDj9t96do0/1LoU1ayfCv79ZgNE41qbhBvnMOBZk=
go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58=
go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0=
go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI=
go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA=
go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk=
go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0/go.mod h1:c7hN3ddxs/z6q9xwvfLPk+UHlWRQyaeR1LdgfL/66l0=
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
go.opentelemetry.io/otel/exporters/prometheus v0.65.0 h1:jOveH/b4lU9HT7y+Gfamf18BqlOuz2PWEvs8yM7Q6XE=
go.opentelemetry.io/otel/exporters/prometheus v0.65.0/go.mod h1:i1P8pcumauPtUI4YNopea1dhzEMuEqWP1xoUZDylLHo=
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
go.starlark.net v0.0.0-20250417143717-f57e51f710eb h1:zOg9DxxrorEmgGUr5UPdCEwKqiqG0MlZciuCuA3XiDE=
go.starlark.net v0.0.0-20250417143717-f57e51f710eb/go.mod h1:YKMCv9b1WrfWmeqdV5MAuEHWsu5iC+fe6kYl2sQjdI8=
go.step.sm/crypto v0.74.0 h1:/APBEv45yYR4qQFg47HA8w1nesIGcxh44pGyQNw6JRA=
@@ -1452,8 +1452,8 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.28.0 h1:IZzaP1Fv73/T/pBMLk4VutPl36uNC+OSUh3JLG3FIjo=
go.uber.org/zap v1.28.0/go.mod h1:rDLpOi171uODNm/mxFcuYWxDsqWSAVkFdX4XojSKg/Q=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
@@ -1524,8 +1524,8 @@ golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.36.0 h1:JJjpVx6myfUsUdAzZuOSTTmRE0PfZeNWzzvKrP7amb4=
golang.org/x/mod v0.36.0/go.mod h1:moc6ELqsWcOw5Ef3xVprK5ul/MvtVvkIXLziUOICjUQ=
golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM=
golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1573,8 +1573,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1674,11 +1674,11 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/telemetry v0.0.0-20260508192327-42602be52be6 h1:HjU6IWBiAgRIdAJ9/y1rwCn+UELEmwV+VsTLzj/W4sE=
golang.org/x/telemetry v0.0.0-20260508192327-42602be52be6/go.mod h1:Eqhaxk/wZsWEH8CRxLwj6xzEJbz7k1EFGqx7nyCoabE=
golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa h1:efT73AJZfAAUV7SOip6pWGkwJDzIGiKBZGVzHYa+ve4=
golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa/go.mod h1:kHjTxDEnAu6/Nl9lDkzjWpR+bmKfxeiRuSDlsMb70gE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -1773,8 +1773,8 @@ golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.45.0 h1:18qN3FAooORvApf5XjCXgsuayZOEtXf6JK18I3+ONa8=
golang.org/x/tools v0.45.0/go.mod h1:LuUGqqaXcXMEFEruIVJVm5mgDD8vww/z/SR1gQ4uE/0=
golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c=
golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -1785,8 +1785,8 @@ golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 h1:B82qJJgjvYKsXS9jeu
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2/go.mod h1:deeaetjYA+DHMHg+sMSMI58GrEteJUUzzw7en6TJQcI=
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb h1:whnFRlWMcXI9d+ZbWg+4sHnLp52d5yiIPUxMBSt4X9A=
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb/go.mod h1:rpwXGsirqLqN2L0JDJQlwOboGHmptD5ZD6T2VmcqhTw=
golang.zx2c4.com/wireguard/windows v0.6.1 h1:XMaKojH1Hs/raMrmnir4n35nTvzvWj7NmSYzHn2F4qU=
golang.zx2c4.com/wireguard/windows v0.6.1/go.mod h1:04aqInu5GYuTFvMuDw/rKBAF7mHrltW/3rekpfbbZDM=
golang.zx2c4.com/wireguard/windows v0.5.3 h1:On6j2Rpn3OEMXqBq00QEDC7bWSZrPIHKIus8eIuExIE=
golang.zx2c4.com/wireguard/windows v0.5.3/go.mod h1:9TEe8TJmtwyQebdFwAkEWOPr3prrtqm+REGFifP60hI=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
@@ -1865,10 +1865,10 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9 h1:LvZVVaPE0JSqL+ZWb6ErZfnEOKIqqFWUJE2D0fObSmc=
google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9/go.mod h1:QFOrLhdAe2PsTp3vQY4quuLKTi9j3XG3r6JPPaw7MSc=
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0=
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=

View File

@@ -1,66 +0,0 @@
package natsauth
import (
"fmt"
"time"
"github.com/mudler/xlog"
)
// DefaultWorkerJWTTTL is how long a worker may use a minted NATS user JWT before re-registering.
const DefaultWorkerJWTTTL = 24 * time.Hour
// Config holds NATS JWT authentication settings for distributed mode.
type Config struct {
// AccountSeed is the NATS account signing seed (SU...). Used to mint per-node worker JWTs.
AccountSeed string
// ServiceUserJWT is a pre-generated user JWT for frontends and agent workers (broad publish).
ServiceUserJWT string
// ServiceUserSeed is the signing seed (SU...) paired with ServiceUserJWT.
ServiceUserSeed string
// WorkerJWTTTL sets expiry on minted worker JWTs. Zero uses DefaultWorkerJWTTTL.
WorkerJWTTTL time.Duration
// RequireAuth rejects anonymous NATS when true (both ServiceUserJWT and AccountSeed expected).
RequireAuth bool
}
// Enabled reports whether any NATS credential material is configured.
func (c Config) Enabled() bool {
return c.AccountSeed != "" || c.ServiceUserJWT != ""
}
// CanMintWorkers reports whether per-node JWTs can be issued at registration.
func (c Config) CanMintWorkers() bool {
return c.AccountSeed != ""
}
// WorkerTTL returns the configured worker JWT lifetime.
func (c Config) WorkerTTL() time.Duration {
if c.WorkerJWTTTL > 0 {
return c.WorkerJWTTTL
}
return DefaultWorkerJWTTTL
}
// Validate checks consistency when distributed NATS auth is required.
func (c Config) Validate() error {
if !c.RequireAuth {
return nil
}
if c.ServiceUserJWT == "" || c.ServiceUserSeed == "" {
return fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH requires LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED")
}
if c.AccountSeed == "" {
return fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH is set but LOCALAI_NATS_ACCOUNT_SEED is empty")
}
return nil
}
// WarnIfInsecure logs when distributed NATS is reachable without credentials.
func (c Config) WarnIfInsecure(distributed bool) {
if !distributed || c.Enabled() {
return
}
xlog.Warn("NATS is used without JWT credentials — any client on the bus can publish backend.install. " +
"Set LOCALAI_NATS_ACCOUNT_SEED + LOCALAI_NATS_SERVICE_JWT (see docs/features/distributed-mode.md).")
}

View File

@@ -1,16 +0,0 @@
package natsauth
import (
"fmt"
"github.com/nats-io/jwt/v2"
)
// DecodeUserClaims decodes a minted worker JWT for tests and diagnostics.
func DecodeUserClaims(token string) (*jwt.UserClaims, error) {
uc, err := jwt.DecodeUserClaims(token)
if err != nil {
return nil, fmt.Errorf("natsauth: decode user JWT: %w", err)
}
return uc, nil
}

View File

@@ -1,59 +0,0 @@
package natsauth
import (
"fmt"
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
)
// MintWorkerJWT creates a signed NATS user JWT and user seed scoped to nodeID and nodeType.
// The seed is returned once at registration so the worker can sign NATS connections.
func (c Config) MintWorkerJWT(nodeID, nodeType string) (userJWT, userSeed string, err error) {
if c.AccountSeed == "" {
return "", "", fmt.Errorf("natsauth: account seed not configured")
}
if nodeID == "" {
return "", "", fmt.Errorf("natsauth: node ID is required")
}
accountKP, err := nkeys.FromSeed([]byte(c.AccountSeed))
if err != nil {
return "", "", fmt.Errorf("natsauth: invalid account seed: %w", err)
}
userKP, err := nkeys.CreateUser()
if err != nil {
return "", "", fmt.Errorf("natsauth: create user key: %w", err)
}
seedBytes, err := userKP.Seed()
if err != nil {
return "", "", fmt.Errorf("natsauth: user seed: %w", err)
}
accountPub, err := accountKP.PublicKey()
if err != nil {
return "", "", fmt.Errorf("natsauth: account public key: %w", err)
}
userPub, err := userKP.PublicKey()
if err != nil {
return "", "", fmt.Errorf("natsauth: user public key: %w", err)
}
pubAllow, subAllow := WorkerPermissions(nodeID, nodeType)
uc := jwt.NewUserClaims(userPub)
uc.Name = fmt.Sprintf("localai-%s-%s", nodeType, workerSubjectToken(nodeID))
uc.IssuerAccount = accountPub
uc.Expires = time.Now().Add(c.WorkerTTL()).Unix()
uc.Permissions.Pub.Allow = pubAllow
uc.Permissions.Sub.Allow = subAllow
token, err := uc.Encode(accountKP)
if err != nil {
return "", "", fmt.Errorf("natsauth: encode user JWT: %w", err)
}
return token, string(seedBytes), nil
}

View File

@@ -1,60 +0,0 @@
package natsauth_test
import (
"testing"
"time"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestNatsAuth(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "NatsAuth")
}
var _ = Describe("MintWorkerJWT", func() {
var accountSeed string
BeforeEach(func() {
akp, err := nkeys.CreateAccount()
Expect(err).NotTo(HaveOccurred())
seed, err := akp.Seed()
Expect(err).NotTo(HaveOccurred())
accountSeed = string(seed)
})
It("mints a JWT with backend worker permissions", func() {
cfg := natsauth.Config{AccountSeed: accountSeed, WorkerJWTTTL: time.Hour}
token, seed, err := cfg.MintWorkerJWT("550e8400-e29b-41d4-a716-446655440000", "backend")
Expect(err).NotTo(HaveOccurred())
Expect(token).NotTo(BeEmpty())
Expect(seed).NotTo(BeEmpty())
uc, err := jwt.DecodeUserClaims(token)
Expect(err).NotTo(HaveOccurred())
Expect(uc.Permissions.Sub.Allow).To(ContainElement("nodes.550e8400-e29b-41d4-a716-446655440000.>"))
Expect(uc.Permissions.Pub.Allow).To(ContainElement("nodes.550e8400-e29b-41d4-a716-446655440000.backend.install.*.progress"))
})
It("mints agent permissions without backend install subscribe", func() {
cfg := natsauth.Config{AccountSeed: accountSeed}
token, _, err := cfg.MintWorkerJWT("node-1", "agent")
Expect(err).NotTo(HaveOccurred())
uc, err := jwt.DecodeUserClaims(token)
Expect(err).NotTo(HaveOccurred())
Expect(uc.Permissions.Sub.Allow).To(ContainElement("agent.execute"))
for _, subj := range uc.Permissions.Sub.Allow {
Expect(subj).NotTo(ContainSubstring("backend.install"))
}
})
It("rejects mint without account seed", func() {
_, _, err := (natsauth.Config{}).MintWorkerJWT("id", "backend")
Expect(err).To(HaveOccurred())
})
})

View File

@@ -1,49 +0,0 @@
package natsauth
import "strings"
// workerSubjectToken mirrors messaging.sanitizeSubjectToken without importing unexported logic.
func workerSubjectToken(nodeID string) string {
r := strings.NewReplacer(".", "-", "*", "-", ">", "-", " ", "-", "\t", "-", "\n", "-")
return r.Replace(nodeID)
}
// WorkerPermissions returns NATS pub/sub allow lists for a registered node.
func WorkerPermissions(nodeID, nodeType string) (pubAllow, subAllow []string) {
tok := workerSubjectToken(nodeID)
prefix := "nodes." + tok
switch nodeType {
case "agent":
// Agent workers consume queue workloads; they must not handle backend.install.
// Keep this list in sync with the subscriptions in core/cli/agent_worker.go.
subAllow = []string{
"agent.execute",
"jobs.*.cancel",
"jobs.*.progress",
"jobs.*.result",
"jobs.mcp-ci.new", // MCP CI jobs dispatched to agent workers
"mcp.tools.execute",
"mcp.discovery",
prefix + ".backend.stop", // stop events drive MCP session cleanup
"_INBOX.>",
}
pubAllow = []string{
"agent.>",
"jobs.>",
"_INBOX.>",
}
default:
// Backend worker: lifecycle + file staging on this node only.
subAllow = []string{
prefix + ".>",
"_INBOX.>",
}
pubAllow = []string{
prefix + ".backend.install.*.progress",
prefix + ".files.>",
"_INBOX.>",
}
}
return pubAllow, subAllow
}

View File

@@ -1,134 +0,0 @@
package natsauth_test
import (
"os"
"regexp"
"strings"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/natsauth"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// subjectMatches implements NATS subject-token matching: "*" matches exactly one
// token and ">" matches one or more trailing tokens. It lets these tests assert
// that a permission allow-list (which uses wildcards) actually covers a concrete
// subject a component publishes/subscribes — the same check the NATS server makes.
func subjectMatches(pattern, subject string) bool {
p := strings.Split(pattern, ".")
s := strings.Split(subject, ".")
for i, tok := range p {
if tok == ">" {
return i < len(s) // ">" must match at least one remaining token
}
if i >= len(s) {
return false
}
if tok != "*" && tok != s[i] {
return false
}
}
return len(p) == len(s)
}
func anyAllows(allow []string, subject string) bool {
for _, p := range allow {
if subjectMatches(p, subject) {
return true
}
}
return false
}
var _ = Describe("WorkerPermissions subject coverage", func() {
// A node ID containing NATS-reserved characters exercises the (duplicated)
// sanitizer in pkg/natsauth against the canonical one in core/services/messaging.
// If the two ever diverge, the minted prefix stops matching the real subject
// and these assertions fail — guarding the copy noted in the review.
const nodeID = "host.a 1*b"
Context("backend worker", func() {
pub, sub := natsauth.WorkerPermissions(nodeID, "backend")
// Every subject core/services/worker/{lifecycle,file_staging}.go subscribes to.
subscribed := []string{
messaging.SubjectNodeBackendInstall(nodeID),
messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.SubjectNodeBackendStop(nodeID),
messaging.SubjectNodeBackendDelete(nodeID),
messaging.SubjectNodeBackendList(nodeID),
messaging.SubjectNodeModelUnload(nodeID),
messaging.SubjectNodeModelDelete(nodeID),
messaging.SubjectNodeStop(nodeID),
messaging.SubjectNodeFilesEnsure(nodeID),
messaging.SubjectNodeFilesStage(nodeID),
messaging.SubjectNodeFilesTemp(nodeID),
messaging.SubjectNodeFilesListDir(nodeID),
}
for _, subject := range subscribed {
It("allows subscribing to "+subject, func() {
Expect(anyAllows(sub, subject)).To(BeTrue(),
"backend JWT sub allow-list %v does not cover %s", sub, subject)
})
}
It("allows publishing backend.install progress", func() {
subject := messaging.SubjectNodeBackendInstallProgress(nodeID, "op-123")
Expect(anyAllows(pub, subject)).To(BeTrue(),
"backend JWT pub allow-list %v does not cover %s", pub, subject)
})
})
Context("agent worker", func() {
// node_type "agent"; subjects from core/cli/agent_worker.go.
pub, sub := natsauth.WorkerPermissions(nodeID, "agent")
_ = pub
subscribed := []string{
messaging.SubjectAgentExecute, // dispatcher (default --agent-subject)
messaging.SubjectMCPToolExecute, // QueueSubscribeReply
messaging.SubjectMCPDiscovery, // QueueSubscribeReply
messaging.SubjectMCPCIJobsNew, // QueueSubscribe — jobs.mcp-ci.new
messaging.SubjectNodeBackendStop(nodeID), // Subscribe — MCP session cleanup
}
for _, subject := range subscribed {
It("allows subscribing to "+subject, func() {
Expect(anyAllows(sub, subject)).To(BeTrue(),
"agent JWT sub allow-list %v does not cover %s — the agent worker subscribes to it", sub, subject)
})
}
})
})
var allowPubRe = regexp.MustCompile(`--allow-pub "([^"]*)"`)
var _ = Describe("Documented NATS service-user permissions", func() {
// scripts/nats-auth-setup.sh ships the recommended service (frontend) JWT
// permissions. They must cover every subject the frontend actually publishes,
// or prefix-cache sync (and friends) break once LOCALAI_NATS_REQUIRE_AUTH is on.
const scriptPath = "../../scripts/nats-auth-setup.sh"
// Representative subjects the frontend publishes on the control plane.
// prefixcache.* is emitted by prefixcache.Sync in core/application/distributed.go.
frontendPublishes := []string{
messaging.SubjectPrefixCacheObserve,
messaging.SubjectPrefixCacheInvalidate,
messaging.SubjectNodeBackendInstall("node-1"),
messaging.SubjectGalleryProgress("op-1"),
}
It("cover every subject the frontend publishes", func() {
raw, err := os.ReadFile(scriptPath)
Expect(err).ToNot(HaveOccurred(), "cannot read %s", scriptPath)
m := allowPubRe.FindStringSubmatch(string(raw))
Expect(m).To(HaveLen(2), "no --allow-pub list found in %s", scriptPath)
allow := strings.Split(m[1], ",")
for _, subject := range frontendPublishes {
Expect(anyAllows(allow, subject)).To(BeTrue(),
"service-user --allow-pub %v does not cover %s (frontend publishes it)", allow, subject)
}
})
})

View File

@@ -1,49 +0,0 @@
#!/usr/bin/env bash
# Generate NATS account + service user JWTs for LocalAI distributed mode.
#
# Requires: nsc (https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/nsc)
#
# Usage:
# ./scripts/nats-auth-setup.sh
#
# Outputs operator/account seeds and a service user JWT suitable for:
# LOCALAI_NATS_ACCOUNT_SEED
# LOCALAI_NATS_SERVICE_JWT
#
# Per-node worker JWTs are minted automatically by the frontend at registration
# when LOCALAI_NATS_ACCOUNT_SEED is set.
set -euo pipefail
if ! command -v nsc >/dev/null 2>&1; then
echo "nsc is required. Install from https://github.com/nats-io/nsc/releases" >&2
exit 1
fi
OPERATOR="${NATS_OPERATOR_NAME:-localai-operator}"
ACCOUNT="${NATS_ACCOUNT_NAME:-localai}"
SERVICE_USER="${NATS_SERVICE_USER:-localai-frontend}"
nsc add operator -n "$OPERATOR" --generate-signing-key
nsc add account -n "$ACCOUNT"
nsc add user -n "$SERVICE_USER" --account "$ACCOUNT"
# Broad publish for frontend control plane (tighten with custom claims in production).
nsc edit user -n "$SERVICE_USER" --account "$ACCOUNT" \
--allow-pub "nodes.>,gallery.>,agent.>,jobs.>,mcp.>,cache.>,prefixcache.>,finetune.>" \
--allow-sub "nodes.>,gallery.>,agent.>,jobs.>,mcp.>,cache.>,prefixcache.>,_INBOX.>"
KEYS_DIR="${NATS_KEYS_DIR:-./nats-keys}"
mkdir -p "$KEYS_DIR"
nsc generate creds -a "$ACCOUNT" -n "$SERVICE_USER" -o "$KEYS_DIR"
ACCOUNT_SEED=$(nsc describe account "$ACCOUNT" -o json | jq -r '.nats.private_key')
SERVICE_JWT=$(cat "$KEYS_DIR/${ACCOUNT}/${SERVICE_USER}.jwt" 2>/dev/null || cat "$KEYS_DIR/${SERVICE_USER}.jwt")
echo ""
echo "=== LocalAI NATS auth material ==="
echo "LOCALAI_NATS_ACCOUNT_SEED=${ACCOUNT_SEED}"
echo "LOCALAI_NATS_SERVICE_JWT=${SERVICE_JWT}"
echo ""
echo "Configure the NATS server with the generated operator/account JWTs under $KEYS_DIR"
echo "and set LOCALAI_NATS_REQUIRE_AUTH=true on frontends and workers in production."

View File

@@ -5897,10 +5897,6 @@ const docTemplate = `{
"description": "text input",
"type": "string"
},
"instructions": {
"description": "Instructions is a free-form, per-request style/voice description. It maps to\nthe OpenAI ` + "`" + `instructions` + "`" + ` field and is forwarded to the backend so expressive\nTTS models (e.g. Qwen3-TTS CustomVoice/VoiceDesign) can vary tone or designed\nvoice per request instead of only via the static YAML option.",
"type": "string"
},
"language": {
"description": "(optional) language to use with TTS model",
"type": "string"
@@ -5908,13 +5904,6 @@ const docTemplate = `{
"model": {
"type": "string"
},
"params": {
"description": "Params carries optional, backend-specific per-request generation parameters\n(LocalAI extension, e.g. Chatterbox exaggeration/cfg_weight/temperature).",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"response_format": {
"description": "(optional) output format",
"type": "string"

View File

@@ -5894,10 +5894,6 @@
"description": "text input",
"type": "string"
},
"instructions": {
"description": "Instructions is a free-form, per-request style/voice description. It maps to\nthe OpenAI `instructions` field and is forwarded to the backend so expressive\nTTS models (e.g. Qwen3-TTS CustomVoice/VoiceDesign) can vary tone or designed\nvoice per request instead of only via the static YAML option.",
"type": "string"
},
"language": {
"description": "(optional) language to use with TTS model",
"type": "string"
@@ -5905,13 +5901,6 @@
"model": {
"type": "string"
},
"params": {
"description": "Params carries optional, backend-specific per-request generation parameters\n(LocalAI extension, e.g. Chatterbox exaggeration/cfg_weight/temperature).",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"response_format": {
"description": "(optional) output format",
"type": "string"

View File

@@ -1996,25 +1996,11 @@ definitions:
input:
description: text input
type: string
instructions:
description: |-
Instructions is a free-form, per-request style/voice description. It maps to
the OpenAI `instructions` field and is forwarded to the backend so expressive
TTS models (e.g. Qwen3-TTS CustomVoice/VoiceDesign) can vary tone or designed
voice per request instead of only via the static YAML option.
type: string
language:
description: (optional) language to use with TTS model
type: string
model:
type: string
params:
additionalProperties:
type: string
description: |-
Params carries optional, backend-specific per-request generation parameters
(LocalAI extension, e.g. Chatterbox exaggeration/cfg_weight/temperature).
type: object
response_format:
description: (optional) output format
type: string

View File

@@ -1,156 +0,0 @@
package distributed_test
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/natsauth"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/testcontainers/testcontainers-go"
tcnats "github.com/testcontainers/testcontainers-go/modules/nats"
)
// JWTTestInfra holds a NATS server configured with JWT auth and minted worker credentials.
type JWTTestInfra struct {
*TestInfra
AccountSeed string
NodeID string
WorkerJWT string
WorkerSeed string
}
// SetupJWTInfra starts NATS with an in-memory JWT resolver and returns worker credentials
// minted the same way as node registration (pkg/natsauth).
func SetupJWTInfra() *JWTTestInfra {
GinkgoHelper()
infra := &JWTTestInfra{TestInfra: &TestInfra{Ctx: context.Background()}}
operatorJWT, accountJWT, accountSeed, err := jwtResolverMaterial()
Expect(err).ToNot(HaveOccurred())
infra.AccountSeed = accountSeed
conf := fmt.Sprintf(`listen: 0.0.0.0:4222
operator: %s
resolver: MEMORY
resolver_preload: {
%s: %s
}
`, operatorJWT, accountPublicKeyFromSeed(accountSeed), accountJWT)
var natsContainer *tcnats.NATSContainer
// Override default testcontainers -js: JetStream fails without a system account in JWT mode.
natsContainer, err = tcnats.Run(infra.Ctx, "nats:2-alpine",
tcnats.WithConfigFile(bytes.NewBufferString(conf)),
testcontainers.WithCmd("-c", "/etc/nats.conf"),
)
Expect(err).ToNot(HaveOccurred())
infra.NATSContainer = natsContainer
infra.NatsURL, err = infra.NATSContainer.ConnectionString(infra.Ctx)
Expect(err).ToNot(HaveOccurred())
infra.NodeID = "550e8400-e29b-41d4-a716-446655440000"
cfg := natsauth.Config{AccountSeed: infra.AccountSeed, WorkerJWTTTL: time.Hour}
infra.WorkerJWT, infra.WorkerSeed, err = cfg.MintWorkerJWT(infra.NodeID, "backend")
Expect(err).ToNot(HaveOccurred())
infra.NC, err = messaging.New(infra.NatsURL, messaging.WithUserJWT(infra.WorkerJWT, infra.WorkerSeed))
Expect(err).ToNot(HaveOccurred())
DeferCleanup(func() {
if infra.NC != nil {
infra.NC.Close()
}
if infra.NATSContainer != nil {
_ = infra.NATSContainer.Terminate(context.Background())
}
})
return infra
}
// jwtResolverMaterial builds operator + account JWTs for a MEMORY resolver.
// Follows the NATS JWT tutorial: self-signed account, then operator re-sign, with the
// account identity key listed as a signing key so MintWorkerJWT can use the account seed.
func jwtResolverMaterial() (operatorJWT, accountJWT, accountSeed string, err error) {
okp, err := nkeys.CreateOperator()
if err != nil {
return "", "", "", err
}
opk, err := okp.PublicKey()
if err != nil {
return "", "", "", err
}
oc := jwt.NewOperatorClaims(opk)
oc.Name = "localai-test-operator"
oskp, err := nkeys.CreateOperator()
if err != nil {
return "", "", "", err
}
ospk, err := oskp.PublicKey()
if err != nil {
return "", "", "", err
}
oc.SigningKeys.Add(ospk)
operatorJWT, err = oc.Encode(okp)
if err != nil {
return "", "", "", err
}
akp, err := nkeys.CreateAccount()
if err != nil {
return "", "", "", err
}
seed, err := akp.Seed()
if err != nil {
return "", "", "", err
}
accountSeed = string(seed)
apk, err := akp.PublicKey()
if err != nil {
return "", "", "", err
}
ac := jwt.NewAccountClaims(apk)
ac.Name = "localai-test-account"
ac.SigningKeys.Add(apk)
accountJWT, err = ac.Encode(akp)
if err != nil {
return "", "", "", err
}
ac, err = jwt.DecodeAccountClaims(accountJWT)
if err != nil {
return "", "", "", err
}
accountJWT, err = ac.Encode(oskp)
if err != nil {
return "", "", "", err
}
return operatorJWT, accountJWT, accountSeed, nil
}
func accountPublicKeyFromSeed(accountSeed string) string {
akp, err := nkeys.FromSeed([]byte(accountSeed))
Expect(err).ToNot(HaveOccurred())
pk, err := akp.PublicKey()
Expect(err).ToNot(HaveOccurred())
return pk
}
// nodeSubjectPrefix returns the sanitized nodes.* prefix for a node ID.
func nodeSubjectPrefix(nodeID string) string {
tok := strings.NewReplacer(".", "-", "*", "-", ">", "-", " ", "-", "\t", "-", "\n", "-").Replace(nodeID)
return "nodes." + tok
}

View File

@@ -1,99 +0,0 @@
package distributed_test
import (
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/natsauth"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("NATS JWT Auth", Label("Distributed", "NatsJWT"), func() {
var infra *JWTTestInfra
BeforeEach(func() {
infra = SetupJWTInfra()
})
It("connects with a minted backend worker JWT and publishes on allowed subjects", func() {
// Backend workers may publish under nodes.<id>.files.> (see pkg/natsauth permissions).
subject := nodeSubjectPrefix(infra.NodeID) + ".files.in"
Expect(infra.NC.Publish(subject, map[string]string{"path": "/tmp/model"})).To(Succeed())
Expect(infra.NC.Conn().FlushTimeout(2 * time.Second)).To(Succeed())
Expect(infra.NC.Conn().IsConnected()).To(BeTrue())
})
It("allows backend subscribe on the node prefix", func() {
wild := nodeSubjectPrefix(infra.NodeID) + ".>"
sub, err := infra.NC.Subscribe(wild, func(_ []byte) {})
Expect(err).ToNot(HaveOccurred())
defer func() { _ = sub.Unsubscribe() }()
Expect(infra.NC.Conn().FlushTimeout(2 * time.Second)).To(Succeed())
Expect(infra.NC.Conn().IsConnected()).To(BeTrue())
})
It("rejects anonymous publish on the JWT-enabled server", func() {
anon, err := messaging.New(infra.NatsURL)
Expect(err).ToNot(HaveOccurred())
defer anon.Close()
err = anon.Publish("nodes.any.files.x", map[string]string{"x": "1"})
Expect(err).ToNot(HaveOccurred())
Expect(anon.Conn().FlushTimeout(2 * time.Second)).To(HaveOccurred())
})
It("denies backend publish to another node's subjects", func() {
other := nodeSubjectPrefix("other-node-id") + ".files.stage"
Expect(infra.NC.Publish(other, map[string]string{"stage": "nope"})).To(Succeed())
Eventually(func() error {
_ = infra.NC.Conn().FlushTimeout(500 * time.Millisecond)
return infra.NC.Conn().LastError()
}, "3s", "50ms").Should(HaveOccurred())
})
It("mints agent JWT without backend.install in claims", func() {
cfg := natsauth.Config{AccountSeed: infra.AccountSeed}
token, _, err := cfg.MintWorkerJWT("agent-node-1", "agent")
Expect(err).ToNot(HaveOccurred())
claims, err := natsauth.DecodeUserClaims(token)
Expect(err).ToNot(HaveOccurred())
Expect(claims.Permissions.Sub.Allow).To(ContainElement("agent.execute"))
for _, subj := range claims.Permissions.Sub.Allow {
Expect(subj).NotTo(ContainSubstring("backend.install"))
}
})
// Regression guard for the silent permission gaps: decoding the JWT claims
// (above) only proves the agent JWT is *restrictive*, not that it is
// *sufficient*. Stand a real agent connection up against the enforcing
// server and exercise every subscription core/cli/agent_worker.go actually
// makes — a denied SUB now surfaces synchronously via confirmSubscription,
// so a missing allow rule fails this test instead of silently dropping
// backend.stop / MCP-CI deliveries at runtime.
It("lets an agent-minted JWT establish all the subscriptions the agent worker uses", func() {
const nodeID = "agent-node-subs"
cfg := natsauth.Config{AccountSeed: infra.AccountSeed, WorkerJWTTTL: time.Hour}
token, seed, err := cfg.MintWorkerJWT(nodeID, "agent")
Expect(err).ToNot(HaveOccurred())
nc, err := messaging.New(infra.NatsURL, messaging.WithUserJWT(token, seed))
Expect(err).ToNot(HaveOccurred())
DeferCleanup(nc.Close)
// Mirror core/cli/agent_worker.go exactly.
_, err = nc.QueueSubscribeReply(messaging.SubjectMCPToolExecute, messaging.QueueAgentWorkers, func([]byte, func([]byte)) {})
Expect(err).ToNot(HaveOccurred(), "agent JWT must allow %s", messaging.SubjectMCPToolExecute)
_, err = nc.QueueSubscribeReply(messaging.SubjectMCPDiscovery, messaging.QueueAgentWorkers, func([]byte, func([]byte)) {})
Expect(err).ToNot(HaveOccurred(), "agent JWT must allow %s", messaging.SubjectMCPDiscovery)
_, err = nc.QueueSubscribe(messaging.SubjectMCPCIJobsNew, messaging.QueueWorkers, func([]byte) {})
Expect(err).ToNot(HaveOccurred(), "agent JWT must allow %s (MCP CI jobs)", messaging.SubjectMCPCIJobsNew)
_, err = nc.Subscribe(messaging.SubjectNodeBackendStop(nodeID), func([]byte) {})
Expect(err).ToNot(HaveOccurred(), "agent JWT must allow %s (MCP session cleanup)", messaging.SubjectNodeBackendStop(nodeID))
})
})