Compare commits

..

21 Commits

Author SHA1 Message Date
Ettore Di Giacinto
e4fec35772 fix(review): address self-review findings on the distributed install fixes
Three findings from an adversarial review of this branch:

1. CRITICAL - OpCache.GetStatus crashed under concurrent load. m.Map() returns
   the live internal map by reference, so deleting from it on the read path was
   an unsynchronized write to a map four HTTP handlers poll every ~1s -> a
   'concurrent map writes' fatal. Rewritten to iterate a Keys() snapshot, build
   a fresh result map, and apply evictions via the locked DeleteUUID after the
   loop. Added a -race concurrency regression guard.

2. HIGH - GetStatus evicted failed ops too, hiding them from /api/operations
   and breaking the dismiss-failed-op flow (the panel keeps Error != nil ops so
   the admin can read the error and click Dismiss). Eviction now fires only for
   terminal ops with Error == nil (success/cancelled); failures are retained.

3. MEDIUM - DeleteStalePendingBackendOps missed StatusUnhealthy nodes. A node
   marked unhealthy on a NATS ErrNoResponders never transitions to offline
   (health.go skips re-marking it), so its pending ops leaked exactly like the
   offline case. Unhealthy is now reaped via the same stale-heartbeat grace path
   (a fresh-heartbeat node is recovering and keeps its op).

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 23:31:53 +00:00
Ettore Di Giacinto
05c0a08e24 fix(galleryop): persist cancellation + periodically reap orphaned ops
Two distributed gaps surfaced when a replica was killed mid-upgrade on a live
cluster, leaving the backend stuck 'processing' in the UI forever:

1. CancelOperation flipped the in-memory status to cancelled and broadcast a
   NATS event but never persisted the terminal status. On the next replica
   restart the still-active row re-hydrated straight back into
   processingBackends and the UI spun again. It now calls store.Cancel(id) so
   the cancel survives a restart.

2. CleanStale (which marks abandoned active ops failed) only ran once on
   startup, so an op orphaned AFTER startup - its owning replica's foreground
   handler goroutine gone - was never reaped until the next restart. Add
   GalleryService.ReapStaleOperations and run it on a 15m ticker (CleanStale
   now returns the reaped count for observability).

Neither is covered by the OpCache self-evict fix: an orphaned op never reaches
Processed, so it would never self-evict.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 23:10:53 +00:00
Ettore Di Giacinto
7824105a31 fix(nodes): stream per-node progress during backend upgrade
The install dispatch subscribed to a per-op progress subject and streamed
per-node download ticks; the upgrade dispatch did a bare 15-minute blocking
NATS round-trip with no subscription, so the UI showed progress:0 the whole
time (the 'reinstalling but nothing happens' report on a slow node).

Thread the op ID through BackendManager.UpgradeBackend -> the distributed
manager -> the adapter, and have the adapter subscribe to the per-op progress
subject before the request (extracted into a shared subscribeProgress helper
reused by install/upgrade/force-fallback). The worker's upgradeBackend now
creates the same DebouncedInstallProgressPublisher installBackend uses. An
upgrade is a force-reinstall, so it reuses SubjectNodeBackendInstallProgress
rather than minting a new subject - no new NATS permission, no new
rolling-update compat surface. Reconciler-driven retries pass empty
opID/onProgress and stay on the silent path.

Reproduced on a live cluster: upgrade of llama-cpp-development on agx-orin-slow
sat at progress:0 for 4+ minutes with no per-node feedback.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 22:58:09 +00:00
Ettore Di Giacinto
47fa847d55 fix(nodes): clear pending backend ops behind offline/draining nodes
ListDuePendingBackendOps filters status=healthy, so a backend op queued against
a node that went offline (stale heartbeat) or draining (admin action) was never
retried, aged out, or deleted - it leaked forever and kept the UI operation
spinning. Add DeleteStalePendingBackendOps and run it each reconcile pass:
draining nodes are cleared immediately (model rows already purged), offline
nodes once their heartbeat is older than a grace window (blip protection).

Reproduced on a live cluster: orphaned llama-cpp install rows targeting an
offline (nvidia-thor) and a draining (mac-mini-m4) node sat at attempts=0
indefinitely.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 22:37:24 +00:00
Ettore Di Giacinto
9a7ebc1151 fix(galleryop): self-evict terminal ops from OpCache.GetStatus
The processingBackends map (the UI 'reinstalling' spinner source) only cleared
an op when a client polled /api/backends/job/:uid. The Manage-page Reinstall and
Upgrade buttons never poll, so completed installs leaked into processingBackends
forever and the backend card spun 'reinstalling' even though the install had
finished. Evict terminal ops on the list read instead; DeleteUUID already
broadcasts the eviction so peer replicas converge.

Reproduced on a live 5-node distributed cluster: 5 backends sat in
processingBackends with underlying jobs reporting completed:true,progress:100.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 22:31:04 +00:00
LocalAI [bot]
f6cc90d258 chore: ⬆️ Update mudler/parakeet.cpp to e270af73b94c9a5c37ec516230219ed4580e1db6 (#10212)
⬆️ Update mudler/parakeet.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-07 23:52:44 +02:00
Adira
2c804bef5a fix(config): skip vocab arrays and mmap GGUF headers to speed up startup (#10213)
When the models directory holds many GGUF files, startup parsed every
model's full GGUF — including the tokenizer vocab arrays
(tokenizer.ggml.tokens/scores/merges, often >100k entries) — once per
model while guessing defaults. On slow storage (e.g. a models directory
on a Docker volume) those hundreds of thousands of tiny reads dominate
boot time before the HTTP server comes up.

The default-guessing path and the VRAM metadata reader only consume
scalar metadata and array lengths, never the array contents. Parse with
SkipLargeMetadata (seek past large arrays) and UseMMap (fault in a few
header pages instead of issuing per-element read() syscalls). For a
256k-token vocab this cuts the parse from ~524k read() syscalls to 8.
The mapping is released when ParseGGUFFile returns.

Fixes #9790

Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Adira Denis Muhando <dennisadira@gmail.com>
2026-06-07 23:33:52 +02:00
LocalAI [bot]
6070402477 chore(model gallery): 🤖 add 1 new models via gallery agent (#10209)
chore(model gallery): 🤖 add new models via gallery agent

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-07 22:09:32 +02:00
LocalAI [bot]
67f80a152b fix(mtp): don't auto-enable self-spec MTP for draft-only assistant GGUFs (#10208)
Gemma4 MTP (ggml-org/llama.cpp#23398) registers the prediction head as a
separate `gemma4-assistant` architecture. That assistant GGUF still carries
`<arch>.nextn_predict_layers`, so the architecture-agnostic detection in
HasEmbeddedMTPHead matched it and appended the `spec_type:draft-mtp` defaults.

Unlike the DeepSeek/Qwen embedded-head models, an assistant checkpoint cannot
self-speculate: it is a draft model that requires a paired target context
(`ctx_other`) and throws if loaded alone. Auto-applying the self-spec defaults
to a standalone assistant import therefore produces a broken config.

Guard the detection against draft-only assistant architectures (the `-assistant`
suffix is upstream's naming convention) so importing one no longer yields a
self-speculation config. Two-model target+draft pairing remains expressible
manually via `draft_model:` and is left to a follow-up.


Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 22:09:02 +02:00
LocalAI [bot]
a7cb587d96 feat(parakeet-cpp): real segment timestamps (NeMo-faithful) (#10207)
* feat(parakeet-cpp): real segment timestamps (NeMo-faithful)

Offline: replace the single synthetic whole-clip segment with multiple
segments grouped exactly like NeMo's get_segment_offsets - a new segment
after sentence-ending punctuation ('. ? !'), each carrying start/end and
its time-window token ids. The optional model option segment_gap_threshold
(NeMo's unit: encoder FRAMES, default 0=off) adds NeMo's silence-gap split,
converted to seconds via the JSON frame_sec the engine now reports.
Per-segment words are still gated behind timestamp_granularities=["word"];
a zero-word document falls back to a single text segment.

Streaming: when libparakeet.so exposes the ABI v4 JSON entry points
(probed), drive parakeet_capi_stream_feed_json / _finalize_json and
accumulate the streamed per-word timestamps into per-utterance segments
(EOU stays the boundary), so streaming FinalResult segments now carry
start/end. Falls back to the text-only feed against an older library.

Pure-Go specs cover splitWordsIntoSegments (punctuation + gap rules, NeMo
elif order, fallback), transcriptResultFromDoc (multi-segment, token
windows, word-granularity gate), and the streaming segmenter.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(audio): document parakeet-cpp segment timestamps + segment_gap_threshold

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(parakeet-cpp): update model-gated specs for multi-segment output

The offline AudioTranscription specs asserted the old single synthetic
segment (Segments HaveLen(1), Segments[0].Text == res.Text). With
NeMo-faithful segmentation a multi-sentence clip now yields multiple
punctuation-delimited segments, so assert the new contract instead:
one-or-more time-ordered segments, each with text and (under word
granularity) per-segment words whose span tracks the segment start/end.
Caught by running the model-gated suite on the dgx (GB10) against the
real tdt_ctc-110m + realtime_eou models.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 22:08:24 +02:00
LocalAI [bot]
f7c74ad2da chore: ⬆️ Update ggml-org/llama.cpp to 31e82494c0a3913c919c1027fa70500fbf4c07dd (#10191)
⬆️ Update ggml-org/llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-07 10:43:17 +02:00
LocalAI [bot]
7402d1fd20 chore(turboquant): bump to 7d9715f1 + fix compilation against rebased fork (#10205)
* chore(turboquant): bump TheTom/llama-cpp-turboquant to 7d9715f1

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(turboquant): drop obsolete legacy-spec shim after fork rebased

The TheTom/llama-cpp-turboquant fork (pin c9aa86a) rebased past the
upstream common_params_speculative refactor (ggml-org/llama.cpp
#22397/#22838/#22964), the model_tgt rename (#22838) and get_media_marker
(#21962). The old fork-compat shim forced now-wrong legacy code paths,
breaking the build with errors like 'struct common_params_speculative has
no member named mparams_dft / type' and 'server_context_impl has no member
named model'.

Remove the obsolete LOCALAI_LEGACY_LLAMA_CPP_SPEC branches from the shared
grpc-server.cpp (stock llama-cpp and the modern fork both take the modern
path now), and narrow the one remaining gap (the fork still lacks
common_params::checkpoint_min_step) to a dedicated
LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP guard injected by
patch-grpc-server.sh. The patch script now only adds the turbo2/3/4
KV-cache types and injects that one macro.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(turboquant): HIP-port the fork's CUDA additions (copy2d 3D-peer + cudaEventCreate)

The turboquant fork adds/modifies a few ggml-cuda.cu spots with CUDA APIs that
ggml's HIP/MUSA shim does not provide, breaking the -gpu-rocm-hipblas-turboquant
build. patches/0001-hip-guard-copy2d-peer-fastpath.patch (applied by
apply-patches.sh) ports them:

- Guard ggml_cuda_copy2d_across_devices's 3D-peer copy fast path with
  #if !defined(GGML_USE_HIP) && !defined(GGML_USE_MUSA) so HIP/MUSA fall through
  to the existing cudaMemcpyAsync staging fallback (HIP genuinely lacks
  cudaMemcpy3DPeerAsync, per the fork's own comment).
- Create the device event in ggml_backend_cuda_device_event_new with the
  HIP-aliased cudaEventCreateWithFlags(.., cudaEventDisableTiming) instead of the
  un-aliased plain cudaEventCreate, matching this file's own usage elsewhere.

CUDA builds are unaffected.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* ci(turboquant): drop the ROCm/hipblas build flavor

The TheTom/llama-cpp-turboquant fork is not ROCm-clean at the current pin:
beyond the CUDA-API gaps already patched (3D-peer copy, cudaEventCreate),
its llama.cpp base fails to compile the flash-attention MMA f16 kernels for
head-dim 640 under HIP (cols_per_warp evaluates to 0 -> division-by-zero /
non-constant static asserts in fattn-mma-f16.cuh). That is a deep
ggml-on-ROCm kernel issue, not something a small fork patch can paper over.

Drop -gpu-rocm-hipblas-turboquant from the build matrix so turboquant still
ships for cpu / cublas / vulkan / sycl. Re-add it once the fork's HIP path
compiles (or upstream ggml fixes the large-head-dim MMA kernels for ROCm).

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 10:42:06 +02:00
LocalAI [bot]
8c42695ef8 chore: ⬆️ Update ggml-org/whisper.cpp to a8ec021f2750a473ff4a8f3883bc9fdf5feafa84 (#10202)
⬆️ Update ggml-org/whisper.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-07 08:37:42 +02:00
LocalAI [bot]
72e3241431 chore: ⬆️ Update mudler/parakeet.cpp to abd0087dcc92ec5ad1f96f9fd86c49eb26a5ce67 (#10204)
⬆️ Update mudler/parakeet.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-07 00:37:28 +02:00
LocalAI [bot]
cd2bf95862 fix(docs): use relearn notice shortcode instead of unsupported alert (#10206)
The Hugo relearn theme does not provide an "alert" shortcode, so the
docs deploy failed at the Build site step:

  failed to extract shortcode: template for shortcode "alert" not found
  docs/content/features/distributed-mode.md:136

Convert the warning block to the theme-supported notice shortcode used
everywhere else in the docs.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-07 00:37:12 +02:00
LocalAI [bot]
f64b72dd7d feat: support Ideogram4 in stablediffusion-ggml backend + gallery (#10201)
* feat(stablediffusion-ggml): support Ideogram4 unconditional diffusion model

Bump stable-diffusion.cpp from 1f9ee88 to b9254dd, the upstream commit that
adds Ideogram4 support (leejet/stable-diffusion.cpp#1609). Ideogram4 derives
its classifier-free guidance from a separate unconditional diffusion model,
exposed upstream through the new sd_ctx_params_t.uncond_diffusion_model_path
field.

Wire that field into the gosd wrapper via a new uncond_diffusion_model_path
option. The _path suffix is deliberate: the Go loader only resolves options
whose name contains "path" to an absolute path under the model directory, so
this keeps the option consistent with diffusion_model_path and
high_noise_diffusion_model_path.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* feat(gallery): add Ideogram4 stablediffusion-ggml models

Single-file GGUF weights for Ideogram4 are now published
(stduhpf/ideogram-4-gguf), so add the model to the gallery. Ideogram4 is a
text-to-image model with strong, accurate in-image text rendering, driven by
a Qwen3-VL-8B text encoder and real classifier-free guidance from a separate
unconditional diffusion model (the uncond_diffusion_model_path support added
in the preceding commit).

Two index entries, both built on gallery/virtual.yaml with the full config
inlined in overrides (same pattern as the other models, no dedicated template
file):
- ideogram-4-iq4nl-ggml (4-bit, ~11.6GB diffusion)
- ideogram-4-q8_0-ggml  (8-bit, ~20GB diffusion)

Each bundles the diffusion + unconditional GGUF (stduhpf), the
Qwen3-VL-8B-Instruct text encoder (unsloth), and the FLUX.2 VAE (Comfy-Org
mirror, non-gated). cfg_scale is 7 to match the upstream Ideogram4 default,
since it performs real CFG unlike the guidance-distilled Flux/Z-Image models.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-06 22:50:12 +02:00
LocalAI [bot]
03c84cff28 feat(parakeet-cpp): nemotron-3.5-asr multilingual streaming model + request language support (#10199)
* feat(parakeet-cpp): honor request language (multilingual nemotron) on batched + streaming paths

Reads opts.GetLanguage() and threads it through to the new
parakeet_capi_transcribe_pcm_batch_json_lang and parakeet_capi_stream_begin_lang
C-API entry points, both probed with Dlsym so the backend still loads against an
older libparakeet.so (falling back to the non-lang paths, i.e. model default).

parakeet.cpp's batched C-API takes a single target_lang for the whole batch, so
the dispatcher only coalesces same-language requests: a request whose language
differs from the batch leader is held as a single carry-over and becomes the
leader of the next batch, never dropped and never left waiting (including on
shutdown). A new batcher test asserts no dispatched batch is ever mixed-language
and that every submitted request still receives a reply.

Assisted-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* feat(gallery): add parakeet-cpp-nemotron-3.5-asr-streaming-0.6b; bump parakeet.cpp pin

Adds the multilingual prompt-conditioned streaming model to the gallery (q8_0
default, OpenMDW-1.1) and bumps the parakeet-cpp backend pin to the parakeet.cpp
commit that ships nemotron support plus batched causal subsampling and the
batched target_lang C-API.

Assisted-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-06 13:53:10 +02:00
LocalAI [bot]
9bc69c9e5f chore(model gallery): 🤖 add 1 new models via gallery agent (#10200)
chore(model gallery): 🤖 add new models via gallery agent

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-06 13:52:46 +02:00
LocalAI [bot]
1e6c9cfd60 chore: ⬆️ Update ikawrakow/ik_llama.cpp to 6b9de3dbaa21ae95ea80638e5ee836795cc48c93 (#10190)
⬆️ Update ikawrakow/ik_llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-06-06 09:42:43 +02:00
LocalAI [bot]
0e6712f734 chore: ⬆️ Update mudler/parakeet.cpp to 843600590f96a31467a5199f827c253f34c110f7 (#10198)
chore(parakeet-cpp): bump pin to banded long-audio attention (843600590)

Update PARAKEET_VERSION to mudler/parakeet.cpp@843600590f
(merge of parakeet.cpp#9). Brings NeMo rel_pos_local_attn banded/Longformer
attention with the chunk-matmul construction: long audio now uses O(T*window)
attention instead of global O(T^2), fixing the encoder OOM on long clips
(~16.6-min clip: 54GB->9.4GB peak, ~4x faster) at NeMo's full [128,128] window.
Short clips are unchanged (global path). No C-ABI change.


Assisted-by: Claude:claude-opus-4-8

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-06 09:25:25 +02:00
LocalAI [bot]
0e4cee9a97 chore: bump LocalAGI + localrecall (fix pgvector hybrid search seqscan, #10186) (#10192)
chore: bump LocalAGI and localrecall (index-backed RRF hybrid search)

Bumps the agent stack to pull in the PostgreSQL hybrid-search fix:

- mudler/localrecall -> v0.6.3-...-9a3b3321a9cd (mudler/LocalRecall#46, merged)
- mudler/LocalAGI    -> ...-14aed1ae4336 (mudler/LocalAGI#477, merged)

localrecall's hybrid search previously sorted on a wrapped scalar
similarity expression, which blinded the planner into a full sequential
scan over every row and exceeded the statement timeout on large
collections, returning an empty result set. It now uses the canonical
Reciprocal Rank Fusion pattern (index-backed candidate retrieval + FULL
OUTER JOIN + weighted RRF).

Fixes #10186

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-06 09:16:59 +02:00
53 changed files with 1809 additions and 302 deletions

View File

@@ -1766,20 +1766,6 @@ include:
dockerfile: "./backend/Dockerfile.llama-cpp"
context: "./"
ubuntu-version: '2404'
- build-type: 'hipblas'
cuda-major-version: ""
cuda-minor-version: ""
platforms: 'linux/amd64'
tag-latest: 'auto'
tag-suffix: '-gpu-rocm-hipblas-turboquant'
builder-base-image: 'quay.io/go-skynet/ci-cache:base-grpc-rocm-amd64'
runs-on: 'ubuntu-latest'
base-image: "rocm/dev-ubuntu-24.04:7.2.1"
skip-drivers: 'false'
backend: "turboquant"
dockerfile: "./backend/Dockerfile.turboquant"
context: "./"
ubuntu-version: '2404'
- build-type: 'hipblas'
cuda-major-version: ""
cuda-minor-version: ""

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=1520eda980564241434b791ce2bbbd128c4be9ea
IK_LLAMA_VERSION?=6b9de3dbaa21ae95ea80638e5ee836795cc48c93
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=7c158fbb4aec1bdc9c81d6ca0e785139f4826fae
LLAMA_VERSION?=31e82494c0a3913c919c1027fa70500fbf4c07dd
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -482,23 +482,13 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
if (!request->draftmodel().empty()) {
params.speculative.draft.mparams.path = request->draftmodel();
// Default to draft type if a draft model is set but no explicit type.
// Upstream (post ggml-org/llama.cpp#22838) made the speculative type a
// vector; the turboquant fork still uses the legacy scalar. The
// LOCALAI_LEGACY_LLAMA_CPP_SPEC macro is injected by
// backend/cpp/turboquant/patch-grpc-server.sh for fork builds only.
// Upstream renamed COMMON_SPECULATIVE_TYPE_DRAFT -> ..._DRAFT_SIMPLE
// in ggml-org/llama.cpp#22964; the fork still uses the old name.
#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC
if (params.speculative.type == COMMON_SPECULATIVE_TYPE_NONE) {
params.speculative.type = COMMON_SPECULATIVE_TYPE_DRAFT;
}
#else
// Upstream made the speculative type a vector (ggml-org/llama.cpp#22838)
// and renamed COMMON_SPECULATIVE_TYPE_DRAFT -> ..._DRAFT_SIMPLE (#22964).
const bool no_spec_type = params.speculative.types.empty() ||
(params.speculative.types.size() == 1 && params.speculative.types[0] == COMMON_SPECULATIVE_TYPE_NONE);
if (no_spec_type) {
params.speculative.types = { COMMON_SPECULATIVE_TYPE_DRAFT_SIMPLE };
}
#endif
}
// params.model_alias ??
@@ -574,9 +564,10 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
// tokens (0 disables the minimum). Match upstream's default (256). This
// field was renamed from `checkpoint_every_nt` in llama.cpp; the semantics
// also shifted from a fixed cadence to a minimum spacing. The turboquant
// fork branched before the field existed, so skip it on the legacy path
// (LOCALAI_LEGACY_LLAMA_CPP_SPEC is injected by patch-grpc-server.sh).
#ifndef LOCALAI_LEGACY_LLAMA_CPP_SPEC
// fork still lacks common_params::checkpoint_min_step, so skip it there
// (LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP is injected by
// backend/cpp/turboquant/patch-grpc-server.sh).
#ifndef LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP
params.checkpoint_min_step = 256;
#endif
@@ -752,7 +743,7 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
params.cache_idle_slots = false;
}
#ifndef LOCALAI_LEGACY_LLAMA_CPP_SPEC
#ifndef LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP
// --- minimum context-checkpoint spacing (upstream -cms / --checkpoint-min-step) ---
// 0 disables the minimum-spacing gate. Old option names (`checkpoint_every_nt`,
// `checkpoint_every_n_tokens`) are kept as aliases for backward compatibility
@@ -906,17 +897,6 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
// Speculative decoding options
} else if (!strcmp(optname, "spec_type") || !strcmp(optname, "speculative_type")) {
#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC
// Fork only knows a single scalar `type`. Take the first comma-
// separated value and assign it via the singular helper.
std::string first = optval_str;
const auto comma = first.find(',');
if (comma != std::string::npos) first = first.substr(0, comma);
auto type = common_speculative_type_from_name(first);
if (type != COMMON_SPECULATIVE_TYPE_COUNT) {
params.speculative.type = type;
}
#else
// Upstream switched to a vector of types (comma-separated for multi-type
// chaining via common_speculative_types_from_names). We keep accepting a
// single value here, but also tolerate comma-separated lists.
@@ -945,7 +925,6 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
if (!parsed.empty()) {
params.speculative.types = parsed;
}
#endif
} else if (!strcmp(optname, "spec_n_max") || !strcmp(optname, "draft_max")) {
if (optval != NULL) {
try { params.speculative.draft.n_max = std::stoi(optval_str); } catch (...) {}
@@ -983,21 +962,6 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
// shares the target context size. Accept the option for backward
// compatibility but silently ignore it.
// Everything below relies on struct shape introduced in ggml-org/llama.cpp#22838
// (parallel drafting): `ngram_mod`, `ngram_map_k`, `ngram_map_k4v`,
// `ngram_cache`, and the `draft.{cache_type_*, cpuparams*, tensor_buft_overrides}`
// fields. The turboquant fork branched before that, so its build defines
// LOCALAI_LEGACY_LLAMA_CPP_SPEC via patch-grpc-server.sh and these option
// keys become unrecognized (silently dropped, like any unknown opt) for it.
//
// The `#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC` / `#else` split below sits at the
// closing-brace position of the `draft_ctx_size` branch on purpose: in the
// legacy build the chain ends here (the brace closes draft_ctx_size), and in
// the modern build the chain continues with `} else if (...)` instead, so the
// brace count stays balanced under both branches of the preprocessor.
#ifdef LOCALAI_LEGACY_LLAMA_CPP_SPEC
}
#else
// --- ngram_mod family (upstream --spec-ngram-mod-*) ---
} else if (!strcmp(optname, "spec_ngram_mod_n_min")) {
if (optval != NULL) {
@@ -1127,7 +1091,6 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
}
if (!cur.empty()) flush(cur);
}
#endif // LOCALAI_LEGACY_LLAMA_CPP_SPEC — closes the `else`/`#ifdef` opened at draft_ctx_size
}
// Set params.n_parallel from environment variable if not set via options (fallback)
@@ -1177,15 +1140,11 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
params.tensor_buft_overrides.push_back({nullptr, nullptr});
}
}
// The draft tensor_buft_overrides are only populated under the modern
// (post-#22838) layout, whose population code is itself gated by
// LOCALAI_LEGACY_LLAMA_CPP_SPEC above. The turboquant fork lacks
// common_params_speculative::draft entirely, so skip the sentinel there too.
#ifndef LOCALAI_LEGACY_LLAMA_CPP_SPEC
// Terminate the draft tensor_buft_overrides list with a sentinel, mirroring
// the main-model handling above.
if (!params.speculative.draft.tensor_buft_overrides.empty()) {
params.speculative.draft.tensor_buft_overrides.push_back({nullptr, nullptr});
}
#endif
// TODO: Add yarn

View File

@@ -1,7 +1,7 @@
# Pinned to the HEAD of feature/turboquant-kv-cache on https://github.com/TheTom/llama-cpp-turboquant.
# Auto-bumped nightly by .github/workflows/bump_deps.yaml.
TURBOQUANT_VERSION?=5aeb2fdbe26cd4c534c6fa15de73cb5749bd0403
TURBOQUANT_VERSION?=7d9715f1f071fa07c7b2ad3dbfd320b314139e65
LLAMA_REPO?=https://github.com/TheTom/llama-cpp-turboquant
CMAKE_ARGS?=

View File

@@ -4,21 +4,19 @@
#
# 1. Augment the kv_cache_types[] allow-list so `LoadModel` accepts the
# fork-specific `turbo2` / `turbo3` / `turbo4` cache types.
# 2. Replace `get_media_marker()` (added upstream in ggml-org/llama.cpp#21962,
# server-side random per-instance marker) with the legacy "<__media__>"
# literal. The fork branched before that PR, so server-common.cpp has no
# get_media_marker symbol. The fork's mtmd_default_marker() still returns
# "<__media__>", and Go-side tooling falls back to that sentinel when the
# backend does not expose media_marker, so substituting the literal keeps
# behavior identical on the turboquant path.
# 3. Revert the `common_params_speculative` field references to the
# pre-refactor flat layout. Upstream ggml-org/llama.cpp#22397 split the
# struct into nested `draft` / `ngram_simple` / `ngram_mod` / etc. members;
# the turboquant fork branched before that PR and still exposes the flat
# `n_max`, `mparams_dft`, `ngram_size_n`, ... fields. The substitutions
# below map the new nested paths back to the legacy flat names so the
# shared grpc-server.cpp keeps compiling against the fork's common.h.
# Drop this block once the fork rebases past #22397.
# 2. Define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP at the top of the file
# so the grpc-server option parser skips the two references to
# common_params::checkpoint_min_step (the default and the option handler).
# That field does not exist in the fork yet; drop this once it does.
#
# The fork used to lag upstream on the whole common_params_speculative refactor
# (ggml-org/llama.cpp#22397/#22838/#22964), the model_tgt rename (#22838) and
# get_media_marker (#21962), which required a much larger compat shim here
# (flat-field sed renames + a coarse LOCALAI_LEGACY_LLAMA_CPP_SPEC define). The
# fork has since rebased past all of those, so the only remaining gap is
# checkpoint_min_step. If a future bump reintroduces a divergence, add a narrow
# guard in grpc-server.cpp keyed on a fork-specific macro and inject it here
# rather than resurrecting the coarse one.
#
# We patch the *copy* sitting in turboquant-<flavor>-build/, never the original
# under backend/cpp/llama-cpp/, so the stock llama-cpp build keeps compiling
@@ -72,72 +70,20 @@ else
echo "==> KV allow-list patch OK"
fi
if grep -q 'get_media_marker()' "$SRC"; then
echo "==> patching $SRC to replace get_media_marker() with legacy \"<__media__>\" literal"
# Only one call site today (ModelMetadata), but replace all occurrences to
# stay robust if upstream adds more. Use a temp file to avoid relying on
# sed -i portability (the builder image uses GNU sed, but keeping this
# consistent with the awk block above).
sed 's/get_media_marker()/"<__media__>"/g' "$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> get_media_marker() substitution OK"
# 2. Define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP at the top of the file so
# the grpc-server option parser skips the two references to
# common_params::checkpoint_min_step (the default assignment and the option
# handler). That field does not exist in the fork yet. Drop this block once
# the fork rebases past the bump that added checkpoint_min_step.
if grep -q '^#define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP' "$SRC"; then
echo "==> $SRC already defines LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP, skipping"
else
echo "==> $SRC has no get_media_marker() call, skipping media-marker patch"
fi
if grep -q 'params\.speculative\.draft\.\|params\.speculative\.ngram_simple\.' "$SRC"; then
echo "==> patching $SRC to revert common_params_speculative refs to pre-#22397 flat layout"
# Each substitution is the exact post-refactor path → legacy flat field.
# Order doesn't matter because the source paths are disjoint, but we keep
# the most-specific (mparams.path) first for readability.
sed -E \
-e 's/params\.speculative\.draft\.mparams\.path/params.speculative.mparams_dft.path/g' \
-e 's/params\.speculative\.draft\.n_max/params.speculative.n_max/g' \
-e 's/params\.speculative\.draft\.n_min/params.speculative.n_min/g' \
-e 's/params\.speculative\.draft\.p_min/params.speculative.p_min/g' \
-e 's/params\.speculative\.draft\.p_split/params.speculative.p_split/g' \
-e 's/params\.speculative\.draft\.n_gpu_layers/params.speculative.n_gpu_layers/g' \
-e 's/params\.speculative\.draft\.n_ctx/params.speculative.n_ctx/g' \
-e 's/params\.speculative\.ngram_simple\.size_n/params.speculative.ngram_size_n/g' \
-e 's/params\.speculative\.ngram_simple\.size_m/params.speculative.ngram_size_m/g' \
-e 's/params\.speculative\.ngram_simple\.min_hits/params.speculative.ngram_min_hits/g' \
"$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> speculative field rename OK"
else
echo "==> $SRC has no post-#22397 speculative field refs, skipping spec rename patch"
fi
# 4. Revert the `ctx_server.impl->model_tgt` rename introduced by upstream
# ggml-org/llama.cpp#22838 (parallel drafting). The turboquant fork still
# exposes the field as `model` on `server_context_impl`. The two call sites
# are in the Rerank and ModelMetadata RPC handlers.
if grep -q 'ctx_server\.impl->model_tgt' "$SRC"; then
echo "==> patching $SRC to revert ctx_server.impl->model_tgt -> ctx_server.impl->model"
sed -E 's/ctx_server\.impl->model_tgt/ctx_server.impl->model/g' "$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> model_tgt rename OK"
else
echo "==> $SRC has no ctx_server.impl->model_tgt refs, skipping model_tgt rename patch"
fi
# 5. Define LOCALAI_LEGACY_LLAMA_CPP_SPEC at the top of the file so the
# grpc-server option parser skips the new option-handler blocks (ngram_mod,
# ngram_map_k, ngram_map_k4v, ngram_cache, draft.cache_type_*, draft.cpuparams*,
# draft.tensor_buft_overrides) introduced for the post-#22838 layout, the
# draft.tensor_buft_overrides sentinel termination, and the
# common_params::checkpoint_min_step default/option (added with the
# 35c9b1f3 bump). Those blocks reference struct fields that simply do not
# exist in the fork.
if grep -q '^#define LOCALAI_LEGACY_LLAMA_CPP_SPEC' "$SRC"; then
echo "==> $SRC already defines LOCALAI_LEGACY_LLAMA_CPP_SPEC, skipping"
else
echo "==> patching $SRC to define LOCALAI_LEGACY_LLAMA_CPP_SPEC at the top"
# Insert the define before the very first `#include` so it precedes all the
# speculative-decoding code paths.
echo "==> patching $SRC to define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP at the top"
# Insert the define before the very first `#include` so it precedes the
# checkpoint_min_step references.
awk '
!done && /^#include/ {
print "#define LOCALAI_LEGACY_LLAMA_CPP_SPEC 1"
print "#define LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP 1"
print "// ^ injected by backend/cpp/turboquant/patch-grpc-server.sh"
print ""
done = 1
@@ -145,13 +91,13 @@ else
{ print }
END {
if (!done) {
print "patch-grpc-server.sh: no #include anchor found to insert LOCALAI_LEGACY_LLAMA_CPP_SPEC" > "/dev/stderr"
print "patch-grpc-server.sh: no #include anchor found to insert LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP" > "/dev/stderr"
exit 1
}
}
' "$SRC" > "$SRC.tmp"
mv "$SRC.tmp" "$SRC"
echo "==> LOCALAI_LEGACY_LLAMA_CPP_SPEC define OK"
echo "==> LOCALAI_TURBOQUANT_NO_CHECKPOINT_MIN_STEP define OK"
fi
echo "==> all patches applied"

View File

@@ -0,0 +1,55 @@
hip: port the turboquant CUDA additions that ggml's HIP shim doesn't cover
The turboquant fork adds/modifies a few ggml-cuda.cu spots with CUDA APIs
that ggml's HIP (and MUSA) compatibility layer does not provide, breaking
the -gpu-rocm-hipblas-turboquant build:
1. ggml_cuda_copy2d_across_devices() (host-staged cross-device copy for
split mul_mat output) uses the CUDA 3D-peer copy APIs
cudaMemcpy3DPeerParms / make_cudaPitchedPtr / make_cudaExtent /
cudaMemcpy3DPeerAsync. HIP genuinely does not support these (see the
fork's own comment "HIP does not support cudaMemcpy3DPeerAsync"), so
guard the peer fast path with #if !defined(GGML_USE_HIP) &&
!defined(GGML_USE_MUSA) -- matching how the fork already guards the
same API for the sibling 2D copy -- and fall through to the existing
cudaMemcpyAsync staging fallback below (functionally identical,
slightly slower on multi-GPU ROCm).
2. ggml_backend_cuda_device_event_new() creates its event with plain
cudaEventCreate, which ggml's HIP shim does not alias (it only aliases
cudaEventCreateWithFlags). Use cudaEventCreateWithFlags(...,
cudaEventDisableTiming) -- exactly what the rest of this file already
does (cf. lines ~1034, ~3461) and HIP-safe.
CUDA builds are unaffected. Drop the relevant hunk once the fork HIP-ports
these; apply-patches.sh fails fast if an anchor goes stale.
diff --git a/ggml/src/ggml-cuda/ggml-cuda.cu b/ggml/src/ggml-cuda/ggml-cuda.cu
index 0427e6b..6352e6a 100644
--- a/ggml/src/ggml-cuda/ggml-cuda.cu
+++ b/ggml/src/ggml-cuda/ggml-cuda.cu
@@ -1933,6 +1933,7 @@ static cudaError_t ggml_cuda_copy2d_across_devices(
size_t width, size_t height, cudaStream_t dst_stream, cudaStream_t src_stream) {
const auto & info = ggml_cuda_info();
+#if !defined(GGML_USE_HIP) && !defined(GGML_USE_MUSA) // 3D-peer copy types unmapped by ggml's HIP/MUSA shim; use staging fallback below
if (info.peer_access[src_device][dst_device]) {
cudaMemcpy3DPeerParms p = {};
p.dstDevice = dst_device;
@@ -1942,6 +1943,7 @@ static cudaError_t ggml_cuda_copy2d_across_devices(
p.extent = make_cudaExtent(width, height, 1);
return cudaMemcpy3DPeerAsync(&p, dst_stream);
}
+#endif // !defined(GGML_USE_HIP) && !defined(GGML_USE_MUSA)
// Fallback: stage all rows through a single contiguous pinned buffer
int prev_device = ggml_cuda_get_device();
@@ -5714,7 +5716,7 @@ static ggml_backend_event_t ggml_backend_cuda_device_event_new(ggml_backend_dev_
ggml_cuda_set_device(dev_ctx->device);
cudaEvent_t event;
- CUDA_CHECK(cudaEventCreate(&event));
+ CUDA_CHECK(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
return new ggml_backend_event {
/* .device = */ dev,

View File

@@ -1,6 +1,6 @@
# parakeet-cpp backend Makefile.
#
# Upstream pin lives below as PARAKEET_VERSION?=b11fe5bca78ad8b342dd559a43d76df3984bb447
# Upstream pin lives below as PARAKEET_VERSION?=e270af73b94c9a5c37ec516230219ed4580e1db6
# (.github/bump_deps.sh) can find and update it - matches the
# whisper.cpp / ds4 / vibevoice-cpp convention.
#
@@ -15,7 +15,7 @@
# That's what the L0 smoke test uses. The default target below does the
# proper clone-at-pin + cmake build so CI doesn't need a side-checkout.
PARAKEET_VERSION?=b11fe5bca78ad8b342dd559a43d76df3984bb447
PARAKEET_VERSION?=e270af73b94c9a5c37ec516230219ed4580e1db6
PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp
GOCMD?=go

View File

@@ -7,8 +7,12 @@ import "time"
type batchRequest struct {
pcm []float32
decoder int32
tag string
reply chan batchReply
// language is the per-request target locale ("" means the model default).
// parakeet.cpp's batched C-API takes ONE target_lang for the whole batch,
// so the dispatcher only coalesces requests that share a language.
language string
tag string
reply chan batchReply
}
// batchReply carries one per-item JSON object string (an element of the C-API's
@@ -43,13 +47,25 @@ func newBatcher(maxSize int, maxWait time.Duration, runBatch func([]*batchReques
// run is the dispatcher loop: accumulate submitted requests until either maxSize
// is reached or maxWait elapses since the first queued request, then dispatch.
// Exits when stop is closed (draining any partially-filled batch first).
//
// A batch carries ONE language (parakeet.cpp's batched C-API takes a single
// target_lang), so a request whose language differs from the batch leader is
// not coalesced: it is held in carry and becomes the leader of the next batch.
// carry is therefore never dropped and its caller never deadlocks: every batch
// (including a lone carry on stop) is dispatched, and runBatch replies to all.
func (b *batcher) run(stop <-chan struct{}) {
var carry *batchRequest
for {
var first *batchRequest
select {
case first = <-b.submit:
case <-stop:
return
if carry != nil {
// A mismatched request from the previous fill leads this batch.
first, carry = carry, nil
} else {
select {
case first = <-b.submit:
case <-stop:
return
}
}
batch := []*batchRequest{first}
@@ -64,12 +80,22 @@ func (b *batcher) run(stop <-chan struct{}) {
for len(batch) < b.maxSize {
select {
case r := <-b.submit:
if r.language != first.language {
// Different language: carry it to the next batch so this
// batch stays single-language, then dispatch what we have.
carry = r
break fill
}
batch = append(batch, r)
case <-timer.C:
break fill
case <-stop:
timer.Stop()
b.runBatch(batch)
// Don't strand a carried request's caller on shutdown.
if carry != nil {
b.runBatch([]*batchRequest{carry})
}
return
}
}

View File

@@ -105,4 +105,60 @@ var _ = Describe("batcher", func() {
go func() { <-rep }()
Eventually(dispatched, "2s").Should(Receive(Equal(1)))
})
It("never coalesces requests with different languages into one batch", func() {
// parakeet.cpp's batched C-API takes ONE target_lang per batch, so the
// dispatcher must keep every dispatched batch single-language. Submit a
// mix of languages and assert (a) no batch ever carries more than one
// distinct language and (b) every submitted request still gets a reply
// (the mismatched carry-over is never dropped).
var mu sync.Mutex
var langsPerBatch [][]string
run := func(reqs []*batchRequest) {
seen := map[string]struct{}{}
var distinct []string
for _, r := range reqs {
if _, ok := seen[r.language]; !ok {
seen[r.language] = struct{}{}
distinct = append(distinct, r.language)
}
}
mu.Lock()
langsPerBatch = append(langsPerBatch, distinct)
mu.Unlock()
echoReply(reqs)
}
// Large window + size so the fill loop stays open across submits and the
// language constraint (not the timer) is what splits the batches.
b := newBatcher(16, 200*time.Millisecond, run)
stop := make(chan struct{})
go b.run(stop)
defer close(stop)
langs := []string{"en", "en", "de", "de", "en", "fr", "fr"}
const N = 7
var wg sync.WaitGroup
got := make([]string, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
rep := make(chan batchReply, 1)
b.submit <- &batchRequest{tag: string(rune('a' + i)), language: langs[i], reply: rep}
got[i] = (<-rep).json
}(i)
}
wg.Wait()
mu.Lock()
defer mu.Unlock()
// Invariant: every dispatched batch is single-language.
for _, distinct := range langsPerBatch {
Expect(len(distinct)).To(Equal(1), "a batch coalesced more than one language: %v", distinct)
}
// Liveness: every request got a reply (carry-over never stranded).
for i := 0; i < N; i++ {
Expect(got[i]).To(Equal(string(rune('a' + i))))
}
})
})

View File

@@ -48,6 +48,13 @@ var (
// side reads them as const float*/const int*.
CppTranscribePcmBatchJSON func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32) uintptr
// CppTranscribePcmBatchJSONLang is the multilingual variant of the batched
// JSON entry point: identical, plus a trailing target_lang. "" (the model
// default, "auto") is passed for non-prompt models, which ignore it; an
// unknown locale on a prompt model returns 0 and sets last_error. Present
// only in newer libparakeet.so; nil falls back to CppTranscribePcmBatchJSON.
CppTranscribePcmBatchJSONLang func(ctx uintptr, samplesConcat []float32, nSamples []int32, nClips int32, sampleRate int32, decoder int32, targetLang string) uintptr
// Cache-aware streaming (RNN-T) entry points. stream_begin returns 0 for
// non-streaming models. feed/finalize return a malloc'd char* (uintptr,
// freed via CppFreeString); feed writes 1 to *eouOut on an <EOU>/<EOB>.
@@ -55,6 +62,18 @@ var (
CppStreamFeed func(s uintptr, pcm []float32, nSamples int32, eouOut unsafe.Pointer) uintptr
CppStreamFinalize func(s uintptr) uintptr
CppStreamFree func(s uintptr)
// CppStreamBeginLang is the multilingual variant of stream_begin: identical,
// plus a trailing target_lang ("" means the model default). Present only in
// newer libparakeet.so; nil falls back to CppStreamBegin.
CppStreamBeginLang func(ctx uintptr, targetLang string) uintptr
// Streaming JSON variants (ABI v4): feed/finalize returning a malloc'd char*
// JSON document {text,eou,frame_sec,words} (uintptr, freed via CppFreeString)
// so streaming segments can carry per-word timestamps. Present only in newer
// libparakeet.so; nil falls back to the text-only CppStreamFeed/Finalize path.
CppStreamFeedJSON func(s uintptr, pcm []float32, nSamples int32) uintptr
CppStreamFinalizeJSON func(s uintptr) uintptr
)
// streamChunkSamples is how much 16 kHz mono PCM we hand to stream_feed per
@@ -72,9 +91,26 @@ const streamChunkSamples = 16000
//
// "start"/"end"/"t" are seconds; "conf" is confidence in (0,1].
type transcriptJSON struct {
Text string `json:"text"`
Words []transcriptWord `json:"words"`
Tokens []transcriptToken `json:"tokens"`
Text string `json:"text"`
FrameSec float64 `json:"frame_sec"`
Words []transcriptWord `json:"words"`
Tokens []transcriptToken `json:"tokens"`
}
// streamFeedJSON mirrors the document returned by
// parakeet_capi_stream_feed_json / parakeet_capi_stream_finalize_json (ABI v4):
//
// {"text":"...","eou":0,"frame_sec":0.080000,
// "words":[{"w":"...","start":0.480,"end":0.640,"conf":0.9100}, ...]}
//
// "text" is the newly-finalized text since the last call; "eou" is 1 when an
// <EOU>/<EOB> fired this feed; "words" are the words finalized this call with
// absolute (stream-relative) start/end seconds.
type streamFeedJSON struct {
Text string `json:"text"`
Eou int `json:"eou"`
FrameSec float64 `json:"frame_sec"`
Words []transcriptWord `json:"words"`
}
type transcriptWord struct {
@@ -103,6 +139,10 @@ type ParakeetCpp struct {
engineMu sync.Mutex // sole guard of the one C engine (dispatcher + streaming)
bat *batcher
batStop chan struct{}
// segmentGapFrames is NeMo's segment_gap_threshold in ENCODER FRAMES (model
// YAML option, default 0=off). When >0 it adds NeMo's silence-gap split on
// top of the punctuation split; converted to seconds via the JSON frame_sec.
segmentGapFrames int
}
// Load is the LocalAI gRPC entry point for LoadModel: it calls
@@ -132,6 +172,11 @@ func (p *ParakeetCpp) Load(opts *pb.ModelOptions) error {
if maxWaitMs < 0 {
maxWaitMs = 0
}
// NeMo's segment_gap_threshold (encoder frames, default 0=off). Off by
// default matches NeMo's default (punctuation-only segments); when set it
// additionally splits segments on inter-word silence (see transcriptResultFromDoc).
p.segmentGapFrames = optInt(opts, "segment_gap_threshold", 0)
if CppTranscribePcmBatchJSON != nil {
p.batStop = make(chan struct{})
p.bat = newBatcher(maxSize, time.Duration(maxWaitMs)*time.Millisecond, p.runBatch)
@@ -187,8 +232,19 @@ func (p *ParakeetCpp) runBatch(reqs []*batchRequest) {
if len(reqs) > 0 {
dec = reqs[0].decoder
}
// All requests in a batch share one language (the batcher coalesces only
// same-language requests), so any element's language describes the batch.
lang := ""
if len(reqs) > 0 {
lang = reqs[0].language
}
p.engineMu.Lock()
cstr := CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec)
var cstr uintptr
if CppTranscribePcmBatchJSONLang != nil {
cstr = CppTranscribePcmBatchJSONLang(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec, lang)
} else {
cstr = CppTranscribePcmBatchJSON(p.ctxPtr, concat, nSamples, int32(len(reqs)), 16000, dec)
}
p.engineMu.Unlock()
if cstr == 0 {
err := fmt.Errorf("parakeet-cpp: batch transcribe failed: %s", CppLastError(p.ctxPtr))
@@ -226,8 +282,9 @@ func (p *ParakeetCpp) runBatch(reqs []*batchRequest) {
// OpenAI API, whose default is segment-level); token ids always populate
// Segment.Tokens.
//
// translate/diarize/prompt/temperature/language/threads are not applicable to
// parakeet and are ignored; streaming is handled by AudioTranscriptionStream
// translate/diarize/prompt/temperature/threads are not applicable to parakeet
// and are ignored; language is honored on the batched + streaming paths (see
// opts.GetLanguage() below); streaming is handled by AudioTranscriptionStream
// (L2).
func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.TranscriptRequest) (pb.TranscriptResult, error) {
if p.ctxPtr == 0 {
@@ -259,7 +316,7 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts), nil
return transcriptResultFromDoc(doc, opts, p.segmentGapFrames), nil
}
// Batched path: decode to PCM, submit to the batcher, wait for this request's
@@ -271,7 +328,7 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
}
rep := make(chan batchReply, 1)
select {
case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, reply: rep}:
case p.bat.submit <- &batchRequest{pcm: pcm, decoder: 0, language: opts.GetLanguage(), reply: rep}:
case <-ctx.Done():
return pb.TranscriptResult{}, status.Error(codes.Canceled, "transcription cancelled")
}
@@ -288,34 +345,169 @@ func (p *ParakeetCpp) AudioTranscription(ctx context.Context, opts *pb.Transcrip
if err := json.Unmarshal([]byte(res.json), &doc); err != nil {
return pb.TranscriptResult{}, fmt.Errorf("parakeet-cpp: decode transcript json: %w", err)
}
return transcriptResultFromDoc(doc, opts), nil
return transcriptResultFromDoc(doc, opts, p.segmentGapFrames), nil
}
// segmentSeparators is NeMo's default segment_seperators (sentence-ending
// punctuation). Splitting on these matches NeMo's default segment timestamps.
var segmentSeparators = []rune{'.', '?', '!'}
// transcriptResultFromDoc maps a decoded transcriptJSON to a TranscriptResult,
// synthesising a single whole-clip segment and attaching word timings only when
// the caller requested word granularity. Shared by the batched and direct paths.
func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest) pb.TranscriptResult {
// grouping words into NeMo-faithful segments (see splitWordsIntoSegments). The
// optional gapFrames (NeMo's segment_gap_threshold, in encoder FRAMES; 0=off)
// additionally splits on inter-word silence; it is converted to a seconds gap
// with the document's frame_sec. Per-segment word timings are attached only when
// the caller requested word granularity; token ids populate each segment's
// Tokens by time-window membership. Shared by the batched and direct paths.
func transcriptResultFromDoc(doc transcriptJSON, opts *pb.TranscriptRequest, gapFrames int) pb.TranscriptResult {
text := strings.TrimSpace(doc.Text)
words := make([]*pb.TranscriptWord, 0, len(doc.Words))
for _, w := range doc.Words {
words = append(words, &pb.TranscriptWord{Start: secondsToNanos(w.Start), End: secondsToNanos(w.End), Text: w.W})
// Frame-unit gap threshold -> seconds (NeMo segment_gap_threshold). 0 = off.
gapSeconds := 0.0
if gapFrames > 0 {
if doc.FrameSec > 0 {
gapSeconds = float64(gapFrames) * doc.FrameSec
} else {
xlog.Warn("parakeet-cpp: segment_gap_threshold set but libparakeet.so " +
"did not report frame_sec; falling back to punctuation-only segments")
}
}
tokens := make([]int32, 0, len(doc.Tokens))
for _, t := range doc.Tokens {
tokens = append(tokens, t.ID)
groups := splitWordsIntoSegments(doc.Words, segmentSeparators, gapSeconds)
if len(groups) == 0 {
// No words (edge case): single whole-clip text segment.
return pb.TranscriptResult{
Text: text,
Segments: []*pb.TranscriptSegment{{Id: 0, Text: text}},
}
}
var segStart, segEnd int64
if len(words) > 0 {
segStart = words[0].Start
segEnd = words[len(words)-1].End
wantWords := wordsRequested(opts.TimestampGranularities)
segments := make([]*pb.TranscriptSegment, 0, len(groups))
for id, group := range groups {
parts := make([]string, len(group))
for i, gw := range group {
parts[i] = gw.W
}
seg := &pb.TranscriptSegment{
Id: int32(id),
Start: secondsToNanos(group[0].Start),
End: secondsToNanos(group[len(group)-1].End),
Text: strings.TrimSpace(strings.Join(parts, " ")),
Tokens: tokensInWindow(doc.Tokens, group[0].Start, group[len(group)-1].End),
}
if wantWords {
ws := make([]*pb.TranscriptWord, len(group))
for i, gw := range group {
ws[i] = &pb.TranscriptWord{Start: secondsToNanos(gw.Start), End: secondsToNanos(gw.End), Text: gw.W}
}
seg.Words = ws
}
segments = append(segments, seg)
}
seg := &pb.TranscriptSegment{Id: 0, Start: segStart, End: segEnd, Text: text, Tokens: tokens}
if wordsRequested(opts.TimestampGranularities) {
seg.Words = words
}
return pb.TranscriptResult{Text: text, Segments: []*pb.TranscriptSegment{seg}}
return pb.TranscriptResult{Text: text, Segments: segments}
}
// splitWordsIntoSegments groups words into segments exactly as NeMo's
// get_segment_offsets does (nemo/collections/asr/parts/utils/timestamp_utils.py).
// Walking the words, it closes a segment when (1) the gap rule is enabled
// (gapSeconds > 0) and the segment already has words and the gap from the
// previous word's end to this word's start is >= gapSeconds - the current word
// then STARTS a new segment - or, checked only when the gap rule did not apply
// (NeMo's elif), (2) the word ends with (or is) a separator, which closes the
// segment INCLUDING that word. Trailing words flush into a final segment.
// gapSeconds <= 0 disables the gap rule, matching NeMo's default
// segment_gap_threshold=None (punctuation-only segments).
func splitWordsIntoSegments(words []transcriptWord, separators []rune, gapSeconds float64) [][]transcriptWord {
var segments [][]transcriptWord
var cur []transcriptWord
for i, word := range words {
gapActive := gapSeconds > 0 && len(cur) > 0
if gapActive && (word.Start-words[i-1].End) >= gapSeconds {
segments = append(segments, cur)
cur = []transcriptWord{word}
continue
}
if !gapActive && endsWithSeparator(word.W, separators) {
cur = append(cur, word)
segments = append(segments, cur)
cur = nil
continue
}
cur = append(cur, word)
}
if len(cur) > 0 {
segments = append(segments, cur)
}
return segments
}
// endsWithSeparator reports whether w's last rune is in separators (matching
// NeMo's `word[-1] in delims or word in delims`).
func endsWithSeparator(w string, separators []rune) bool {
r := []rune(strings.TrimSpace(w))
if len(r) == 0 {
return false
}
last := r[len(r)-1]
for _, s := range separators {
if last == s {
return true
}
}
return false
}
// tokensInWindow returns the ids of tokens whose timestamp t falls in
// [start, end] (inclusive), assigning each token to the segment that spans its
// time. The last segment's end is the last word end, so the final token is
// included.
func tokensInWindow(tokens []transcriptToken, start, end float64) []int32 {
var ids []int32
for _, t := range tokens {
if t.T >= start && t.T <= end {
ids = append(ids, t.ID)
}
}
return ids
}
// streamSegmenter accumulates streaming words into per-utterance segments. EOU
// is the model's own utterance boundary; each closed segment takes its start/end
// from its first/last accumulated word.
type streamSegmenter struct {
segs []*pb.TranscriptSegment
cur []transcriptWord
nextID int32
}
func (s *streamSegmenter) add(doc streamFeedJSON) {
s.cur = append(s.cur, doc.Words...)
if doc.Eou != 0 {
s.flush()
}
}
func (s *streamSegmenter) flush() {
if len(s.cur) == 0 {
return
}
parts := make([]string, len(s.cur))
for i, w := range s.cur {
parts[i] = w.W
}
s.segs = append(s.segs, &pb.TranscriptSegment{
Id: s.nextID,
Start: secondsToNanos(s.cur[0].Start),
End: secondsToNanos(s.cur[len(s.cur)-1].End),
Text: strings.TrimSpace(strings.Join(parts, " ")),
})
s.nextID++
s.cur = nil
}
func (s *streamSegmenter) segments() []*pb.TranscriptSegment { return s.segs }
// wordsRequested reports whether the caller asked for word-level timestamps.
// The OpenAI transcription API gates word timings behind
// timestamp_granularities[] containing "word" and defaults to segment-level
@@ -361,7 +553,12 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return status.Error(codes.Canceled, "transcription cancelled")
}
stream := CppStreamBegin(p.ctxPtr)
var stream uintptr
if CppStreamBeginLang != nil {
stream = CppStreamBeginLang(p.ctxPtr, opts.GetLanguage())
} else {
stream = CppStreamBegin(p.ctxPtr)
}
if stream == 0 {
// Not a cache-aware streaming model: run a normal offline
// transcription and emit it as one delta + a closing final result.
@@ -390,6 +587,14 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return err
}
// ABI v4: when the streaming JSON entry points are present, drive them so the
// per-utterance segments carry per-word start/end timestamps. Falls through to
// the text-only loop below against an older libparakeet.so. Runs under the
// engineMu already held above.
if CppStreamFeedJSON != nil {
return p.streamJSON(ctx, stream, data, duration, results)
}
var (
full strings.Builder
segText strings.Builder
@@ -466,6 +671,71 @@ func (p *ParakeetCpp) AudioTranscriptionStream(ctx context.Context, opts *pb.Tra
return nil
}
// streamJSON drives the ABI v4 streaming JSON entry points: each feed/finalize
// returns a {text,eou,frame_sec,words} document. The newly-finalized text is
// emitted as a delta (unchanged streaming contract) while words are accumulated
// into per-utterance segments (closed on EOU) so the closing FinalResult carries
// timestamped segments. Runs under engineMu (already held by the caller).
func (p *ParakeetCpp) streamJSON(ctx context.Context, stream uintptr, data []float32,
duration float32, results chan *pb.TranscriptStreamResponse) error {
var (
full strings.Builder
seg streamSegmenter
)
// consume frees the malloc'd char* (a 0 return is an error), parses the JSON,
// emits the delta, and routes words through the segmenter.
consume := func(ret uintptr) error {
if ret == 0 {
msg := CppLastError(p.ctxPtr)
if msg == "" {
msg = "unknown error"
}
return fmt.Errorf("parakeet-cpp: stream feed/finalize failed: %s", msg)
}
raw := goStringFromCPtr(ret)
CppFreeString(ret)
var doc streamFeedJSON
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return fmt.Errorf("parakeet-cpp: decode stream json: %w", err)
}
if doc.Text != "" {
full.WriteString(doc.Text)
results <- &pb.TranscriptStreamResponse{Delta: doc.Text}
}
seg.add(doc)
return nil
}
for off := 0; off < len(data); off += streamChunkSamples {
if err := ctx.Err(); err != nil {
return status.Error(codes.Canceled, "transcription cancelled")
}
end := min(off+streamChunkSamples, len(data))
chunk := data[off:end]
if err := consume(CppStreamFeedJSON(stream, chunk, int32(len(chunk)))); err != nil {
return err
}
}
if err := consume(CppStreamFinalizeJSON(stream)); err != nil {
return err
}
seg.flush() // close any trailing utterance that never saw an EOU
text := strings.TrimSpace(full.String())
segments := seg.segments()
if len(segments) == 0 && text != "" {
segments = append(segments, &pb.TranscriptSegment{Id: 0, Text: text})
}
results <- &pb.TranscriptStreamResponse{
FinalResult: &pb.TranscriptResult{
Text: text,
Segments: segments,
Duration: duration,
},
}
return nil
}
// decodeWavMono16k converts any input audio to 16 kHz mono PCM and returns the
// float samples plus the clip duration in seconds. Mirrors the whisper
// backend: utils.AudioToWav (ffmpeg) normalises rate/channels, go-audio

View File

@@ -53,6 +53,10 @@ func ensureLibLoaded() {
purego.RegisterLibFunc(&CppStreamFeed, lib, "parakeet_capi_stream_feed")
purego.RegisterLibFunc(&CppStreamFinalize, lib, "parakeet_capi_stream_finalize")
purego.RegisterLibFunc(&CppStreamFree, lib, "parakeet_capi_stream_free")
if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_feed_json"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppStreamFeedJSON, lib, "parakeet_capi_stream_feed_json")
purego.RegisterLibFunc(&CppStreamFinalizeJSON, lib, "parakeet_capi_stream_finalize_json")
}
purego.RegisterLibFunc(&CppFreeString, lib, "parakeet_capi_free_string")
purego.RegisterLibFunc(&CppLastError, lib, "parakeet_capi_last_error")
})
@@ -107,13 +111,22 @@ var _ = Describe("ParakeetCpp", func() {
Expect(err).ToNot(HaveOccurred())
Expect(strings.TrimSpace(res.Text)).ToNot(BeEmpty(),
"expected non-empty transcript for %s", audioPath)
Expect(res.Segments).To(HaveLen(1),
"synthesises a single whole-clip segment")
Expect(res.Segments[0].Text).To(Equal(res.Text),
"single segment text must equal the top-level text")
// Default (no granularities) is segment-level: no per-word timings.
Expect(res.Segments[0].Words).To(BeEmpty(),
"word timings are opt-in via timestamp_granularities")
// NeMo-faithful segmentation: one or more punctuation-delimited
// segments, each with text and a monotonically-advancing time span.
Expect(res.Segments).ToNot(BeEmpty(), "expected at least one segment")
var prevEnd int64
for i, seg := range res.Segments {
Expect(strings.TrimSpace(seg.Text)).ToNot(BeEmpty(),
"segment %d must have text", i)
Expect(seg.End).To(BeNumerically(">=", seg.Start),
"segment %d end must not precede its start", i)
Expect(seg.Start).To(BeNumerically(">=", prevEnd),
"segments must be in time order")
prevEnd = seg.End
// Default (no granularities) is segment-level: no per-word timings.
Expect(seg.Words).To(BeEmpty(),
"word timings are opt-in via timestamp_granularities")
}
})
It("emits word-level timestamps when granularity=word", func() {
@@ -129,15 +142,28 @@ var _ = Describe("ParakeetCpp", func() {
TimestampGranularities: []string{"word"},
})
Expect(err).ToNot(HaveOccurred())
Expect(res.Segments).To(HaveLen(1))
seg := res.Segments[0]
Expect(seg.Words).ToNot(BeEmpty(),
"expected per-word timestamps with granularity=word")
// Monotonic, non-negative timings spanning the segment.
Expect(seg.Words[0].Start).To(BeNumerically(">=", int64(0)))
Expect(seg.End).To(BeNumerically(">=", seg.Start))
Expect(seg.Words[len(seg.Words)-1].End).To(Equal(seg.End),
"segment end tracks the last word")
Expect(res.Segments).ToNot(BeEmpty())
// With word granularity every segment carries its own words, and each
// segment's span tracks its first/last word; word starts advance
// monotonically across the whole transcript.
totalWords := 0
var prevStart int64 = -1
for i, seg := range res.Segments {
Expect(seg.Words).ToNot(BeEmpty(),
"segment %d must carry per-word timestamps with granularity=word", i)
Expect(seg.Start).To(Equal(seg.Words[0].Start),
"segment %d start tracks its first word", i)
Expect(seg.End).To(Equal(seg.Words[len(seg.Words)-1].End),
"segment %d end tracks its last word", i)
for _, w := range seg.Words {
Expect(w.End).To(BeNumerically(">=", w.Start))
Expect(w.Start).To(BeNumerically(">=", prevStart))
prevStart = w.Start
totalWords++
}
}
Expect(totalWords).To(BeNumerically(">", 0))
Expect(res.Segments[0].Words[0].Start).To(BeNumerically(">=", int64(0)))
})
})

View File

@@ -65,6 +65,25 @@ func main() {
purego.RegisterLibFunc(&CppTranscribePcmBatchJSON, lib, "parakeet_capi_transcribe_pcm_batch_json")
}
// Per-request language variants (multilingual nemotron). Same probe pattern:
// present only in libparakeet.so built with multilingual support, so the
// backend still loads against an older library and falls back to the
// non-lang batched + streaming entry points (model default / "auto").
if sym, err := purego.Dlsym(lib, "parakeet_capi_transcribe_pcm_batch_json_lang"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppTranscribePcmBatchJSONLang, lib, "parakeet_capi_transcribe_pcm_batch_json_lang")
}
if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_begin_lang"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppStreamBeginLang, lib, "parakeet_capi_stream_begin_lang")
}
// Streaming JSON entry points (ABI v4): surface per-word timestamps on the
// streaming path. Same probe pattern; absent in older libparakeet.so, where
// the backend falls back to the text-only streaming feed.
if sym, err := purego.Dlsym(lib, "parakeet_capi_stream_feed_json"); err == nil && sym != 0 {
purego.RegisterLibFunc(&CppStreamFeedJSON, lib, "parakeet_capi_stream_feed_json")
purego.RegisterLibFunc(&CppStreamFinalizeJSON, lib, "parakeet_capi_stream_finalize_json")
}
fmt.Fprintf(os.Stderr, "[parakeet-cpp] ABI=%d\n", CppAbiVersion())
flag.Parse()

View File

@@ -0,0 +1,127 @@
package main
import (
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func tw(text string, start, end float64) transcriptWord {
return transcriptWord{W: text, Start: start, End: end}
}
var _ = Describe("splitWordsIntoSegments (NeMo get_segment_offsets parity)", func() {
seps := []rune{'.', '?', '!'}
It("splits on sentence-ending punctuation, including the delimiter word", func() {
words := []transcriptWord{tw("hello", 0, 0.4), tw("world.", 0.4, 0.8), tw("bye", 1.0, 1.3)}
segs := splitWordsIntoSegments(words, seps, 0)
Expect(segs).To(HaveLen(2))
Expect(segs[0]).To(HaveLen(2))
Expect(segs[0][1].W).To(Equal("world."))
Expect(segs[1]).To(HaveLen(1))
Expect(segs[1][0].W).To(Equal("bye"))
})
It("keeps a single segment with no terminal punctuation and gap off", func() {
words := []transcriptWord{tw("a", 0, 0.2), tw("b", 0.2, 0.4), tw("c", 5.0, 5.2)}
segs := splitWordsIntoSegments(words, seps, 0)
Expect(segs).To(HaveLen(1))
})
It("splits on the gap rule when enabled, the gapped word starting the next segment", func() {
words := []transcriptWord{tw("a", 0, 0.2), tw("b", 0.2, 0.4), tw("c", 5.0, 5.2)}
segs := splitWordsIntoSegments(words, seps, 1.0) // c is 4.6s after b
Expect(segs).To(HaveLen(2))
Expect(segs[0]).To(HaveLen(2)) // a b
Expect(segs[1]).To(HaveLen(1)) // c
Expect(segs[1][0].W).To(Equal("c"))
})
It("checks the gap rule before punctuation (NeMo elif order)", func() {
// "b." would terminate, but c is far after it -> gap closes [a b.] at b.
words := []transcriptWord{tw("a", 0, 0.2), tw("b.", 0.2, 0.4), tw("c", 9.0, 9.2)}
segs := splitWordsIntoSegments(words, seps, 1.0)
Expect(segs).To(HaveLen(2))
Expect(segs[0]).To(HaveLen(2))
Expect(segs[1][0].W).To(Equal("c"))
})
It("still splits on punctuation when the gap rule is enabled but does not fire", func() {
words := []transcriptWord{tw("hi.", 0, 0.4), tw("bye", 0.4, 0.8)}
segs := splitWordsIntoSegments(words, seps, 5.0) // gap never reached
Expect(segs).To(HaveLen(2))
Expect(segs[0][0].W).To(Equal("hi."))
})
It("returns nothing for empty input", func() {
Expect(splitWordsIntoSegments(nil, seps, 0)).To(BeEmpty())
})
})
var _ = Describe("transcriptResultFromDoc (multi-segment)", func() {
doc := transcriptJSON{
Text: "hello world. bye now",
FrameSec: 0.08,
Words: []transcriptWord{
{W: "hello", Start: 0.0, End: 0.4},
{W: "world.", Start: 0.4, End: 0.8},
{W: "bye", Start: 1.0, End: 1.3},
{W: "now", Start: 1.3, End: 1.6},
},
Tokens: []transcriptToken{{ID: 1, T: 0.1}, {ID: 2, T: 0.5}, {ID: 3, T: 1.1}, {ID: 4, T: 1.4}},
}
It("emits one segment per punctuation-delimited group with start/end", func() {
res := transcriptResultFromDoc(doc, &pb.TranscriptRequest{}, 0)
Expect(res.Segments).To(HaveLen(2))
Expect(res.Segments[0].Text).To(Equal("hello world."))
Expect(res.Segments[0].Start).To(Equal(int64(0)))
Expect(res.Segments[0].End).To(Equal(secondsToNanos(0.8)))
Expect(res.Segments[1].Text).To(Equal("bye now"))
Expect(res.Segments[1].Start).To(Equal(secondsToNanos(1.0)))
Expect(res.Segments[1].Id).To(Equal(int32(1)))
})
It("assigns tokens to the segment whose time window contains them", func() {
res := transcriptResultFromDoc(doc, &pb.TranscriptRequest{}, 0)
Expect(res.Segments[0].Tokens).To(Equal([]int32{1, 2}))
Expect(res.Segments[1].Tokens).To(Equal([]int32{3, 4}))
})
It("attaches per-segment words only when word granularity requested", func() {
plain := transcriptResultFromDoc(doc, &pb.TranscriptRequest{}, 0)
Expect(plain.Segments[0].Words).To(BeEmpty())
withWords := transcriptResultFromDoc(doc, &pb.TranscriptRequest{TimestampGranularities: []string{"word"}}, 0)
Expect(withWords.Segments[0].Words).To(HaveLen(2))
})
It("falls back to a single text segment when there are no words", func() {
res := transcriptResultFromDoc(transcriptJSON{Text: "hi"}, &pb.TranscriptRequest{}, 0)
Expect(res.Segments).To(HaveLen(1))
Expect(res.Segments[0].Text).To(Equal("hi"))
})
})
var _ = Describe("streaming segment assembly", func() {
It("closes a segment with start/end from its words on EOU", func() {
acc := &streamSegmenter{}
acc.add(streamFeedJSON{Text: "hello world", Eou: 1, Words: []transcriptWord{
{W: "hello", Start: 0.0, End: 0.4}, {W: "world", Start: 0.4, End: 0.9},
}})
segs := acc.segments()
Expect(segs).To(HaveLen(1))
Expect(segs[0].Text).To(Equal("hello world"))
Expect(segs[0].Start).To(Equal(int64(0)))
Expect(segs[0].End).To(Equal(secondsToNanos(0.9)))
})
It("buffers words across feeds until EOU", func() {
acc := &streamSegmenter{}
acc.add(streamFeedJSON{Text: "hi", Eou: 0, Words: []transcriptWord{{W: "hi", Start: 0, End: 0.3}}})
Expect(acc.segments()).To(BeEmpty())
acc.add(streamFeedJSON{Text: "there", Eou: 1, Words: []transcriptWord{{W: "there", Start: 0.3, End: 0.7}}})
Expect(acc.segments()).To(HaveLen(1))
Expect(acc.segments()[0].Text).To(Equal("hi there"))
})
})

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=1f9ee88e09c258053fa59d5e05e23dfb10fa0b13
STABLEDIFFUSION_GGML_VERSION?=b9254dda0d10b91ee6f17fb7f4420097dd29824b
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -386,6 +386,7 @@ int load_model(const char *model, char *model_path, char* options[], int threads
const char *llm_vision_path = "";
const char *diffusion_model_path = stableDiffusionModel;
const char *high_noise_diffusion_model_path = "";
const char *uncond_diffusion_model_path = "";
const char *taesd_path = "";
const char *control_net_path = "";
const char *embedding_dir = "";
@@ -472,6 +473,7 @@ int load_model(const char *model, char *model_path, char* options[], int threads
if (!strcmp(optname, "llm_vision_path")) llm_vision_path = strdup(optval);
if (!strcmp(optname, "diffusion_model_path")) diffusion_model_path = strdup(optval);
if (!strcmp(optname, "high_noise_diffusion_model_path")) high_noise_diffusion_model_path = strdup(optval);
if (!strcmp(optname, "uncond_diffusion_model_path")) uncond_diffusion_model_path = strdup(optval);
if (!strcmp(optname, "taesd_path")) taesd_path = strdup(optval);
if (!strcmp(optname, "control_net_path")) control_net_path = strdup(optval);
if (!strcmp(optname, "embedding_dir")) {
@@ -571,6 +573,7 @@ int load_model(const char *model, char *model_path, char* options[], int threads
ctx_params.llm_vision_path = llm_vision_path;
ctx_params.diffusion_model_path = diffusion_model_path;
ctx_params.high_noise_diffusion_model_path = high_noise_diffusion_model_path;
ctx_params.uncond_diffusion_model_path = uncond_diffusion_model_path;
ctx_params.vae_path = vae_path;
ctx_params.audio_vae_path = audio_vae_path;
ctx_params.embeddings_connectors_path = embeddings_connectors_path;

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=99613cb720b65036237d44b52f753b51f75c2797
WHISPER_CPP_VERSION?=a8ec021f2750a473ff4a8f3883bc9fdf5feafa84
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -1,6 +1,6 @@
--extra-index-url https://download.pytorch.org/whl/cpu
transformers==5.0.0rc3
transformers==4.48.3
accelerate
torch==2.7.1+cpu
torch==2.4.1
torchaudio==2.4.1
coqui-tts

View File

@@ -1,5 +1,5 @@
torch==2.7.1+cpu
torch==2.4.1
torchaudio==2.4.1
transformers==5.0.0rc3
transformers==4.48.3
accelerate
coqui-tts

View File

@@ -1,6 +1,6 @@
--extra-index-url https://download.pytorch.org/whl/rocm7.0
torch==2.7.1+cpu
torch==2.10.0+rocm7.0
torchaudio==2.10.0+rocm7.0
transformers==5.0.0rc3
transformers==4.48.3
accelerate
coqui-tts

View File

@@ -1,8 +1,8 @@
--extra-index-url https://download.pytorch.org/whl/xpu
torch==2.7.1+cpu
torch==2.8.0+xpu
torchaudio==2.8.0+xpu
optimum[openvino]
setuptools
transformers==5.0.0rc3
transformers==4.48.3
accelerate
coqui-tts

View File

@@ -1,4 +1,4 @@
torch==2.7.1+cpu
transformers==5.0.0rc3
torch==2.7.1
transformers==4.48.3
accelerate
coqui-tts

View File

@@ -23,9 +23,9 @@ import (
"github.com/mudler/LocalAI/core/services/routing/pii"
"github.com/mudler/LocalAI/core/services/routing/router"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/signals"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/signals"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
@@ -308,10 +308,31 @@ func New(opts ...config.AppOption) (*Application, error) {
application.galleryService.SetNATSClient(distSvc.Nats)
if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
// Clean up stale in-progress operations from previous crashed instances
if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
if _, err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
xlog.Warn("Failed to clean stale gallery operations", "error", err)
}
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
// Reap stale ops periodically, not just at boot: an op orphaned by
// a replica that died mid-install (its foreground handler goroutine
// gone) would otherwise linger "processing" in the UI until the next
// restart. 30m matches the install/upgrade ceiling so a genuinely
// slow op is never reaped out from under itself.
gsvc := application.galleryService
go func() {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-options.Context.Done():
return
case <-ticker.C:
if _, err := gsvc.ReapStaleOperations(30 * time.Minute); err != nil {
xlog.Warn("Failed to reap stale gallery operations", "error", err)
}
}
}
}()
}
// Hydrate from the store first so the wildcard subscriber finds an
// already-populated statuses map for any operations still in flight

View File

@@ -214,7 +214,9 @@ func (uc *UpgradeChecker) runCheck(ctx context.Context) {
"from", info.InstalledVersion, "to", info.AvailableVersion)
var err error
if bm != nil {
err = bm.UpgradeBackend(ctx, name, nil)
// Background auto-upgrade: no live admin watching a progress bar,
// so opID is empty and the distributed path skips progress streaming.
err = bm.UpgradeBackend(ctx, "", name, nil)
} else {
err = gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader,
uc.galleries, name, nil, uc.appConfig.RequireBackendIntegrity)

View File

@@ -39,7 +39,21 @@ func llamaCppDefaults(cfg *ModelConfig, modelPath string) {
}
}()
f, err := gguf.ParseGGUFFile(guessPath)
// Startup parses every model's GGUF header to guess defaults. We only need
// scalar metadata (architecture, head/ff counts, chat_template, token IDs,
// MTP head) plus array *lengths* — never the array *contents*. Two options
// keep this cheap, which matters when many models live on slow storage such
// as a Docker volume (see https://github.com/mudler/LocalAI/issues/9790):
//
// - SkipLargeMetadata: seek past large array-valued metadata (the tokenizer
// vocab: tokenizer.ggml.tokens/scores/merges, often >100k entries) instead
// of reading and allocating every element. Lengths stay populated.
// - UseMMap: read the header via a memory map so faulting in a few pages
// replaces hundreds of thousands of tiny read() syscalls (measured ~524k
// -> 8 for a 256k-token vocab), the dominant cost on slow filesystems.
//
// The mapping is released when ParseGGUFFile returns.
f, err := gguf.ParseGGUFFile(guessPath, gguf.UseMMap(), gguf.SkipLargeMetadata())
if err == nil {
guessGGUFFromFile(cfg, f, 0)
}

View File

@@ -1,13 +1,76 @@
package config_test
import (
"bytes"
"encoding/binary"
"os"
"path/filepath"
. "github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/schema"
gguf "github.com/gpustack/gguf-parser-go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// GGUF metadata value type tags (see github.com/gpustack/gguf-parser-go).
const (
ggufTypeUint32 uint32 = 4
ggufTypeString uint32 = 8
ggufTypeArray uint32 = 9
)
// writeTestGGUF emits a minimal but valid little-endian GGUF v3 header carrying
// the scalar metadata the llama-cpp hook guesses from plus a large string vocab
// array (tokenizer.ggml.tokens). The big array is exactly what SkipLargeMetadata
// + UseMMap are expected to avoid reading element-by-element, so it must survive a
// round-trip through the real hook without corrupting the guessed defaults.
func writeTestGGUF(path, chatTemplate string, vocab int) error {
wStr := func(b *bytes.Buffer, s string) {
binary.Write(b, binary.LittleEndian, uint64(len(s)))
b.WriteString(s)
}
kvStr := func(b *bytes.Buffer, k, v string) {
wStr(b, k)
binary.Write(b, binary.LittleEndian, ggufTypeString)
wStr(b, v)
}
kvU32 := func(b *bytes.Buffer, k string, v uint32) {
wStr(b, k)
binary.Write(b, binary.LittleEndian, ggufTypeUint32)
binary.Write(b, binary.LittleEndian, v)
}
var meta bytes.Buffer
kvStr(&meta, "general.architecture", "llama")
kvStr(&meta, "general.name", "ReproModel")
kvU32(&meta, "llama.context_length", 4096)
kvU32(&meta, "llama.attention.head_count", 32)
kvU32(&meta, "llama.feed_forward_length", 11008)
kvU32(&meta, "llama.block_count", 32)
kvU32(&meta, "tokenizer.ggml.bos_token_id", 1)
kvStr(&meta, "tokenizer.chat_template", chatTemplate)
// large array value — the one the optimization skips reading
wStr(&meta, "tokenizer.ggml.tokens")
binary.Write(&meta, binary.LittleEndian, ggufTypeArray)
binary.Write(&meta, binary.LittleEndian, ggufTypeString)
binary.Write(&meta, binary.LittleEndian, uint64(vocab))
for i := 0; i < vocab; i++ {
wStr(&meta, "token")
}
var out bytes.Buffer
binary.Write(&out, binary.LittleEndian, gguf.GGUFMagicGGUFLe)
binary.Write(&out, binary.LittleEndian, uint32(3)) // version
binary.Write(&out, binary.LittleEndian, uint64(0)) // tensor count
binary.Write(&out, binary.LittleEndian, uint64(9)) // metadata kv count
out.Write(meta.Bytes())
return os.WriteFile(path, out.Bytes(), 0o644)
}
var _ = Describe("Backend hooks and parser defaults", func() {
Context("MatchParserDefaults", func() {
It("matches Qwen3 family", func() {
@@ -137,6 +200,58 @@ var _ = Describe("Backend hooks and parser defaults", func() {
})
})
Context("llamaCppDefaults GGUF guessing", func() {
// Regression coverage for https://github.com/mudler/LocalAI/issues/9790:
// the hook reads GGUF headers with SkipLargeMetadata + UseMMap to avoid
// pulling the whole tokenizer vocab off (slow) disk on every startup. This
// verifies that skipping the vocab array still yields the correct guessed
// defaults from the remaining scalar metadata.
const chatTemplate = "{{ bos_token }}{% for m in messages %}{{ m.content }}{% endfor %}"
It("guesses defaults from a GGUF whose large vocab is skipped", func() {
dir := GinkgoT().TempDir()
modelFile := "repro.gguf"
Expect(writeTestGGUF(filepath.Join(dir, modelFile), chatTemplate, 50000)).To(Succeed())
// A pre-set context size short-circuits the GGUF run-estimate, which
// needs full tensor info this header-only fixture deliberately omits;
// the metadata-reading path the optimization touches is unaffected.
ctxSize := 4096
cfg := &ModelConfig{
Backend: "llama-cpp",
LLMConfig: LLMConfig{ContextSize: &ctxSize},
PredictionOptions: schema.PredictionOptions{
BasicModelRequest: schema.BasicModelRequest{Model: modelFile},
},
}
cfg.SetDefaults(ModelPath(dir))
// chat_template is a scalar string, not part of the skipped array,
// so it must be captured verbatim.
Expect(cfg.GetModelTemplate()).To(Equal(chatTemplate))
// scalar-derived defaults are still applied
Expect(cfg.ContextSize).NotTo(BeNil())
Expect(cfg.NGPULayers).NotTo(BeNil())
Expect(cfg.TemplateConfig.UseTokenizerTemplate).To(BeTrue())
Expect(cfg.KnownUsecaseStrings).To(ContainElement("FLAG_CHAT"))
})
It("falls back to the default context size when the GGUF is unreadable", func() {
dir := GinkgoT().TempDir()
Expect(os.WriteFile(filepath.Join(dir, "bad.gguf"), []byte("not a gguf"), 0o644)).To(Succeed())
cfg := &ModelConfig{
Backend: "llama-cpp",
PredictionOptions: schema.PredictionOptions{
BasicModelRequest: schema.BasicModelRequest{Model: "bad.gguf"},
},
}
cfg.SetDefaults(ModelPath(dir))
Expect(cfg.ContextSize).NotTo(BeNil())
})
})
Context("PromptCacheAll default", func() {
It("defaults to true when omitted from YAML", func() {
cfg := &ModelConfig{}

View File

@@ -30,11 +30,26 @@ func MTPSpecOptions() []string {
return out
}
// HasEmbeddedMTPHead reports whether the parsed GGUF declares a Multi-Token
// Prediction head. Detection reads `<arch>.nextn_predict_layers`, which is
// what `gguf_writer.add_nextn_predict_layers(n)` emits in upstream's
// isDraftOnlyAssistantArch reports whether an architecture names a standalone
// MTP *draft* model rather than a self-speculating trunk. Upstream's Gemma4 MTP
// (ggml-org/llama.cpp#23398) registers the head as a separate `gemma4-assistant`
// architecture whose GGUF still carries `nextn_predict_layers`, but which cannot
// run alone: it requires a paired target context (`ctx_other`). Such archs must
// not trigger the embedded-head self-speculation defaults. The `-assistant`
// suffix is upstream's naming convention for these draft-only checkpoints.
func isDraftOnlyAssistantArch(arch string) bool {
return strings.HasSuffix(arch, "-assistant")
}
// HasEmbeddedMTPHead reports whether the parsed GGUF declares a self-speculating
// Multi-Token Prediction head. Detection reads `<arch>.nextn_predict_layers`,
// which is what `gguf_writer.add_nextn_predict_layers(n)` emits in upstream's
// `conversion/qwen.py` MTP mixin. A positive layer count means the head is
// present in the same GGUF as the trunk.
//
// Draft-only assistant architectures (e.g. Gemma4's `gemma4-assistant`) carry
// the same key but are separate draft checkpoints meant to be paired with a
// target model, so they are deliberately excluded here.
func HasEmbeddedMTPHead(f *gguf.GGUFFile) (uint32, bool) {
if f == nil {
return 0, false
@@ -43,6 +58,9 @@ func HasEmbeddedMTPHead(f *gguf.GGUFFile) (uint32, bool) {
if arch == "" {
return 0, false
}
if isDraftOnlyAssistantArch(arch) {
return 0, false
}
v, ok := f.Header.MetadataKV.Get(arch + ".nextn_predict_layers")
if !ok {
return 0, false

View File

@@ -3,10 +3,33 @@ package config_test
import (
. "github.com/mudler/LocalAI/core/config"
gguf "github.com/gpustack/gguf-parser-go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// ggufWithArch fabricates a minimal in-memory GGUF carrying the given
// `general.architecture` and a positive `<arch>.nextn_predict_layers` count,
// so HasEmbeddedMTPHead can be exercised without a real model file.
func ggufWithArch(arch string, nextn uint32) *gguf.GGUFFile {
return &gguf.GGUFFile{
Header: gguf.GGUFHeader{
MetadataKV: gguf.GGUFMetadataKVs{
{
Key: "general.architecture",
ValueType: gguf.GGUFMetadataValueTypeString,
Value: arch,
},
{
Key: arch + ".nextn_predict_layers",
ValueType: gguf.GGUFMetadataValueTypeUint32,
Value: nextn,
},
},
},
}
}
var _ = Describe("MTP auto-defaults", func() {
Context("MTPSpecOptions", func() {
It("returns the upstream-recommended speculative tuple", func() {
@@ -82,5 +105,20 @@ var _ = Describe("MTP auto-defaults", func() {
Expect(ok).To(BeFalse())
Expect(n).To(BeZero())
})
It("detects a same-GGUF embedded head (DeepSeek/Qwen style)", func() {
n, ok := HasEmbeddedMTPHead(ggufWithArch("qwen3moe", 1))
Expect(ok).To(BeTrue())
Expect(n).To(Equal(uint32(1)))
})
It("ignores a gemma4-assistant draft-only model", func() {
// The assistant GGUF carries nextn_predict_layers but is a separate
// draft model that requires a paired target (ctx_other); it cannot
// self-speculate, so it must not trigger the embedded-head defaults.
n, ok := HasEmbeddedMTPHead(ggufWithArch("gemma4-assistant", 48))
Expect(ok).To(BeFalse())
Expect(n).To(BeZero())
})
})
})

View File

@@ -180,18 +180,21 @@ func (s *GalleryStore) Cancel(id string) error {
return s.UpdateStatus(id, "cancelled", "")
}
// CleanStale marks abandoned in-progress operations as failed.
// Should be called on startup to recover from crashed instances that
// left records in pending/downloading/processing state.
func (s *GalleryStore) CleanStale(age time.Duration) error {
// CleanStale marks abandoned in-progress operations as failed and returns the
// number of rows reaped. Called on startup AND periodically to recover from
// crashed/restarted instances that left records in pending/downloading/
// processing state — an op orphaned after startup would otherwise linger
// "processing" until the next restart.
func (s *GalleryStore) CleanStale(age time.Duration) (int64, error) {
cutoff := time.Now().Add(-age)
return s.db.Model(&GalleryOperationRecord{}).
res := s.db.Model(&GalleryOperationRecord{}).
Where("updated_at < ? AND status IN ?", cutoff, activeStatuses).
Updates(map[string]any{
"status": "failed",
"error": "stale operation cleaned up on startup",
"error": "stale operation reaped (abandoned by a crashed or restarted instance)",
"updated_at": time.Now(),
}).Error
})
return res.RowsAffected, res.Error
}
// CleanOld removes operations older than the given duration.

View File

@@ -71,7 +71,7 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
var err error
if op.Upgrade {
err = g.backendManager.UpgradeBackend(ctx, op.GalleryElementName, progressCallback)
err = g.backendManager.UpgradeBackend(ctx, op.ID, op.GalleryElementName, progressCallback)
} else if op.Delete {
err = g.backendManager.DeleteBackend(op.GalleryElementName)
} else {

View File

@@ -0,0 +1,106 @@
package galleryop_test
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/distributed"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/testutil"
)
// Reproduces "a cancelled/orphaned op resurrects as 'processing' after a pod
// restart". CancelOperation flipped the in-memory status to cancelled and
// broadcast a NATS event, but never persisted the terminal status to the
// gallery store. On the next replica restart the still-"pending" row hydrated
// straight back into processingBackends and the UI spun again. CancelOperation
// must persist the cancellation so it survives a restart.
var _ = Describe("GalleryService.CancelOperation persistence", func() {
It("persists the cancelled status to the gallery store", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
// Seed an in-flight op as if a replica was mid-install.
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "op-cancel",
GalleryElementName: "llama-cpp-development",
OpType: "backend_install",
Status: "pending",
Progress: 0,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
// Make the op locally cancellable so CancelOperation proceeds.
svc.StoreCancellation("op-cancel", context.CancelFunc(func() {}))
Expect(svc.CancelOperation("op-cancel")).To(Succeed())
// The persisted row must now be terminal — otherwise it re-hydrates as
// pending on the next restart.
rec, err := store.Get("op-cancel")
Expect(err).ToNot(HaveOccurred())
Expect(rec.Status).To(Equal("cancelled"))
// And a fresh service hydrating from the store must NOT see it as active.
fresh := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
fresh.SetGalleryStore(store)
Expect(fresh.Hydrate()).To(Succeed())
Expect(fresh.GetStatus("op-cancel")).To(BeNil(),
"a cancelled op must not hydrate back as active after a restart")
})
})
// Reproduces "an op orphaned by a replica that died mid-flight stays 'pending'
// forever". CleanStale (which marks abandoned active ops failed) only ran once
// on startup, so an op orphaned AFTER startup was never reaped until the next
// restart. The service must reap stale ops on an interval, not just at boot.
var _ = Describe("GalleryService.ReapStaleOperations", func() {
It("marks abandoned active ops terminal once they pass the age cutoff", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "orphan-op",
GalleryElementName: "llama-cpp-development",
OpType: "backend_install",
Status: "pending",
Progress: 0,
})).To(Succeed())
// Force the row's updated_at into the past so it is older than the cutoff.
Expect(db.Exec(
"UPDATE gallery_operations SET updated_at = ? WHERE id = ?",
time.Now().Add(-1*time.Hour), "orphan-op",
).Error).To(Succeed())
// A fresh, still-progressing op must NOT be reaped.
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "live-op",
GalleryElementName: "vllm-development",
OpType: "backend_install",
Status: "downloading",
Progress: 50,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
reaped, err := svc.ReapStaleOperations(30 * time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(reaped).To(Equal(int64(1)))
orphan, err := store.Get("orphan-op")
Expect(err).ToNot(HaveOccurred())
Expect(orphan.Status).To(Equal("failed"))
live, err := store.Get("live-op")
Expect(err).ToNot(HaveOccurred())
Expect(live.Status).To(Equal("downloading"), "a recently-updated op must not be reaped")
})
})

View File

@@ -20,7 +20,7 @@ type BackendManager interface {
InstallBackend(ctx context.Context, op *ManagementOp[gallery.GalleryBackend, any], progressCb ProgressCallback) error
DeleteBackend(name string) error
ListBackends() (gallery.SystemBackends, error)
UpgradeBackend(ctx context.Context, name string, progressCb ProgressCallback) error
UpgradeBackend(ctx context.Context, opID, name string, progressCb ProgressCallback) error
CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error)
// IsDistributed reports whether installs fan out across worker nodes.
// The HTTP layer uses this to refuse hardware-specific (non-meta) installs

View File

@@ -96,7 +96,10 @@ func (b *LocalBackendManager) ListBackends() (gallery.SystemBackends, error) {
return gallery.ListSystemBackends(b.systemState)
}
func (b *LocalBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb ProgressCallback) error {
// UpgradeBackend ignores opID: a single-node install reports progress through
// the local progressCb already; opID only matters for distributed per-node
// streaming (see DistributedBackendManager.UpgradeBackend).
func (b *LocalBackendManager) UpgradeBackend(ctx context.Context, _ string, name string, progressCb ProgressCallback) error {
return gallery.UpgradeBackend(ctx, b.systemState, b.modelLoader, b.backendGalleries, name, progressCb, b.requireBackendIntegrity)
}

View File

@@ -0,0 +1,92 @@
package galleryop_test
import (
"errors"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
)
// These specs reproduce the distributed "Reinstall spins forever" bug:
// processingBackends (the UI spinner source) is built from OpCache.GetStatus,
// which historically returned every cached op unconditionally. Cleanup only
// happened when a client polled /api/backends/job/:uid, but the Manage-page
// Reinstall/Upgrade buttons never poll, so a completed install stayed in
// processingBackends forever. GetStatus must self-evict terminal ops.
var _ = Describe("OpCache.GetStatus eviction", func() {
var (
svc *galleryop.GalleryService
cache *galleryop.OpCache
)
BeforeEach(func() {
svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
cache = galleryop.NewOpCache(svc)
})
It("keeps an op that is still processing", func() {
cache.SetBackend("llama-cpp", "uuid-1")
svc.UpdateStatus("uuid-1", &galleryop.OpStatus{Message: "processing backend: llama-cpp", Progress: 0})
processing, _ := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("llama-cpp", "uuid-1"))
Expect(cache.Exists("llama-cpp")).To(BeTrue())
})
It("evicts a completed op so it no longer shows as processing", func() {
cache.SetBackend("llama-cpp", "uuid-1")
svc.UpdateStatus("uuid-1", &galleryop.OpStatus{Processed: true, Progress: 100, Message: "completed"})
processing, _ := cache.GetStatus()
Expect(processing).NotTo(HaveKey("llama-cpp"))
Expect(cache.Exists("llama-cpp")).To(BeFalse())
})
It("keeps a failed op so the operations panel can surface the error and offer Dismiss", func() {
cache.SetBackend("piper", "uuid-2")
svc.UpdateStatus("uuid-2", &galleryop.OpStatus{Processed: true, Error: errors.New("boom")})
processing, _ := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("piper", "uuid-2"))
Expect(cache.Exists("piper")).To(BeTrue())
})
It("evicts a cancelled op", func() {
cache.SetBackend("vllm", "uuid-3")
svc.UpdateStatus("uuid-3", &galleryop.OpStatus{Processed: true, Cancelled: true, Message: "cancelled"})
processing, _ := cache.GetStatus()
Expect(processing).NotTo(HaveKey("vllm"))
})
It("does not evict an op with no status yet (queued)", func() {
cache.SetBackend("whisper", "uuid-4")
processing, taskTypes := cache.GetStatus()
Expect(processing).To(HaveKeyWithValue("whisper", "uuid-4"))
Expect(taskTypes).To(HaveKeyWithValue("whisper", "Waiting"))
})
// Regression guard: GetStatus is called concurrently by four HTTP handlers
// (~1s poll). An earlier version evicted by deleting from m.Map() — which
// returns the live internal map by reference — causing a fatal
// "concurrent map writes" crash. Run under -race; must not panic or race.
It("is safe under concurrent GetStatus + Set/complete", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
for i := 0; i < 2000; i++ {
_, _ = cache.GetStatus()
}
close(done)
}()
for i := 0; i < 2000; i++ {
id := "uuid-c"
cache.SetBackend("concurrent-backend", id)
// Half the time mark it completed so GetStatus evicts it.
if i%2 == 0 {
svc.UpdateStatus(id, &galleryop.OpStatus{Processed: true, Progress: 100, Message: "completed"})
}
_, _ = cache.GetStatus()
}
<-done
})
})

View File

@@ -408,12 +408,34 @@ func (m *OpCache) Exists(key string) bool {
}
func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
processingModelsData := m.Map()
taskTypes := map[string]string{}
processingModelsData := map[string]string{}
for k, v := range processingModelsData {
// Iterate a snapshot (Keys() copies) and build a fresh result map. We must
// NOT delete from m.Map() during the range: Map() returns the live internal
// map by reference, so a bare delete here would be an unsynchronized write
// to a map four HTTP handlers read every ~1s — a concurrent-map-write crash.
// Collect evictions and apply them via the locked DeleteUUID after the loop.
var evict []string
for _, k := range m.status.Keys() {
v := m.status.Get(k)
if v == "" {
continue // raced with a concurrent Delete
}
status := m.galleryService.GetStatus(v)
// Terminal ops must not keep showing as "processing". Cleanup was
// previously only triggered by a client polling /api/backends/job/:uid,
// but the Manage-page Reinstall/Upgrade buttons never poll, so completed
// ops leaked into processingBackends forever and the card spun
// "reinstalling" indefinitely. Evict here on the list read (the UI always
// calls this). We only evict SUCCESS/cancelled terminals (Error == nil):
// failed ops are kept so /api/operations can surface the error and offer
// Dismiss. DeleteUUID broadcasts the eviction so peer replicas converge.
if status != nil && status.Processed && status.Error == nil {
evict = append(evict, v)
continue
}
processingModelsData[k] = v
taskTypes[k] = "Installation"
if status != nil && status.Deletion {
taskTypes[k] = "Deletion"
@@ -422,6 +444,10 @@ func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
}
}
for _, v := range evict {
m.DeleteUUID(v)
}
return processingModelsData, taskTypes
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
@@ -31,9 +32,9 @@ type GalleryService struct {
// natsClient is the wider MessagingClient (Publisher + subscribe methods)
// when wired by the distributed startup path; broadcastSubs holds the
// progress + cancel subscriptions opened by SubscribeBroadcasts.
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
// OnBackendOpCompleted is fired after every successful install/upgrade/delete
// on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck
@@ -274,6 +275,29 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus {
return g.statuses
}
// ReapStaleOperations marks abandoned in-progress operations (pending/
// downloading/processing) older than `age` as failed, so an op orphaned by a
// replica that died mid-flight does not linger as "processing" forever. The
// store's CleanStale runs once on startup; this exposes it for periodic
// invocation (a post-startup orphan is otherwise not reaped until the next
// restart). No-op when no gallery store is wired. Returns rows reaped.
func (g *GalleryService) ReapStaleOperations(age time.Duration) (int64, error) {
g.Lock()
store := g.galleryStore
g.Unlock()
if store == nil {
return 0, nil
}
n, err := store.CleanStale(age)
if err != nil {
return 0, err
}
if n > 0 {
xlog.Info("Reaped stale gallery operations", "count", n)
}
return n, nil
}
// CancelOperation cancels an in-progress operation by its ID.
//
// In distributed mode the UI's cancel click may land on a different replica
@@ -295,6 +319,7 @@ func (g *GalleryService) CancelOperation(id string) error {
}
nc := g.natsClient
store := g.galleryStore
if !localExists && nc == nil {
g.Unlock()
@@ -315,6 +340,17 @@ func (g *GalleryService) CancelOperation(id string) error {
}
g.Unlock()
// Persist the terminal status so the cancel survives a restart. Without
// this the row stays in its active state and re-hydrates straight back into
// processingBackends on the next replica boot — the UI spins again on an op
// the admin already cancelled. The peer that broadcasts wins the write; a
// no-op when standalone (store nil).
if store != nil {
if err := store.Cancel(id); err != nil {
xlog.Warn("Failed to persist gallery operation cancellation", "op_id", id, "error", err)
}
}
// I/O and user-provided callback after Unlock — the cancel-wildcard
// subscriber loops back into applyCancel on this same replica, which
// would otherwise deadlock on g.Mutex.

View File

@@ -194,6 +194,14 @@ type BackendUpgradeRequest struct {
// but the field lets future per-replica metadata (e.g. progress reporting
// scoped to a slot) ride the same wire without a v3 type.
ReplicaIndex int32 `json:"replica_index,omitempty"`
// OpID identifies the admin-side operation. When non-empty the worker
// publishes BackendInstallProgressEvent values to
// SubjectNodeBackendInstallProgress(nodeID, OpID) while the force-reinstall
// runs, so the master can stream per-node progress for upgrades exactly as
// it already does for installs (an upgrade IS a force-reinstall, so the
// install-progress subject is reused rather than minting a new one — no new
// NATS permission or rolling-update compat surface). Empty on legacy callers.
OpID string `json:"op_id,omitempty"`
}
// BackendUpgradeReply mirrors BackendInstallReply minus Address — upgrade does

View File

@@ -533,7 +533,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
// backend.upgrade, we try the legacy backend.install Force=true path so a
// new master + old worker still converges. Drop the fallback once every
// worker in the fleet is on 2026-05-08 or newer.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, opID, name string, progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(d.backendGalleries)
installed, err := d.ListBackends()
@@ -549,17 +549,39 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
targetNodeIDs[n.NodeID] = true
}
// Empty opID: the caller (galleryop) doesn't thread an op ID into
// UpgradeBackend today, so we can't tag per-node sink writes with the
// right OpStatus key. Until the upgrade path takes a ManagementOp the
// way InstallBackend does, the sink stays no-op here.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
result, err := d.enqueueAndDrainBackendOp(ctx, opID, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Per-node progress sink: fan each worker download tick into the legacy
// single-bar progressCb and the per-node OpStatus.Nodes view, exactly as
// InstallBackend does. Defined per-node so each closure captures its own
// node.Name. Without this an upgrade blocks opaque at progress 0 for the
// whole 15m round-trip (the original "reinstalling but nothing happens").
onProgress := func(ev messaging.BackendInstallProgressEvent) {
if progressCb != nil {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
if d.progressSink != nil && opID != "" {
d.progressSink.UpdateNodeProgress(opID, ev.NodeID, galleryop.NodeProgress{
NodeID: ev.NodeID,
NodeName: node.Name,
Status: galleryop.NodeStatusDownloading,
FileName: ev.FileName,
Current: ev.Current,
Total: ev.Total,
Percentage: ev.Percentage,
Phase: ev.Phase,
})
}
}
var onProgressArg func(messaging.BackendInstallProgressEvent)
if progressCb != nil || d.progressSink != nil {
onProgressArg = onProgress
}
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0, opID, onProgressArg)
if err != nil {
// Rolling-update fallback: an older worker doesn't know
// backend.upgrade. Try the legacy install-with-force path.
if errors.Is(err, nats.ErrNoResponders) {
instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0)
instReply, instErr := d.adapter.installWithForceFallback(node.ID, name, string(galleriesJSON), "", "", "", 0, opID, onProgressArg)
if instErr != nil {
return instErr
}

View File

@@ -317,7 +317,7 @@ func (stubLocalBackendManager) DeleteBackend(_ string) error { return gallery.Er
func (stubLocalBackendManager) ListBackends() (gallery.SystemBackends, error) {
return gallery.SystemBackends{}, nil
}
func (stubLocalBackendManager) UpgradeBackend(_ context.Context, _ string, _ galleryop.ProgressCallback) error {
func (stubLocalBackendManager) UpgradeBackend(_ context.Context, _ string, _ string, _ galleryop.ProgressCallback) error {
return nil
}
func (stubLocalBackendManager) CheckUpgrades(_ context.Context) (map[string]gallery.UpgradeInfo, error) {
@@ -782,7 +782,7 @@ var _ = Describe("DistributedBackendManager", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(n2.ID),
messaging.BackendUpgradeReply{Success: false, Error: "registry unauthorized"})
err := mgr.UpgradeBackend(ctx, "vllm-development", nil)
err := mgr.UpgradeBackend(ctx, "", "vllm-development", nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("worker-a"))
Expect(err.Error()).To(ContainSubstring("image manifest not found"))
@@ -797,7 +797,7 @@ var _ = Describe("DistributedBackendManager", func() {
scriptInstalled("vllm-development", n1.ID)
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(n1.ID),
messaging.BackendUpgradeReply{Success: true})
Expect(mgr.UpgradeBackend(ctx, "vllm-development", nil)).To(Succeed())
Expect(mgr.UpgradeBackend(ctx, "", "vllm-development", nil)).To(Succeed())
})
})
@@ -819,7 +819,7 @@ var _ = Describe("DistributedBackendManager", func() {
// if the manager attempts it, the scripted-client default returns
// fakeNoRespondersErr and the assertion below fails loudly.
Expect(mgr.UpgradeBackend(ctx, "cpu-insightface-development", nil)).To(Succeed())
Expect(mgr.UpgradeBackend(ctx, "", "cpu-insightface-development", nil)).To(Succeed())
mc.mu.Lock()
defer mc.mu.Unlock()
@@ -835,7 +835,7 @@ var _ = Describe("DistributedBackendManager", func() {
n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051")
scriptNoBackends(n1.ID)
err := mgr.UpgradeBackend(ctx, "vllm-development", nil)
err := mgr.UpgradeBackend(ctx, "", "vllm-development", nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("not installed on any node"))
@@ -865,7 +865,7 @@ var _ = Describe("DistributedBackendManager", func() {
func(req messaging.BackendInstallRequest) bool { return req.Force },
messaging.BackendInstallReply{Success: true, Address: "10.0.0.1:50100"})
Expect(mgr.UpgradeBackend(ctx, "vllm-development", nil)).To(Succeed())
Expect(mgr.UpgradeBackend(ctx, "", "vllm-development", nil)).To(Succeed())
})
It("returns the upgrade error when it is not ErrNoResponders", func() {
@@ -875,7 +875,7 @@ var _ = Describe("DistributedBackendManager", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(n.ID),
messaging.BackendUpgradeReply{Success: false, Error: "disk full"})
err := mgr.UpgradeBackend(ctx, "vllm-development", nil)
err := mgr.UpgradeBackend(ctx, "", "vllm-development", nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("disk full"))
})

View File

@@ -0,0 +1,135 @@
package nodes
import (
"context"
"runtime"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/testutil"
)
// These specs reproduce the distributed "pending ops behind dead nodes leak
// forever" bug. ListDuePendingBackendOps only returns rows whose node is
// StatusHealthy, so an op queued against a node that goes offline (heartbeat
// stale) or draining (admin action) is never retried, never aged out, and
// never deleted. On a live cluster these rows sat at attempts=0 indefinitely
// and kept the UI operation alive. DeleteStalePendingBackendOps garbage-collects
// them: draining nodes immediately (models already purged), offline nodes only
// after a grace window so a brief heartbeat blip does not nuke in-flight work.
var _ = Describe("DeleteStalePendingBackendOps", func() {
var (
registry *NodeRegistry
ctx context.Context
)
BeforeEach(func() {
if runtime.GOOS == "darwin" {
Skip("testcontainers requires Docker, not available on macOS CI")
}
db := testutil.SetupTestDB()
var err error
registry, err = NewNodeRegistry(db)
Expect(err).ToNot(HaveOccurred())
ctx = context.Background()
})
// registerBackend registers an auto-approved backend node and returns its ID.
registerBackend := func(name, address string) string {
node := &BackendNode{Name: name, NodeType: NodeTypeBackend, Address: address}
Expect(registry.Register(ctx, node, true)).To(Succeed())
fetched, err := registry.GetByName(ctx, name)
Expect(err).ToNot(HaveOccurred())
return fetched.ID
}
// setHeartbeat forces a node's last_heartbeat (Register/MarkOffline leave it
// at "now"; we age it to simulate a node that went silent a while ago).
setHeartbeat := func(nodeID string, t time.Time) {
Expect(registry.db.WithContext(ctx).Model(&BackendNode{}).
Where("id = ?", nodeID).
Update("last_heartbeat", t).Error).To(Succeed())
}
pendingCountFor := func(nodeID string) int64 {
var n int64
Expect(registry.db.WithContext(ctx).Model(&PendingBackendOp{}).
Where("node_id = ?", nodeID).Count(&n).Error).To(Succeed())
return n
}
It("clears ops behind an offline node whose heartbeat is past the grace window", func() {
dead := registerBackend("nvidia-thor", "10.0.0.9:50051")
Expect(registry.UpsertPendingBackendOp(ctx, dead, "llama-cpp-development", OpBackendInstall, nil)).To(Succeed())
Expect(registry.MarkOffline(ctx, dead)).To(Succeed())
setHeartbeat(dead, time.Now().Add(-1*time.Hour))
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(1)))
Expect(pendingCountFor(dead)).To(Equal(int64(0)))
})
It("clears ops behind a draining node immediately, even with a fresh heartbeat", func() {
// Mirrors the live mac-mini-m4 case: draining but still heartbeating.
drain := registerBackend("mac-mini-m4", "10.0.0.3:50051")
Expect(registry.UpsertPendingBackendOp(ctx, drain, "llama-cpp-development", OpBackendInstall, nil)).To(Succeed())
Expect(registry.MarkDraining(ctx, drain)).To(Succeed())
setHeartbeat(drain, time.Now()) // fresh heartbeat
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(1)))
Expect(pendingCountFor(drain)).To(Equal(int64(0)))
})
It("clears ops behind an unhealthy node with a stale heartbeat (never ages to offline)", func() {
// A node marked unhealthy on a NATS ErrNoResponders never transitions to
// offline, so its ops must be reaped via the same stale-heartbeat path.
sick := registerBackend("agx-orin-sick", "10.0.0.7:50051")
Expect(registry.UpsertPendingBackendOp(ctx, sick, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed())
Expect(registry.MarkUnhealthy(ctx, sick)).To(Succeed())
setHeartbeat(sick, time.Now().Add(-1*time.Hour))
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(1)))
Expect(pendingCountFor(sick)).To(Equal(int64(0)))
})
It("keeps ops behind an unhealthy node that is still heartbeating (recovering)", func() {
recovering := registerBackend("agx-orin-flap", "10.0.0.8:50051")
Expect(registry.UpsertPendingBackendOp(ctx, recovering, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed())
Expect(registry.MarkUnhealthy(ctx, recovering)).To(Succeed())
setHeartbeat(recovering, time.Now()) // fresh heartbeat → recovering
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(0)))
Expect(pendingCountFor(recovering)).To(Equal(int64(1)))
})
It("keeps ops behind a node that only just went offline (within grace)", func() {
blip := registerBackend("agx-orin", "10.0.0.4:50051")
Expect(registry.UpsertPendingBackendOp(ctx, blip, "parakeet-cpp-development", OpBackendInstall, nil)).To(Succeed())
Expect(registry.MarkOffline(ctx, blip)).To(Succeed())
setHeartbeat(blip, time.Now().Add(-1*time.Minute)) // gone only 1m, grace 10m
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(0)))
Expect(pendingCountFor(blip)).To(Equal(int64(1)))
})
It("keeps ops behind a healthy node", func() {
healthy := registerBackend("dgx-spark", "10.0.0.1:50051")
Expect(registry.UpsertPendingBackendOp(ctx, healthy, "llama-cpp-development", OpBackendUpgrade, nil)).To(Succeed())
removed, err := registry.DeleteStalePendingBackendOps(ctx, 10*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(removed).To(Equal(int64(0)))
Expect(pendingCountFor(healthy)).To(Equal(int64(1)))
})
})

View File

@@ -189,6 +189,13 @@ func (rc *ReplicaReconciler) reconcileState(ctx context.Context) {
// passed on nodes that are currently healthy. On success the row is deleted;
// on failure attempts++ and next_retry_at moves out via exponential backoff.
func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// Garbage-collect ops behind nodes that went offline/draining. These are
// invisible to ListDuePendingBackendOps (which filters status=healthy), so
// without this sweep they leak forever and keep the UI operation spinning.
if _, err := rc.registry.DeleteStalePendingBackendOps(ctx, stalePendingBackendOpGrace); err != nil {
xlog.Warn("Reconciler: failed to clear stale pending backend ops", "error", err)
}
ops, err := rc.registry.ListDuePendingBackendOps(ctx)
if err != nil {
xlog.Warn("Reconciler: failed to list pending backend ops", "error", err)
@@ -223,10 +230,13 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// the same worker. Falls back to the legacy backend.install
// Force=true path on nats.ErrNoResponders for old workers that
// don't subscribe to backend.upgrade yet (rolling-update window).
reply, err := rc.adapter.UpgradeBackend(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
// Reconciler retries are background reconciliation with no live
// admin watching a progress bar, so opID/onProgress are empty —
// the adapter skips the progress subscription entirely.
reply, err := rc.adapter.UpgradeBackend(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0, "", nil)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
instReply, instErr := rc.adapter.installWithForceFallback(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0)
instReply, instErr := rc.adapter.installWithForceFallback(op.NodeID, op.Backend, string(op.Galleries), "", "", "", 0, "", nil)
if instErr != nil {
applyErr = instErr
} else if !instReply.Success {
@@ -293,6 +303,13 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// amount of further retrying will help.
const maxPendingBackendOpAttempts = 10
// stalePendingBackendOpGrace is how long a node may be offline before its
// pending backend ops are garbage-collected. Draining nodes are cleared
// immediately regardless of this window (see DeleteStalePendingBackendOps).
// ListDuePendingBackendOps never surfaces ops behind non-healthy nodes, so
// without this sweep they would leak forever and keep the UI op spinning.
const stalePendingBackendOpGrace = 15 * time.Minute
// probeLoadedModels gRPC-health-checks model addresses that the DB says are
// loaded. If a model's backend process is gone (OOM, crash, manual restart)
// we remove the row so ghosts don't linger. Only probes rows older than

View File

@@ -1776,6 +1776,38 @@ func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) erro
return nil
}
// DeleteStalePendingBackendOps garbage-collects pending backend ops whose target
// node can never drain them. ListDuePendingBackendOps only returns rows behind a
// StatusHealthy node, so ops behind a node that went offline or draining are
// otherwise never retried, aged out, or deleted — they leak forever and keep the
// UI operation spinning. Draining nodes are cleared immediately (an explicit
// admin action; their model rows are already purged). Offline nodes are cleared
// only once their last heartbeat is older than `grace`, so a brief heartbeat blip
// does not nuke an install that is still legitimately in flight. Returns the
// number of rows deleted.
func (r *NodeRegistry) DeleteStalePendingBackendOps(ctx context.Context, grace time.Duration) (int64, error) {
cutoff := time.Now().Add(-grace)
// Draining nodes are cleared immediately (admin action; model rows already
// purged). Offline AND unhealthy nodes are cleared only once their heartbeat
// is older than the grace window: a node marked unhealthy on a NATS
// ErrNoResponders never transitions to offline (health.go skips re-marking
// it), so without including unhealthy here its ops would leak exactly like
// the offline case. A node with a fresh heartbeat (last_heartbeat > cutoff)
// is recovering and keeps its op for retry.
res := r.db.WithContext(ctx).
Where(`node_id IN (SELECT id FROM backend_nodes WHERE status = ?)
OR node_id IN (SELECT id FROM backend_nodes WHERE status IN ? AND last_heartbeat <= ?)`,
StatusDraining, []string{StatusOffline, StatusUnhealthy}, cutoff).
Delete(&PendingBackendOp{})
if res.Error != nil {
return 0, fmt.Errorf("deleting stale pending backend ops: %w", res.Error)
}
if res.RowsAffected > 0 {
xlog.Info("Cleared pending backend ops behind non-healthy nodes", "deleted", res.RowsAffected)
}
return res.RowsAffected, nil
}
// RecordPendingBackendOpFailure bumps Attempts, captures the error, and
// pushes NextRetryAt out with exponential backoff capped at 15 minutes.
func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {

View File

@@ -365,7 +365,7 @@ func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ strin
return f.installReply, f.installErr
}
func (f *fakeUnloader) UpgradeBackend(nodeID, backend, _, _, _, _ string, replica int) (*messaging.BackendUpgradeReply, error) {
func (f *fakeUnloader) UpgradeBackend(nodeID, backend, _, _, _, _ string, replica int, _ string, _ func(messaging.BackendInstallProgressEvent)) (*messaging.BackendUpgradeReply, error) {
f.mu.Lock()
f.upgradeCalls = append(f.upgradeCalls, upgradeCall{nodeID, backend, replica})
f.mu.Unlock()

View File

@@ -35,7 +35,7 @@ type backendStopRequest struct {
// backend.upgrade subject.
type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendUpgradeReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error)
StopBackend(nodeID, backend string) error
@@ -127,38 +127,8 @@ func (a *RemoteUnloaderAdapter) InstallBackend(
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID)
// Subscribe to the per-op progress subject BEFORE publishing the install
// request so we don't miss early events. When onProgress is nil OR opID
// is empty (the reconciler-driven retry path), skip subscription entirely:
// silent installs cost nothing extra.
var sub messaging.Subscription
if onProgress != nil && opID != "" {
progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID)
s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) {
var ev messaging.BackendInstallProgressEvent
if err := json.Unmarshal(raw, &ev); err != nil {
xlog.Debug("malformed install progress event", "subject", progressSubject, "error", err)
return
}
// Goroutine guard: a slow onProgress callback must not stall
// the NATS reader thread.
//
// NOTE: events spawn one goroutine each, so ordering at the
// consumer is best-effort. In practice the worker debounces to
// ~250ms which is far larger than goroutine scheduling jitter,
// so reordering is rare. The worker's final Flush() event is
// intended to win as the terminal tick. A future hardening pass
// could add a Seq uint64 field to BackendInstallProgressEvent
// and drop stale-by-seq at the bridge if reordering becomes a
// real UX issue.
go onProgress(ev)
})
if subErr != nil {
xlog.Warn("Failed to subscribe to install progress subject; proceeding without progress streaming",
"subject", progressSubject, "error", subErr)
} else {
sub = s
}
}
// request so we don't miss early events.
sub := a.subscribeProgress(nodeID, opID, onProgress)
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
@@ -182,18 +152,58 @@ func (a *RemoteUnloaderAdapter) InstallBackend(
return reply, err
}
// subscribeProgress subscribes to the per-op backend-install progress subject
// so the master can stream per-node download ticks while a worker installs or
// upgrades. Returns nil (and subscribes to nothing) when onProgress is nil or
// opID is empty — the reconciler-driven retry path and legacy callers stay
// silent at no cost. Shared by InstallBackend, UpgradeBackend, and the legacy
// force-install fallback: an upgrade is a force-reinstall, so it reuses the
// install-progress subject rather than minting a new one (no new NATS
// permission, no new rolling-update compat surface). Caller must Unsubscribe
// the returned subscription after the request completes.
func (a *RemoteUnloaderAdapter) subscribeProgress(nodeID, opID string, onProgress func(messaging.BackendInstallProgressEvent)) messaging.Subscription {
if onProgress == nil || opID == "" {
return nil
}
progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID)
s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) {
var ev messaging.BackendInstallProgressEvent
if err := json.Unmarshal(raw, &ev); err != nil {
xlog.Debug("malformed backend progress event", "subject", progressSubject, "error", err)
return
}
// Goroutine guard: a slow onProgress callback must not stall the NATS
// reader thread. Events spawn one goroutine each, so ordering at the
// consumer is best-effort; the worker debounces to ~250ms which dwarfs
// goroutine scheduling jitter, and its final Flush() is the terminal tick.
go onProgress(ev)
})
if subErr != nil {
xlog.Warn("Failed to subscribe to backend progress subject; proceeding without progress streaming",
"subject", progressSubject, "error", subErr)
return nil
}
return s
}
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary - upgrade itself does not start a process.
//
// When opID is non-empty and onProgress is set, the master subscribes to the
// per-op progress subject before firing the request so a long force-reinstall
// streams per-node download ticks instead of blocking opaque at progress 0.
//
// Timeout: configured via DistributedConfig.BackendUpgradeTimeoutOrDefault
// (default 15m). Real-world worst case observed: 8-10 minutes for large
// CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendUpgradeReply, error) {
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex, "opID", opID)
sub := a.subscribeProgress(nodeID, opID, onProgress)
reply, err := messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
Backend: backendType,
@@ -202,7 +212,13 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
OpID: opID,
}, a.upgradeTimeout)
if sub != nil {
_ = sub.Unsubscribe()
}
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
@@ -216,10 +232,12 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO
// doesn't subscribe to the new subject). It re-fires the legacy
// backend.install with Force=true. Drop this once every worker is on
// 2026-05-08 or newer.
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
sub := a.subscribeProgress(nodeID, opID, onProgress)
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
@@ -228,7 +246,13 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: true,
OpID: opID,
}, a.upgradeTimeout)
if sub != nil {
_ = sub.Unsubscribe()
}
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)

View File

@@ -282,7 +282,7 @@ var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0)
_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))

View File

@@ -1,6 +1,7 @@
package nodes
import (
"sync"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -18,7 +19,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
})
@@ -27,7 +28,55 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
})
// Reproducer for "upgrade reports progress:0 the whole time" (Bug B). The
// install path streamed per-node download ticks; the upgrade path did a bare
// request→single-reply with no progress subscription, so a long force-reinstall
// blocked opaque. The adapter must subscribe to the per-op progress subject
// (reused from install) BEFORE the request and deliver each tick to onProgress.
It("streams per-node progress ticks during the upgrade", func() {
mc := newScriptedMessagingClient()
nodeID := "node-slow"
opID := "op-upgrade-1"
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.BackendUpgradeReply{Success: true})
// The worker would publish these while force-reinstalling. The harness
// replays them as soon as the adapter subscribes to the per-op subject.
mc.scheduleProgressPublish(nodeID, opID, []messaging.BackendInstallProgressEvent{
{NodeID: nodeID, FileName: "llama-cpp.tar", Current: "10 MB", Total: "100 MB", Percentage: 10},
{NodeID: nodeID, FileName: "llama-cpp.tar", Current: "100 MB", Total: "100 MB", Percentage: 100},
})
var mu sync.Mutex
var got []messaging.BackendInstallProgressEvent
onProgress := func(ev messaging.BackendInstallProgressEvent) {
mu.Lock()
got = append(got, ev)
mu.Unlock()
}
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0, opID, onProgress)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
// Confirm it subscribed to the (reused) install-progress subject for this op.
Expect(mc.subscribeCalls()).To(ContainElement(messaging.SubjectNodeBackendInstallProgress(nodeID, opID)))
// Progress events are delivered asynchronously (goroutine-per-event), so
// poll for both and assert on the set — ordering is best-effort by design.
Eventually(func() []float64 {
mu.Lock()
defer mu.Unlock()
pcts := make([]float64, 0, len(got))
for _, e := range got {
pcts = append(pcts, e.Percentage)
}
return pcts
}, 2*time.Second, 20*time.Millisecond).Should(ConsistOf(float64(10), float64(100)))
})
})

View File

@@ -186,17 +186,29 @@ func (s *backendSupervisor) upgradeBackend(req messaging.BackendUpgradeRequest)
}
}
// When the master tagged this upgrade with an OpID, stream gallery download
// progress back on the per-op subject (reused from install — an upgrade is a
// force-reinstall). Old masters omit OpID and stay on the silent path. The
// deferred Flush guarantees a terminal-percentage event even if the upgrade
// errors out, so the master's per-node bar never hangs mid-download.
var downloadCb func(file, current, total string, percentage float64)
if req.OpID != "" && s.nats != nil {
publisher := nodes.NewDebouncedInstallProgressPublisher(s.nats, s.nodeID, req.OpID, req.Backend, installProgressDebounce)
downloadCb = publisher.OnDownload
defer publisher.Flush()
}
if req.URI != "" {
xlog.Info("Upgrading backend from external URI", "backend", req.Backend, "uri", req.URI)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
); err != nil {
return fmt.Errorf("upgrading backend from external URI: %w", err)
}
} else {
xlog.Info("Upgrading backend from gallery", "backend", req.Backend)
if err := gallery.InstallBackendFromGallery(
context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, true, /* force */
context.Background(), galleries, s.systemState, s.ml, req.Backend, downloadCb, true, /* force */
s.cfg.RequireBackendIntegrity,
); err != nil {
return fmt.Errorf("upgrading backend from gallery: %w", err)

View File

@@ -187,6 +187,21 @@ curl http://localhost:8080/v1/audio/transcriptions \
For real-time use, load a cache-aware streaming model (e.g. `realtime_eou_120m-v1-*.gguf`) and pass `-F stream=true`. Deltas are emitted as the audio is decoded, with end-of-utterance events closing each segment.
### Segment timestamps
Transcriptions are split into segments the same way NVIDIA NeMo does: a new segment starts after sentence-ending punctuation (`.`, `?`, `!`), and each segment carries `start`/`end` times. This is the default (NeMo's punctuation-only segmentation) and needs no configuration. While streaming, each end-of-utterance closes a segment, now with timestamps.
You can additionally split on silence by setting `segment_gap_threshold` (NeMo's `segment_gap_threshold`, in **encoder frames**; off by default). When set, a gap between two words wider than the threshold also starts a new segment. The value is in frames to match NeMo exactly; the backend converts it to seconds using the model's frame stride (`frame_sec`, reported by the engine):
```yaml
name: parakeet-110m
backend: parakeet-cpp
parameters:
model: tdt_ctc-110m-f16.gguf
options:
- segment_gap_threshold:12 # split on silence > 12 encoder frames (default 0 = off, punctuation-only)
```
### Dynamic batching
The backend can coalesce concurrent transcription requests into a single batched engine call, which improves throughput on GPU when many requests arrive at once. Batching is **off by default** (`batch_max_size:1`, one request at a time); raise it to opt in. Two `options:` knobs control it:

View File

@@ -133,9 +133,9 @@ When S3 is not configured, model files are transferred directly from the fronten
For high-throughput or very large model files, S3 can be more efficient since it avoids streaming through the frontend.
{{% alert icon="⚠️" color="warning" %}}
{{% notice warning %}}
The worker HTTP file transfer server is authenticated by `LOCALAI_REGISTRATION_TOKEN`. If the token is **empty**, the server **fails open** — anyone who can reach the port gets read/write access to the worker's models/staging/data directories (a remote model-poisoning / exfiltration vector). The worker logs a loud warning at startup in this case. Always set `LOCALAI_REGISTRATION_TOKEN` in distributed mode, and set `LOCALAI_DISTRIBUTED_REQUIRE_AUTH=true` (frontend **and** workers) to make a missing token *or* missing NATS credentials a hard startup error rather than a silent fail-open. Firewall the file-transfer port (gRPC base 1) so only the frontend can reach it.
{{% /alert %}}
{{% /notice %}}
### Watching Backend Installs

View File

@@ -1,4 +1,111 @@
---
- name: "gemma-4-26b-a4b-it-qat"
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
- https://huggingface.co/unsloth/gemma-4-26B-A4B-it-qat-GGUF
description: |
Hugging Face |
GitHub |
Launch Blog |
Documentation
License: Apache 2.0 | Authors: Google DeepMind
> [!Note]
> This model card is for the new versions of the Gemma 4 family optimized with Quantization-Aware Training (QAT), which allows preserving similar quality to bfloat16 while dramatically reducing the memory requirements to load the model.
> Four versions of the QAT checkpoints are available:
> * **Unquantized QAT checkpoints** (Q4_0): Half-precision weights extracted from the QAT pipeline, ideal for custom downstream compilation and research. Available for Gemma 4 E2B, E4B, 12B, 26B A4B, and 31B, and their drafter models.
> * **GGUF** (Q4_0): Ready-to-deploy formats for broad ecosystem compatibility. Available for Gemma 4 E2B, E4B, 12B, 26B A4B, and 31B.
> * **Mobile-optimized** (wNa8o8): A custom schema engineered explicitly for mobile hardware efficiency. It features targeted 2-bit decoding layers, optimized KV caches, and static activations to maximize VRAM savings. Available for Gemma 4 E2B and E4B.
> * **Compressed Tensors** (w4a16): QAT checkpoints serialized in the compressed-tensors format for native, optimized inference with vLLM. Available for Gemma 4 E2B, E4B, 12B
...
license: "apache-2.0"
tags:
- llm
- gguf
- gemma
icon: https://ai.google.dev/gemma/images/gemma4_banner.png
overrides:
backend: llama-cpp
function:
automatic_tool_parsing_fallback: true
grammar:
disable: true
known_usecases:
- chat
mmproj: llama-cpp/mmproj/gemma-4-26B-A4B-it-qat-GGUF/mmproj-F32.gguf
options:
- use_jinja:true
parameters:
min_p: 0
model: llama-cpp/models/gemma-4-26B-A4B-it-qat-GGUF/gemma-4-26B-A4B-it-qat-UD-Q4_K_XL.gguf
repeat_penalty: 1
temperature: 1
top_k: 64
top_p: 0.95
template:
use_tokenizer_template: true
files:
- filename: llama-cpp/models/gemma-4-26B-A4B-it-qat-GGUF/gemma-4-26B-A4B-it-qat-UD-Q4_K_XL.gguf
sha256: dcf179a91153e3a7ece792e48ef872180d9d6ef9b7677f0a0bd3e83cfe624d5e
uri: https://huggingface.co/unsloth/gemma-4-26B-A4B-it-qat-GGUF/resolve/main/gemma-4-26B-A4B-it-qat-UD-Q4_K_XL.gguf
- filename: llama-cpp/mmproj/gemma-4-26B-A4B-it-qat-GGUF/mmproj-F32.gguf
sha256: ef269e294502d6ee3722cbf129681b2586c2e6ceb79d0507963c92146e058cd4
uri: https://huggingface.co/unsloth/gemma-4-26B-A4B-it-qat-GGUF/resolve/main/mmproj-F32.gguf
- name: "gemma-4-12b-it-qat-q4_0"
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
- https://huggingface.co/google/gemma-4-12B-it-qat-q4_0-gguf
description: |
Hugging Face |
GitHub |
Launch Blog |
Documentation
License: Apache 2.0 | Authors: Google DeepMind
> [!Note]
> This model card is for the new versions of the Gemma 4 family optimized with Quantization-Aware Training (QAT), which allows preserving similar quality to bfloat16 while dramatically reducing the memory requirements to load the model.
> Four versions of the QAT checkpoints are available:
> * **Unquantized QAT checkpoints** (Q4_0): Half-precision weights extracted from the QAT pipeline, ideal for custom downstream compilation and research. Available for Gemma 4 E2B, E4B, 12B, 26B A4B, and 31B, and their drafter models.
> * **GGUF** (Q4_0): Ready-to-deploy formats for broad ecosystem compatibility. Available for Gemma 4 E2B, E4B, 12B, 26B A4B, and 31B.
> * **Mobile-optimized** (wNa8o8): A custom schema engineered explicitly for mobile hardware efficiency. It features targeted 2-bit decoding layers, optimized KV caches, and static activations to maximize VRAM savings. Available for Gemma 4 E2B and E4B.
> * **Compressed Tensors** (w4a16): QAT checkpoints serialized in the compressed-tensors format for native, optimized inference with vLLM. Available for Gemma 4 E2B, E4B, 12B
...
license: "apache-2.0"
tags:
- llm
- gguf
icon: https://ai.google.dev/gemma/images/gemma4_banner.png
overrides:
backend: llama-cpp
function:
automatic_tool_parsing_fallback: true
grammar:
disable: true
known_usecases:
- chat
mmproj: llama-cpp/mmproj/gemma-4-12B-it-qat-q4_0-gguf/mmproj-gemma-4-12b-it-qat-q4_0.gguf
options:
- use_jinja:true
parameters:
min_p: 0
model: llama-cpp/models/gemma-4-12B-it-qat-q4_0-gguf/gemma-4-12b-it-qat-q4_0.gguf
repeat_penalty: 1
temperature: 1
top_k: 64
top_p: 0.95
template:
use_tokenizer_template: true
files:
- filename: llama-cpp/models/gemma-4-12B-it-qat-q4_0-gguf/gemma-4-12b-it-qat-q4_0.gguf
sha256: faff1a63667fac17ac5e777f47114688fcefea96e220e211aaa8d62c2c4561f1
uri: https://huggingface.co/google/gemma-4-12B-it-qat-q4_0-gguf/resolve/main/gemma-4-12b-it-qat-q4_0.gguf
- filename: llama-cpp/mmproj/gemma-4-12B-it-qat-q4_0-gguf/mmproj-gemma-4-12b-it-qat-q4_0.gguf
sha256: e70b0e5cd80323d5d588b4ed06780356b7b1ba03995a4b8164c6ae9db0ff5989
uri: https://huggingface.co/google/gemma-4-12B-it-qat-q4_0-gguf/resolve/main/mmproj-gemma-4-12b-it-qat-q4_0.gguf
- name: "step-3.7-flash"
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
@@ -26112,6 +26219,106 @@
- filename: ae.safetensors
sha256: afc8e28272cd15db3919bacdb6918ce9c1ed22e96cb12c4d5ed0fba823529e38
uri: https://huggingface.co/ChuckMcSneed/FLUX.1-dev/resolve/main/ae.safetensors
- name: ideogram-4-iq4nl-ggml
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
- https://huggingface.co/ideogram-ai/ideogram-4-fp8
- https://huggingface.co/stduhpf/ideogram-4-gguf
description: |
Ideogram 4 is a text-to-image diffusion model known for state-of-the-art prompt adherence and exceptional, accurate text rendering inside images. It is driven by a Qwen3-VL-8B text encoder and performs real classifier-free guidance from a separate unconditional diffusion model.
This is the iQ4_NL (4-bit) quantization, a good balance of quality and footprint (~5.8GB diffusion + ~5.8GB unconditional). The bundle also pulls the Qwen3-VL-8B-Instruct text encoder and the FLUX.2 VAE. Quantized GGUF weights by stduhpf for use with stable-diffusion.cpp.
license: ideogram-non-commercial-model-agreement
tags:
- ideogram
- ideogram4
- text-to-image
- image-generation
- gguf
- quantized
- 8b
- diffusion
last_checked: "2026-06-06"
overrides:
backend: stablediffusion-ggml
step: 25
# Ideogram4 runs real classifier-free guidance from a separate
# unconditional diffusion model, so it needs a CFG scale > 1 (unlike the
# guidance-distilled Flux / Z-Image models). 7 matches the upstream
# stable-diffusion.cpp default used in the Ideogram4 example.
cfg_scale: 7
options:
- diffusion_model
- uncond_diffusion_model_path:ideogram4_unconditional-iQ4_NL.gguf
- llm_path:Qwen3-VL-8B-Instruct-Q4_K_M.gguf
- vae_path:flux2-vae.safetensors
- sampler:euler
- offload_params_to_cpu:true
parameters:
model: ideogram4-iQ4_NL.gguf
files:
- filename: ideogram4-iQ4_NL.gguf
sha256: 578502024f23e8e988e0cb297201f1ac88dddad5706726ad222d918727e0211d
uri: huggingface://stduhpf/ideogram-4-gguf/ideogram4-iQ4_NL.gguf
- filename: ideogram4_unconditional-iQ4_NL.gguf
sha256: 4140e58c6818dac8221fa590a6814246b5336bb23246fbbb96b9048e887f47cf
uri: huggingface://stduhpf/ideogram-4-gguf/ideogram4_unconditional-iQ4_NL.gguf
- filename: Qwen3-VL-8B-Instruct-Q4_K_M.gguf
sha256: 108e7ff92b78eefd3db4741885104acba514255c11b617d3c7b197a5f46efe89
uri: huggingface://unsloth/Qwen3-VL-8B-Instruct-GGUF/Qwen3-VL-8B-Instruct-Q4_K_M.gguf
- filename: flux2-vae.safetensors
sha256: 868fe7b343cc8f3a19dbcfcafbc3d5f888802be3f89bd81b65b3621a066ce8f3
uri: https://huggingface.co/Comfy-Org/Ideogram-4/resolve/main/vae/flux2-vae.safetensors
- name: ideogram-4-q8_0-ggml
url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls:
- https://huggingface.co/ideogram-ai/ideogram-4-fp8
- https://huggingface.co/stduhpf/ideogram-4-gguf
description: |
Ideogram 4 is a text-to-image diffusion model known for state-of-the-art prompt adherence and exceptional, accurate text rendering inside images. It is driven by a Qwen3-VL-8B text encoder and performs real classifier-free guidance from a separate unconditional diffusion model.
This is the Q8_0 (8-bit) quantization for highest quality (~10.1GB diffusion + ~10.1GB unconditional). The bundle also pulls the Qwen3-VL-8B-Instruct text encoder and the FLUX.2 VAE. Quantized GGUF weights by stduhpf for use with stable-diffusion.cpp.
license: ideogram-non-commercial-model-agreement
tags:
- ideogram
- ideogram4
- text-to-image
- image-generation
- gguf
- quantized
- 8b
- diffusion
last_checked: "2026-06-06"
overrides:
backend: stablediffusion-ggml
step: 25
# Ideogram4 runs real classifier-free guidance from a separate
# unconditional diffusion model, so it needs a CFG scale > 1 (unlike the
# guidance-distilled Flux / Z-Image models). 7 matches the upstream
# stable-diffusion.cpp default used in the Ideogram4 example.
cfg_scale: 7
options:
- diffusion_model
- uncond_diffusion_model_path:ideogram4_unconditional-Q8_0.gguf
- llm_path:Qwen3-VL-8B-Instruct-Q4_K_M.gguf
- vae_path:flux2-vae.safetensors
- sampler:euler
- offload_params_to_cpu:true
parameters:
model: ideogram4-Q8_0.gguf
files:
- filename: ideogram4-Q8_0.gguf
sha256: feb6cae997927ba0e339bf6ef64b14df9353064f60805d53f84c592643addcfd
uri: huggingface://stduhpf/ideogram-4-gguf/ideogram4-Q8_0.gguf
- filename: ideogram4_unconditional-Q8_0.gguf
sha256: 9261d1473d328aa7edbe1b3fa48a9b9bd2e19fe78439fe6a293af1016c63debd
uri: huggingface://stduhpf/ideogram-4-gguf/ideogram4_unconditional-Q8_0.gguf
- filename: Qwen3-VL-8B-Instruct-Q4_K_M.gguf
sha256: 108e7ff92b78eefd3db4741885104acba514255c11b617d3c7b197a5f46efe89
uri: huggingface://unsloth/Qwen3-VL-8B-Instruct-GGUF/Qwen3-VL-8B-Instruct-Q4_K_M.gguf
- filename: flux2-vae.safetensors
sha256: 868fe7b343cc8f3a19dbcfcafbc3d5f888802be3f89bd81b65b3621a066ce8f3
uri: https://huggingface.co/Comfy-Org/Ideogram-4/resolve/main/vae/flux2-vae.safetensors
- name: whisper-1
url: github:mudler/LocalAI/gallery/whisper-base.yaml@master
urls:
@@ -31887,6 +32094,41 @@
- filename: parakeet-cpp/tdt_ctc-1.1b-f16.gguf
uri: huggingface://mudler/parakeet-cpp-gguf/tdt_ctc-1.1b-f16.gguf
sha256: cd53f64eefac2623a12f2f118ef50b56622dc3012f42c815c6adf0d08292f387
- name: parakeet-cpp-nemotron-3.5-asr-streaming-0.6b
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:
- https://huggingface.co/mudler/parakeet-cpp-gguf
- https://huggingface.co/nvidia/nemotron-3.5-asr-streaming-0.6b
- https://github.com/mudler/parakeet.cpp
description: |
Multilingual (40+ locales), prompt-conditioned, cache-aware streaming FastConformer RNN-T, 0.6B.
Q8_0 GGUF for the parakeet-cpp backend (C++/ggml port of NVIDIA NeMo). Byte-identical to NeMo at
WER 0 offline and streaming, about 2.5x faster than NeMo on CPU with no GPU. Select a language with
the request "language" field (for example en, de, es, ja-JP), or leave it empty for automatic
detection. License OpenMDW-1.1.
license: other
tags:
- parakeet
- parakeet-cpp
- nemotron
- asr
- speech-recognition
- stt
- multilingual
- streaming
- gguf
- ggml
overrides:
backend: parakeet-cpp
known_usecases:
- transcript
name: parakeet-cpp-nemotron-3.5-asr-streaming-0.6b
parameters:
model: parakeet-cpp/nemotron-3.5-asr-streaming-0.6b-q8_0.gguf
files:
- filename: parakeet-cpp/nemotron-3.5-asr-streaming-0.6b-q8_0.gguf
uri: huggingface://mudler/parakeet-cpp-gguf/nemotron-3.5-asr-streaming-0.6b-q8_0.gguf
sha256: ba2f13eccd4a5245be728f77e6149bd6a4fdcdd133ff2e08ac6005bcef7a99f1
- name: parakeet-crispasr
url: github:mudler/LocalAI/gallery/virtual.yaml@master
urls:

4
go.mod
View File

@@ -219,8 +219,8 @@ require (
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87
github.com/mudler/localrecall v0.6.1-0.20260507074622-a7724fef6f81 // indirect
github.com/mudler/LocalAGI v0.0.0-20260606071251-14aed1ae4336
github.com/mudler/localrecall v0.6.3-0.20260606070048-9a3b3321a9cd // indirect
github.com/mudler/skillserver v0.0.7-0.20260520220837-a7317cbf9145
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/oxffaa/gopher-parse-sitemap v0.0.0-20191021113419-005d2eb1def4 // indirect

8
go.sum
View File

@@ -966,8 +966,8 @@ github.com/mr-tron/base58 v1.3.0 h1:K6Y13R2h+dku0wOqKtecgRnBUBPrZzLZy5aIj8lCcJI=
github.com/mr-tron/base58 v1.3.0/go.mod h1:2BuubE67DCSWwVfx37JWNG8emOC0sHEU4/HpcYgCLX8=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87 h1:az+2umaD/sT1rRvI3WZHWXjzdJVJHxcyxp0SNYbqlFk=
github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87/go.mod h1:x77p9W1zKZr+W+UcEwg8/qdp00p4XXOI69wE7WlXZc0=
github.com/mudler/LocalAGI v0.0.0-20260606071251-14aed1ae4336 h1:iKBkSnpisOvMVxFoYsAObvAuOqXBakRPMD0PWxWG5EE=
github.com/mudler/LocalAGI v0.0.0-20260606071251-14aed1ae4336/go.mod h1:U+g6u8mF2wQxhkdBl3dr8G4db1cv3n7KTKmraoJ7D0c=
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU=
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
github.com/mudler/edgevpn v0.34.0 h1:qDrD/rCPFY/FdURbXudIZWihVKY4VOX3nMn3CcbeQEU=
@@ -976,8 +976,8 @@ github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX
github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc/go.mod h1:O7SwdSWMilAWhBZMK9N9Y/oBDyMMzshE3ju8Xkexwig=
github.com/mudler/go-processmanager v0.1.1 h1:c/1NRZOZpW8HuFv9RhBG57nQu1oDMRomEHedwBFMlrw=
github.com/mudler/go-processmanager v0.1.1/go.mod h1:h6kmHUZeafr+k5hRYpGLMzJFH4hItHffgpRo2QIkP+o=
github.com/mudler/localrecall v0.6.1-0.20260507074622-a7724fef6f81 h1:8D9NJ/ikhsJCxUwbdzIzadw6RqDrW+L0FPqpQQSeux8=
github.com/mudler/localrecall v0.6.1-0.20260507074622-a7724fef6f81/go.mod h1:28k5n19raUrkuwXkacdNsBlj8yuSnGhpT16tu+2+4dU=
github.com/mudler/localrecall v0.6.3-0.20260606070048-9a3b3321a9cd h1:trn9D5UHAE6zdRyD2uX04W1tLSslAwozVwcyNTd72Ak=
github.com/mudler/localrecall v0.6.3-0.20260606070048-9a3b3321a9cd/go.mod h1:28k5n19raUrkuwXkacdNsBlj8yuSnGhpT16tu+2+4dU=
github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8 h1:Ry8RiWy8fZ6Ff4E7dPmjRsBrnHOnPeOOj2LhCgyjQu0=
github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8/go.mod h1:EA8Ashhd56o32qN7ouPKFSRUs/Z+LrRCF4v6R2Oarm8=
github.com/mudler/skillserver v0.0.7-0.20260520220837-a7317cbf9145 h1:z59tA3IDYPt71nzH1jpxeaA1LuDw8aZfpTQFNU43Zb8=

View File

@@ -15,7 +15,11 @@ func (defaultGGUFReader) ReadMetadata(ctx context.Context, uri string) (*GGUFMet
urlStr := u.ResolveURL()
if strings.HasPrefix(uri, downloader.LocalPrefix) {
f, err := gguf.ParseGGUFFile(urlStr)
// Only architecture scalars are read below, never the tokenizer vocab
// arrays, so skip them and memory-map the header to avoid a syscall
// storm on slow storage. Same rationale as the startup guessing path in
// core/config/hooks_llamacpp.go (https://github.com/mudler/LocalAI/issues/9790).
f, err := gguf.ParseGGUFFile(urlStr, gguf.UseMMap(), gguf.SkipLargeMetadata())
if err != nil {
return nil, err
}