Compare commits

..

25 Commits

Author SHA1 Message Date
Ettore Di Giacinto
969cb850b5 docs(distributed): document per-node breakdown in the operations bar
Adds a short subsection covering the expandable "N nodes" chevron in
the OperationsBar admin UI, the meaning of each status pill, and
how it relates to the /api/operations nodes array.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:52:12 +00:00
Ettore Di Giacinto
6240e16ad8 feat(ui): per-node breakdown in OperationsBar
When an install op fans out to more than one worker, the operations
bar now shows a "N nodes" chevron that expands into a per-node list.
Each row carries the node's status (color-coded pill), the current
file being downloaded, byte counts, percentage, and a thin per-node
progress bar. Yellow "Worker busy" pill marks running_on_worker
status with a tooltip explaining the NATS round-trip timed out but
the worker is still installing in the background.

Backward compatible: ops without a nodes field (legacy or single-node
mode) render as before. State for expand/collapse is local to the
component, keyed by jobID/id - reload starts collapsed.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:50:51 +00:00
Ettore Di Giacinto
780e720593 feat(operations): expose per-node breakdown on /api/operations
When an operation's OpStatus has Nodes entries (populated by the
Phase 4 progress sink wiring), surface them as a "nodes" array on the
/api/operations response, sorted by node_name for stable rendering.

Backward compatible: legacy clients ignore the field; ops without any
node entries (single-node mode, model installs) omit the array entirely
thanks to the empty-slice guard.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:47:11 +00:00
Ettore Di Giacinto
a72649b486 feat(distributed): write per-node OpStatus entries during install fan-out
DistributedBackendManager now accepts a nodeProgressSink and feeds it
two streams:

1. enqueueAndDrainBackendOp emits a per-node terminal entry on each
   status it appends to BackendOpResult (queued, success, error,
   running_on_worker). The opID is threaded through the function so
   the sink gets the right gallery op identity.

2. The install apply closure fans each BackendInstallProgressEvent
   into the sink as a downloading entry, alongside the legacy
   progressCb path so the aggregate single-bar view stays correct.

Production wiring passes the GalleryService (which implements
UpdateNodeProgress via Task 2) as the sink. Single-node tests pass
nil. DeleteBackend and UpgradeBackend pass an empty opID so the
sink path no-ops for ops that aren't gallery-tracked the same way
as Install.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:44:18 +00:00
Ettore Di Giacinto
f96df5eb85 feat(galleryop): UpdateNodeProgress merges per-node ticks by NodeID
GalleryService.UpdateNodeProgress(opID, nodeID, np) merges a NodeProgress
into OpStatus.Nodes (keyed by NodeID, no duplicates) and mirrors the
latest tick into the aggregate Progress / FileName /
DownloadedFileSize / TotalFileSize fields so the legacy single-bar
OperationsBar view keeps working unchanged alongside the new per-node
breakdown.

Concurrent-safe via the existing g.Mutex.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:35:11 +00:00
Ettore Di Giacinto
e14d9ae8e3 feat(galleryop): add NodeProgress + OpStatus.Nodes for per-node breakdown
Adds the data model the UI needs to render an expandable per-node
breakdown of a fanned-out backend install. NodeProgress carries node
identity (ID + name), per-node status (queued / running_on_worker /
success / error / downloading), the current file + bytes + percentage
from the Phase 2 progress stream, and any per-node error.

OpStatus.Nodes is the slice the /api/operations handler will surface
in a follow-up.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:32:01 +00:00
Ettore Di Giacinto
a97dc3bf57 docs(distributed): note progress-event ordering trade-off in InstallBackend
Document near the goroutine dispatch why ordering at the consumer is
best-effort, why it rarely matters in practice (worker debounce >>
goroutine jitter), and what a future hardening pass would look like
(Seq field + stale-by-seq drop). Stops the next reader from accidentally
"fixing" the goroutine pool away.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:15:08 +00:00
Ettore Di Giacinto
a560329430 docs(distributed): document install progress streaming
Note the new nodes.<nodeID>.backend.install.<opID>.progress subject and
the silent-worker compatibility behavior so operators know to expect
real-time progress and what happens on a mixed-version cluster.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:06:50 +00:00
Ettore Di Giacinto
07b2e4e703 test(distributed): InstallBackend tolerates silent (pre-Phase-2) workers
A worker on pre-Phase-2 code never publishes progress events. The new
master subscribes optimistically; this spec pins that a silent worker
still produces a green install with no progressCb ticks. The install
reply is the source of truth for terminal state; the progress stream
is a best-effort UX enrichment.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:06:08 +00:00
Ettore Di Giacinto
f03aacf7e7 feat(distributed): forward backend install progress into galleryop OpStatus
DistributedBackendManager.InstallBackend now passes the gallery op ID
and a progress bridge into the adapter call. Each
BackendInstallProgressEvent from the worker becomes a
galleryop.ProgressCallback tick - which the existing backendHandler
already turns into OpStatus.UpdateStatus, so the admin UI/SSE polling
sees per-byte progress for distributed installs without any UI-side
change.

UpgradeBackend is intentionally left silent for now: its wire request
(BackendUpgradeRequest) does not carry OpID, and rolling-update
fallback is the rarer path. Will be picked up in a follow-up if the
worker upgrade path also gets a progress channel.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 22:00:49 +00:00
Ettore Di Giacinto
e8e75aadb6 feat(distributed): RemoteUnloaderAdapter subscribes to install progress
InstallBackend gains opID + onProgress parameters. When both are set,
the adapter subscribes to nodes.<nodeID>.backend.install.<opID>.progress
BEFORE publishing the install request, decodes each message into the
caller's onProgress callback in a goroutine (so a slow callback never
stalls the NATS reader thread), and unsubscribes after RequestJSON
returns.

When onProgress is nil OR opID is empty (the reconciler retry path),
subscription is skipped entirely - silent installs cost nothing extra.

Subscribe failure is logged at Warn and the install proceeds without
progress streaming; the NATS round-trip still owns terminal status.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 21:46:33 +00:00
Ettore Di Giacinto
352ebe241d feat(distributed): worker publishes debounced install progress over NATS
When BackendInstallRequest.OpID is set, the worker's backend.install
handler wires a debounced publisher (250ms window) into the gallery
download callback. Each tick becomes a BackendInstallProgressEvent on
nodes.<nodeID>.backend.install.<opID>.progress; the publisher always
emits a final event on Flush so the UI sees the terminal percentage.

Old masters that do not set OpID continue to run silent installs: no
behavior change for them. Lock ordering: the publisher releases its
mutex before calling messaging.Publish so a slow network never stalls
the install loop.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 21:32:38 +00:00
Ettore Di Giacinto
c851768163 style(messaging): drop em-dash from BackendInstallProgress test comment
Per project convention (no em-dashes anywhere). Comment substance is
unchanged.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 21:22:43 +00:00
Ettore Di Giacinto
58f29496e5 feat(messaging): add BackendInstallProgressEvent wire type and subject
New NATS subject nodes.<nodeID>.backend.install.<opID>.progress lets the
worker publish transient progress events (file, current/total bytes,
percentage, phase) while a long-running install pulls its OCI image.
BackendInstallRequest gains an optional OpID field so the worker knows
which subject to publish on.

Transient pub/sub (not JetStream): the install reply remains ground
truth for success/failure; dropped progress events are tolerable.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 21:20:13 +00:00
Ettore Di Giacinto
871f5e0b7a feat(distributed): clear pending install rows when backend.list confirms
DistributedBackendManager.ListBackends now proactively clears
pending_backend_ops install rows whose (nodeID, backend) is reported
installed by backend.list. Operator UI updates immediately instead of
waiting up to installTimeout (default 15m) for the next reconciler
tick after NextRetryAt.

Only install rows are cleared; upgrade and delete intents are not
satisfied by presence in backend.list and continue to drain through
their normal reconciler paths.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 21:04:31 +00:00
Ettore Di Giacinto
148fb1c0d3 docs(distributed): document LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT / _UPGRADE_TIMEOUT
Add the two new operator-tunable env vars to the Frontend Configuration
table in the distributed-mode docs. Explains the 15m default, when to
raise it (slow links pulling multi-GB OCI images), and the new
"still installing in background" admin-UI state when the round-trip
times out but the worker is still working.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 20:58:14 +00:00
Ettore Di Giacinto
fbb20ed2b3 test(distributed): end-to-end install-timeout-then-reconcile
Wires Task 1-6 end-to-end so any seam mismatch surfaces in CI rather
than during a real cluster install. NATS times out, the queue row
stays alive with running_on_worker status, the worker eventually
reports the backend installed via backend.list, the manager surfaces
it via ListBackends.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 20:40:56 +00:00
Ettore Di Giacinto
17ea9f93f9 feat(galleryop): surface ErrWorkerStillInstalling as non-error OpStatus
When the distributed backend manager returns an error that wraps
ErrWorkerStillInstalling, backendHandler now completes the op with a
"still installing in background" message rather than marking it as a
red failure. Admin UI sees a yellow in-progress state; reconciler
confirms completion on its next pass.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 20:34:30 +00:00
Ettore Di Giacinto
4b66c3ad45 fix(distributed): don't increment Attempts on in-flight install timeout
An in-flight timeout (worker still pulling the OCI image) is not a
failed attempt, it's a delayed one. Incrementing Attempts let
genuinely-progressing slow installs (e.g. 30 GB CUDA images on Wi-Fi)
trip the reconciler's maxPendingBackendOpAttempts cap and dead-letter
the queue row while the worker was still legitimately working.

RecordPendingBackendOpInFlight now only updates LastError and NextRetryAt.
Also documents "running_on_worker" in the NodeOpStatus.Status enum
comment so Task 6 implementers see the full surface.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 20:32:59 +00:00
Ettore Di Giacinto
169ff75633 feat(distributed): treat NATS install timeout as in-progress, not failure
When a worker times out replying to backend.install but the install is
still running on the worker, enqueueAndDrainBackendOp now reports a
running_on_worker status and pushes NextRetryAt out by the install
timeout so the reconciler does not immediately re-fire another install
while the worker is still pulling the image. The pending_backend_ops
row stays in place for the next reconciler pass to confirm via
backend.list.

InstallBackend wraps the result in galleryop.ErrWorkerStillInstalling
so callers can branch (galleryop renders yellow in-progress instead of
red error). UpgradeBackend uses the same wrap.

Adds RemoteUnloaderAdapter.InstallTimeout() so the manager can push
NextRetryAt by the configured timeout without reaching into a private
field, and NodeRegistry.RecordPendingBackendOpInFlight as the soft
cousin of RecordPendingBackendOpFailure.

Also includes incidental gofmt-driven struct-field alignment in
registry.go on lines unrelated to the change (touched files are
re-formatted to canonical form per project policy).

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 20:25:13 +00:00
Ettore Di Giacinto
4f89882057 feat(distributed): introduce galleryop.ErrWorkerStillInstalling sentinel
When the NATS request-reply for backend.install (or .upgrade) times out
the worker is almost always still pulling the OCI image. Wrap the timeout
in a typed sentinel so the manager above can distinguish "worker hung"
from "worker still working" and leave the pending_backend_ops row in
place for the reconciler to confirm via backend.list.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 20:08:45 +00:00
Ettore Di Giacinto
f9b47c6eab feat(distributed): inject NATS install/upgrade timeouts into RemoteUnloaderAdapter
Removes the hardcoded 3m / 15m literals from RemoteUnloaderAdapter and
threads in DistributedConfig.BackendInstallTimeoutOrDefault() and
BackendUpgradeTimeoutOrDefault() at construction. Install now defaults
to 15m (was 3m); cold OCI image pulls on Jetson Wi-Fi routinely blew
past the old ceiling. Scripted messaging client captures the timeout
so tests can assert the configured value actually reaches the NATS
request.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 19:56:43 +00:00
Ettore Di Giacinto
4306b730ed feat(cli): surface LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT and _UPGRADE_TIMEOUT
Parses the two new env vars on the run CLI and threads them through the
existing AppOption builder so DistributedConfig picks them up. Invalid
duration strings now fail loudly at startup rather than silently falling
back to the default.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 19:46:04 +00:00
Ettore Di Giacinto
71d940f1e0 style(distributed): gofmt alignment after timeout fields
Re-aligns the Validate() negative-duration map and the Default* const
block so the new BackendInstall/UpgradeTimeout entries do not leave
the surrounding columns mis-padded.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 19:44:17 +00:00
Ettore Di Giacinto
0e2b84d8e3 feat(distributed): add configurable NATS backend install/upgrade timeouts
Adds BackendInstallTimeout and BackendUpgradeTimeout to DistributedConfig
with 15m defaults, following the existing MCPToolTimeout / WorkerWaitTimeout
pattern. These will replace the hardcoded literals in RemoteUnloaderAdapter
so admin-driven backend installs across the cluster survive long OCI image
pulls that previously timed out at 3m.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-22 19:39:54 +00:00
53 changed files with 2039 additions and 435 deletions

View File

@@ -16,8 +16,7 @@ side (`pkg/oci/cosignverify` plus the gallery YAML).
per-arch manifest before checking signatures.
- **Storage:** Signatures are written as OCI 1.1 referrers
(`--registry-referrers-mode=oci-1-1`) in the new Sigstore bundle format
(current cosign releases do this by default; no `--new-bundle-format`
flag). No `:sha256-<hex>.sig` tag clutter.
(`--new-bundle-format`). No `:sha256-<hex>.sig` tag clutter.
- **Consumer:** `pkg/oci/cosignverify` discovers the bundle via the
referrers API, hands it to `sigstore-go`, and verifies it against the
policy declared in the gallery YAML (`Gallery.Verification`).
@@ -34,14 +33,15 @@ to sign. The job needs:
- `permissions: { id-token: write, contents: read }` at the job level so
the runner can exchange its GitHub OIDC token for a Fulcio cert.
- `sigstore/cosign-installer@v3` step (current cosign releases already
default to the new bundle format).
- `sigstore/cosign-installer@v3` step (cosign ≥ 2.2 for
`--new-bundle-format`).
- After each `docker buildx imagetools create`, resolve the resulting
list digest with `docker buildx imagetools inspect <tag> --format
'{{.Manifest.Digest}}'` and sign:
```sh
cosign sign --yes --recursive \
--new-bundle-format \
--registry-referrers-mode=oci-1-1 \
"${REGISTRY_REPO}@${DIGEST}"
```

View File

@@ -66,8 +66,7 @@ jobs:
# cosign signs each pushed manifest list with --recursive so the
# index and every per-arch entry get an attached Sigstore bundle.
# Recent cosign releases always emit the new bundle format, so
# there's no extra CLI flag to opt into it.
# 2.2+ is required for --new-bundle-format.
- name: Install cosign
if: github.event_name != 'pull_request'
uses: sigstore/cosign-installer@v3
@@ -154,6 +153,7 @@ jobs:
# manifest before checking signatures need the per-arch
# signatures, not just the list-level one.
cosign sign --yes --recursive \
--new-bundle-format \
--registry-referrers-mode=oci-1-1 \
"quay.io/go-skynet/local-ai-backends@${digest}"
@@ -180,6 +180,7 @@ jobs:
' <<< "$DOCKER_METADATA_OUTPUT_JSON")
digest=$(docker buildx imagetools inspect "$first_tag" --format '{{.Manifest.Digest}}')
cosign sign --yes --recursive \
--new-bundle-format \
--registry-referrers-mode=oci-1-1 \
"localai/localai-backends@${digest}"

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=b3d39cff8bffbd67296d6badd4076a1486a0715c
IK_LLAMA_VERSION?=48a55f74e4c6e2aeda363dd386c1ac9170a0af71
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -36,11 +36,15 @@ fi
# flash-attn-4 4.0 stable lands.
EXTRA_PIP_INSTALL_FLAGS+=" --prerelease=allow"
# JetPack 7 / L4T arm64 sglang + torch wheels come straight from PyPI now
# (torch 2.11+ ships aarch64 + cu130 manylinux wheels and sglang 0.5.11+
# ships a cp312 aarch64 wheel pinned to that torch). They're cp312-only,
# so bump the venv Python accordingly.
# https://pytorch.org/blog/vllm-and-pytorch-work-together-to-improve-the-developer-experience-on-aarch64/
# JetPack 7 / L4T arm64 wheels are built for cp312 and shipped via
# pypi.jetson-ai-lab.io. Bump the venv Python so the prebuilt sglang
# wheel resolves cleanly. The actual install on l4t13 goes through
# pyproject.toml (see the elif branch below) so [tool.uv.sources] can
# pin only torch/torchvision/torchaudio/sglang to the jetson-ai-lab
# index — leaving PyPI as the path for transitive deps like
# markdown-it-py / anthropic / propcache that the L4T mirror's proxy
# 503s on. No --index-strategy flag here: the explicit index keeps the
# scoping clean.
if [ "x${BUILD_PROFILE}" == "xl4t13" ]; then
PYTHON_VERSION="3.12"
PYTHON_PATCH="12"
@@ -106,6 +110,27 @@ if [ "x${BUILD_TYPE}" == "x" ] || [ "x${FROM_SOURCE:-}" == "xtrue" ]; then
fi
uv pip install ${EXTRA_PIP_INSTALL_FLAGS:-} .
popd
# L4T arm64 (JetPack 7): drive the install through pyproject.toml so that
# [tool.uv.sources] can pin torch/torchvision/torchaudio/sglang to the
# jetson-ai-lab index, while everything else (transitive deps and
# PyPI-resolvable packages like transformers / accelerate) comes from
# PyPI. Bypasses installRequirements because uv pip install -r
# requirements.txt does not honor sources — see
# backend/python/sglang/pyproject.toml for the rationale. Mirrors the
# equivalent path in backend/python/vllm/install.sh.
elif [ "x${BUILD_PROFILE}" == "xl4t13" ]; then
ensureVenv
if [ "x${PORTABLE_PYTHON}" == "xtrue" ]; then
export C_INCLUDE_PATH="${C_INCLUDE_PATH:-}:$(_portable_dir)/include/python${PYTHON_VERSION}"
fi
pushd "${backend_dir}"
# Build deps first (matches installRequirements' requirements-install.txt
# pass — sglang/sgl-kernel sdists need packaging/setuptools-scm in the
# venv before they can build under --no-build-isolation).
uv pip install ${EXTRA_PIP_INSTALL_FLAGS:-} -r requirements-install.txt
uv pip install ${EXTRA_PIP_INSTALL_FLAGS:-} --requirement pyproject.toml
popd
runProtogen
else
installRequirements
fi

View File

@@ -0,0 +1,68 @@
# L4T arm64 (JetPack 7 / sbsa cu130) install spec for the sglang backend.
#
# Why this file exists, and why only the l4t13 BUILD_PROFILE consumes it:
#
# pypi.jetson-ai-lab.io hosts the L4T-specific torch / sglang / sgl-kernel
# wheels we need on aarch64 + cuda13, but it ALSO transparently proxies the
# rest of PyPI through `/+f/<sha>/<filename>` URLs that 503 frequently.
# With `--extra-index-url` + `--index-strategy=unsafe-best-match` (the
# historical fix in install.sh) uv would pick those proxy URLs for ordinary
# PyPI packages — markdown-it-py, anthropic, propcache, etc. — and trip on
# the 503s. See e.g. CI run 25439791228 (markdown-it-py-4.0.0).
#
# `explicit = true` on the index makes uv consult the L4T mirror ONLY for
# packages mapped under [tool.uv.sources]. Everything else goes to PyPI.
# This breaks the historical 503 path without losing access to the L4T
# wheels we actually need from there. Mirrors the equivalent fix already
# in backend/python/vllm/pyproject.toml.
#
# `uv pip install -r requirements.txt` does NOT honor [tool.uv.sources]
# (sources are project-mode only, not pip-compat mode), so install.sh's
# l4t13 branch invokes `uv pip install --requirement pyproject.toml`
# directly. Other BUILD_PROFILEs continue to use the requirements-*.txt
# pipeline through libbackend.sh's installRequirements and never read
# this file.
[project]
name = "localai-sglang-l4t13"
version = "0.0.0"
requires-python = ">=3.12,<3.13"
dependencies = [
# Mirror of requirements.txt — kept in sync manually for now since the
# l4t13 path bypasses installRequirements (see install.sh).
"grpcio==1.80.0",
"protobuf",
"certifi",
"setuptools",
"pillow",
# L4T-specific accelerator stack (sourced from jetson-ai-lab below).
"torch",
"torchvision",
"torchaudio",
# sglang on jetson — the [all] extra is deliberately omitted because it
# pulls outlines/decord, and decord has no aarch64 cp312 wheel anywhere
# (PyPI nor the jetson-ai-lab index ships only legacy cp35-cp37). With
# [all] uv backtracks through versions trying to satisfy decord and
# lands on sglang==0.1.16. The 0.5.0 floor matches the only major
# series the jetson-ai-lab sbsa/cu130 mirror currently publishes
# (sglang==0.5.1.post2 as of 2026-05-06). Bumping to >=0.5.11 here
# would make the build unsatisfiable until the mirror catches up.
# Gemma 4 / MTP recipes are therefore not supported on l4t13 — those
# features land on cublas12/cublas13 hosts that pull the newer wheel
# from PyPI. backend.py keeps backward compat with the 0.5.x SamplingParams
# field rename via runtime detection.
"sglang>=0.5.0",
# PyPI-resolvable packages that complete the runtime.
"accelerate",
"transformers",
]
[[tool.uv.index]]
name = "jetson-ai-lab"
url = "https://pypi.jetson-ai-lab.io/sbsa/cu130"
explicit = true
[tool.uv.sources]
torch = { index = "jetson-ai-lab" }
torchvision = { index = "jetson-ai-lab" }
torchaudio = { index = "jetson-ai-lab" }
sglang = { index = "jetson-ai-lab" }

View File

@@ -1,15 +0,0 @@
# sglang 0.5.11+ ships an aarch64 manylinux wheel on PyPI whose Requires-Dist
# pins torch==2.11.0 / torchaudio==2.11.0, locking an ABI-consistent set with
# the cu130 torch wheel installed above. 0.5.11 is the floor for Gemma 4
# support (sgl-project/sglang#21952).
#
# The [all] extra is deliberately NOT used on aarch64: it pulls the
# [diffusion] sub-extra which requires `xatlas`, and xatlas ships no
# aarch64 wheel and its sdist depends on scikit_build_core without
# declaring it in build-system.requires — so under --no-build-isolation
# uv can't build it. Upstream sglang gates st_attn and vsa on
# platform_machine != aarch64 in the diffusion extra but forgot xatlas.
# Plain `sglang` carries everything backend.py uses (Engine, ServerArgs,
# FunctionCallParser, ReasoningParser); the [all] extras are optional
# accelerators not required at import time.
sglang>=0.5.11

View File

@@ -1,9 +0,0 @@
# JetPack 7 / L4T arm64 + CUDA 13. Since PyTorch 2.11 (April 2026), PyPI ships
# aarch64 + cu130 manylinux wheels for torch/torchvision/torchaudio directly,
# so we no longer need a custom --extra-index-url for the L4T mirror.
# https://pytorch.org/blog/vllm-and-pytorch-work-together-to-improve-the-developer-experience-on-aarch64/
accelerate
torch
torchvision
torchaudio
transformers

View File

@@ -13,14 +13,14 @@ else
fi
# Handle l4t build profiles (Python 3.12, pip fallback) if needed.
# Since PyTorch 2.11 (April 2026) PyPI ships aarch64 + cu130 manylinux wheels
# directly for torch/torchvision/torchaudio and an aarch64 vllm wheel pinned
# to that torch, so the jetson-ai-lab mirror is no longer needed.
# https://pytorch.org/blog/vllm-and-pytorch-work-together-to-improve-the-developer-experience-on-aarch64/
# unsafe-best-match is required on l4t13 because the jetson-ai-lab index
# lists transitive deps at limited versions — without it uv pins to the
# first matching index and fails to resolve a compatible wheel from PyPI.
if [ "x${BUILD_PROFILE}" == "xl4t13" ]; then
PYTHON_VERSION="3.12"
PYTHON_PATCH="12"
PY_STANDALONE_TAG="20251120"
EXTRA_PIP_INSTALL_FLAGS="${EXTRA_PIP_INSTALL_FLAGS:-} --index-strategy=unsafe-best-match"
fi
if [ "x${BUILD_PROFILE}" == "xl4t12" ]; then
@@ -42,11 +42,18 @@ if [ "x${BUILD_TYPE}" == "xhipblas" ]; then
else
uv pip install vllm==0.14.0 --extra-index-url https://wheels.vllm.ai/rocm/0.14.0/rocm700
fi
elif [ "x${BUILD_PROFILE}" == "xcublas13" ] || [ "x${BUILD_PROFILE}" == "xl4t13" ]; then
# cublas13 (x86_64) and l4t13 (aarch64) both pull vllm from PyPI now:
# vllm 0.19+ defaults to cu130 wheels on x86_64 and vllm 0.20+ ships an
# aarch64 manylinux wheel pinned to torch==2.11.0. No extra index needed
# in either case.
elif [ "x${BUILD_PROFILE}" == "xl4t13" ]; then
# JetPack 7 / L4T arm64 cu130 — vllm comes from the prebuilt SBSA wheel
# at jetson-ai-lab. Version is unpinned: the index ships whatever build
# matches the cu130/cp312 ABI. unsafe-best-match lets uv fall through
# to PyPI for transitive deps not present on the jetson-ai-lab index.
if [ "x${USE_PIP}" == "xtrue" ]; then
pip install vllm --extra-index-url https://pypi.jetson-ai-lab.io/sbsa/cu130
else
uv pip install --index-strategy=unsafe-best-match vllm --extra-index-url https://pypi.jetson-ai-lab.io/sbsa/cu130
fi
elif [ "x${BUILD_PROFILE}" == "xcublas13" ]; then
# vllm 0.19+ defaults to cu130 wheels on PyPI, no extra index needed.
if [ "x${USE_PIP}" == "xtrue" ]; then
pip install vllm --torch-backend=auto
else

View File

@@ -1,15 +1,11 @@
# JetPack 7 / L4T arm64 + CUDA 13. PyPI ships aarch64 + cu130 manylinux wheels
# for torch/torchvision/torchaudio directly since PyTorch 2.11 (April 2026),
# so no custom index is needed. flash-attn is dropped here: PyPI has no
# aarch64 wheel for it, but vLLM 0.20+ bundles its own vllm_flash_attn
# (fa2 + fa3) inside the main wheel, so it is not required at runtime.
# https://pytorch.org/blog/vllm-and-pytorch-work-together-to-improve-the-developer-experience-on-aarch64/
--extra-index-url https://pypi.jetson-ai-lab.io/sbsa/cu130
accelerate
torch
torchvision
torchaudio
transformers
bitsandbytes
flash-attn
diffusers
librosa
soundfile

View File

@@ -43,11 +43,14 @@ if [ "x${BUILD_PROFILE}" == "xcublas13" ]; then
EXTRA_PIP_INSTALL_FLAGS+=" --index-strategy=unsafe-best-match"
fi
# JetPack 7 / L4T arm64 vllm + torch wheels come straight from PyPI now
# (torch 2.11+ ships aarch64 + cu130 manylinux wheels and vllm 0.20+ ships
# an aarch64 wheel pinned to that torch). They're cp312-only, so bump the
# venv Python accordingly. JetPack 6 keeps cp310 + USE_PIP=true.
# https://pytorch.org/blog/vllm-and-pytorch-work-together-to-improve-the-developer-experience-on-aarch64/
# JetPack 7 / L4T arm64 wheels (torch, vllm, flash-attn) live on
# pypi.jetson-ai-lab.io and are built for cp312, so bump the venv Python
# accordingly. JetPack 6 keeps cp310 + USE_PIP=true.
#
# l4t13 uses pyproject.toml (see the elif branch below) to pin only the
# L4T-specific wheels to the jetson-ai-lab index via [tool.uv.sources].
# That keeps PyPI as the resolution path for transitive deps like
# anthropic/openai/propcache, which the L4T mirror's proxy 503s on.
if [ "x${BUILD_PROFILE}" == "xl4t12" ]; then
USE_PIP=true
fi
@@ -100,6 +103,25 @@ if [ "x${BUILD_TYPE}" == "xintel" ]; then
export CMAKE_PREFIX_PATH="$(python -c 'import site; print(site.getsitepackages()[0])'):${CMAKE_PREFIX_PATH:-}"
VLLM_TARGET_DEVICE=xpu uv pip install ${EXTRA_PIP_INSTALL_FLAGS:-} --no-deps .
popd
# L4T arm64 (JetPack 7): drive the install through pyproject.toml so that
# [tool.uv.sources] can pin torch/vllm/flash-attn/torchvision/torchaudio
# to the jetson-ai-lab index, while everything else (transitive deps and
# PyPI-resolvable packages like transformers) comes from PyPI. Bypasses
# installRequirements because uv pip install -r requirements.txt does not
# honor sources — see backend/python/vllm/pyproject.toml for the rationale.
elif [ "x${BUILD_PROFILE}" == "xl4t13" ]; then
ensureVenv
if [ "x${PORTABLE_PYTHON}" == "xtrue" ]; then
export C_INCLUDE_PATH="${C_INCLUDE_PATH:-}:$(_portable_dir)/include/python${PYTHON_VERSION}"
fi
pushd "${backend_dir}"
# Build deps first (matches installRequirements' requirements-install.txt
# pass — fastsafetensors and friends need pybind11 in the venv before
# their sdists can build under --no-build-isolation).
uv pip install ${EXTRA_PIP_INSTALL_FLAGS:-} -r requirements-install.txt
uv pip install ${EXTRA_PIP_INSTALL_FLAGS:-} --requirement pyproject.toml
popd
runProtogen
# FROM_SOURCE=true on a CPU build skips the prebuilt vllm wheel in
# requirements-cpu-after.txt and compiles vllm locally against the host's
# actual CPU. Not used by default because it takes ~30-40 minutes, but

View File

@@ -0,0 +1,61 @@
# L4T arm64 (JetPack 7 / sbsa cu130) install spec for the vllm backend.
#
# Why this file exists, and why only the l4t13 BUILD_PROFILE consumes it:
#
# pypi.jetson-ai-lab.io hosts the L4T-specific torch / vllm / flash-attn
# wheels we need on aarch64 + cuda13, but it ALSO transparently proxies the
# rest of PyPI through `/+f/<sha>/<filename>` URLs that 503 frequently. With
# `--extra-index-url` + `--index-strategy=unsafe-best-match` (the historical
# fix in install.sh) uv would pick those proxy URLs for ordinary PyPI
# packages — `anthropic`, `openai`, `propcache`, `annotated-types` — and
# trip on the 503s. See e.g. CI run 25212201349 (anthropic-0.97.0).
#
# `explicit = true` on the index makes uv consult the L4T mirror ONLY for
# packages mapped under [tool.uv.sources]. Everything else goes to PyPI.
# This breaks the historical 503 path without losing access to the L4T
# wheels we actually need from there.
#
# `uv pip install -r requirements.txt` does NOT honor [tool.uv.sources]
# (sources are project-mode only, not pip-compat mode), so install.sh's
# l4t13 branch invokes `uv pip install --requirement pyproject.toml`
# directly. Other BUILD_PROFILEs continue to use the requirements-*.txt
# pipeline through libbackend.sh's installRequirements and never read
# this file.
[project]
name = "localai-vllm-l4t13"
version = "0.0.0"
requires-python = ">=3.12,<3.13"
dependencies = [
# Mirror of requirements.txt — kept in sync manually for now since the
# l4t13 path bypasses installRequirements (see install.sh).
"grpcio==1.80.0",
"protobuf",
"certifi",
"setuptools",
"pillow",
"charset-normalizer>=3.4.7",
"chardet",
# L4T-specific accelerator stack (sourced from jetson-ai-lab below).
"torch",
"torchvision",
"torchaudio",
"flash-attn",
"vllm",
# PyPI-resolvable packages that complete the runtime — accelerate,
# transformers, bitsandbytes carry their own wheels for aarch64.
"accelerate",
"transformers",
"bitsandbytes",
]
[[tool.uv.index]]
name = "jetson-ai-lab"
url = "https://pypi.jetson-ai-lab.io/sbsa/cu130"
explicit = true
[tool.uv.sources]
torch = { index = "jetson-ai-lab" }
torchvision = { index = "jetson-ai-lab" }
torchaudio = { index = "jetson-ai-lab" }
flash-attn = { index = "jetson-ai-lab" }
vllm = { index = "jetson-ai-lab" }

View File

@@ -1,4 +0,0 @@
# vLLM 0.20+ ships an aarch64 manylinux wheel on PyPI whose Requires-Dist pins
# torch==2.11.0 / torchvision==0.26.0 / torchaudio==2.11.0, locking an ABI-
# consistent set with the cu130 torch wheel installed above.
vllm

View File

@@ -1,8 +0,0 @@
# JetPack 7 / L4T arm64 + CUDA 13. Since PyTorch 2.11 (April 2026), PyPI ships
# aarch64 + cu130 manylinux wheels for torch/torchvision/torchaudio directly,
# so we no longer need a custom --extra-index-url for the L4T mirror.
# https://pytorch.org/blog/vllm-and-pytorch-work-together-to-improve-the-developer-experience-on-aarch64/
accelerate
torch
transformers
bitsandbytes

View File

@@ -233,7 +233,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("File stager initialized (HTTP direct transfer)")
}
// Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go
remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient)
remoteUnloader := nodes.NewRemoteUnloaderAdapter(
registry,
natsClient,
cfg.Distributed.BackendInstallTimeoutOrDefault(),
cfg.Distributed.BackendUpgradeTimeoutOrDefault(),
)
// All dependencies ready — build SmartRouter with all options at once
var conflictResolver nodes.ConcurrencyConflictResolver

View File

@@ -17,9 +17,9 @@ import (
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/vram"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/sanitize"
@@ -200,7 +200,7 @@ func New(opts ...config.AppOption) (*Application, error) {
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
)
application.galleryService.SetBackendManager(
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry),
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService),
)
}
}

View File

@@ -277,7 +277,7 @@ func gRPCPredictOpts(c config.ModelConfig, modelPath string) *pb.PredictOptions
MinP: float32(*c.MinP),
Tokens: int32(*c.Maxtokens),
Threads: int32(*c.Threads),
PromptCacheAll: *c.PromptCacheAll,
PromptCacheAll: c.PromptCacheAll,
PromptCacheRO: c.PromptCacheRO,
PromptCachePath: promptCachePath,
F16KV: *c.F16,

View File

@@ -39,19 +39,19 @@ type RunCMD struct {
LocalaiConfigDir string `env:"LOCALAI_CONFIG_DIR" type:"path" default:"${basepath}/configuration" help:"Directory for dynamic loading of certain configuration files (currently api_keys.json and external_backends.json)" group:"storage"`
LocalaiConfigDirPollInterval time.Duration `env:"LOCALAI_CONFIG_DIR_POLL_INTERVAL" help:"Typically the config path picks up changes automatically, but if your system has broken fsnotify events, set this to an interval to poll the LocalAI Config Dir (example: 1m)" group:"storage"`
// The alias on this option is there to preserve functionality with the old `--config-file` parameter
ModelsConfigFile string `env:"LOCALAI_MODELS_CONFIG_FILE,CONFIG_FILE" aliases:"config-file" help:"YAML file containing a list of model backend configs" group:"storage"`
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
ModelsConfigFile string `env:"LOCALAI_MODELS_CONFIG_FILE,CONFIG_FILE" aliases:"config-file" help:"YAML file containing a list of model backend configs" group:"storage"`
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
AutoUpgradeBackends bool `env:"LOCALAI_AUTO_UPGRADE_BACKENDS,AUTO_UPGRADE_BACKENDS" help:"Automatically upgrade backends when new versions are detected" group:"backends" default:"false"`
PreferDevelopmentBackends bool `env:"LOCALAI_PREFER_DEV_BACKENDS,PREFER_DEV_BACKENDS" help:"Prefer development backend versions (shows development backends by default in UI)" group:"backends" default:"false"`
PreloadModels string `env:"LOCALAI_PRELOAD_MODELS,PRELOAD_MODELS" help:"A List of models to apply in JSON at start" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
PreloadModelsConfig string `env:"LOCALAI_PRELOAD_MODELS_CONFIG,PRELOAD_MODELS_CONFIG" help:"A List of models to apply at startup. Path to a YAML config file" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
PreloadModelsConfig string `env:"LOCALAI_PRELOAD_MODELS_CONFIG,PRELOAD_MODELS_CONFIG" help:"A List of models to apply at startup. Path to a YAML config file" group:"models"`
F16 bool `name:"f16" env:"LOCALAI_F16,F16" help:"Enable GPU acceleration" group:"performance"`
Threads int `env:"LOCALAI_THREADS,THREADS" short:"t" help:"Number of threads used for parallel computation. Usage of the number of physical cores in the system is suggested" group:"performance"`
@@ -145,16 +145,18 @@ type RunCMD struct {
DefaultAPIKeyExpiry string `env:"LOCALAI_DEFAULT_API_KEY_EXPIRY" help:"Default expiry for API keys (e.g. 90d, 1y; empty = no expiry)" group:"auth"`
// Distributed / Horizontal Scaling
Distributed bool `env:"LOCALAI_DISTRIBUTED" default:"false" help:"Enable distributed mode (requires PostgreSQL + NATS)" group:"distributed"`
InstanceID string `env:"LOCALAI_INSTANCE_ID" help:"Unique instance ID for distributed mode (auto-generated UUID if empty)" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" help:"NATS server URL (e.g., nats://localhost:4222)" group:"distributed"`
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3-compatible storage endpoint URL (e.g., http://minio:9000)" group:"distributed"`
StorageBucket string `env:"LOCALAI_STORAGE_BUCKET" default:"localai" help:"S3 bucket name for object storage" group:"distributed"`
StorageRegion string `env:"LOCALAI_STORAGE_REGION" default:"us-east-1" help:"S3 region" group:"distributed"`
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
Distributed bool `env:"LOCALAI_DISTRIBUTED" default:"false" help:"Enable distributed mode (requires PostgreSQL + NATS)" group:"distributed"`
InstanceID string `env:"LOCALAI_INSTANCE_ID" help:"Unique instance ID for distributed mode (auto-generated UUID if empty)" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" help:"NATS server URL (e.g., nats://localhost:4222)" group:"distributed"`
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3-compatible storage endpoint URL (e.g., http://minio:9000)" group:"distributed"`
StorageBucket string `env:"LOCALAI_STORAGE_BUCKET" default:"localai" help:"S3 bucket name for object storage" group:"distributed"`
StorageRegion string `env:"LOCALAI_STORAGE_REGION" default:"us-east-1" help:"S3 region" group:"distributed"`
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
BackendInstallTimeout string `env:"LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT" help:"NATS round-trip timeout for backend.install requests sent to worker nodes (default 15m). Increase for slow links pulling multi-GB images." group:"distributed"`
BackendUpgradeTimeout string `env:"LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT" help:"NATS round-trip timeout for backend.upgrade requests (default 15m)." group:"distributed"`
Version bool
}
@@ -255,6 +257,20 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.StorageSecretKey != "" {
opts = append(opts, config.WithStorageSecretKey(r.StorageSecretKey))
}
if r.BackendInstallTimeout != "" {
d, err := time.ParseDuration(r.BackendInstallTimeout)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT %q: %w", r.BackendInstallTimeout, err)
}
opts = append(opts, config.WithBackendInstallTimeout(d))
}
if r.BackendUpgradeTimeout != "" {
d, err := time.ParseDuration(r.BackendUpgradeTimeout)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT %q: %w", r.BackendUpgradeTimeout, err)
}
opts = append(opts, config.WithBackendUpgradeTimeout(d))
}
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}

View File

@@ -40,7 +40,10 @@ type DistributedConfig struct {
// model-row cleanup on MarkUnhealthy / MarkDraining).
DisablePerModelHealthCheck bool
MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m)
MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m)
BackendInstallTimeout time.Duration // NATS round-trip timeout for backend.install (default 15m)
BackendUpgradeTimeout time.Duration // NATS round-trip timeout for backend.upgrade (default 15m)
MaxUploadSize int64 // Maximum upload body size in bytes (default 50 GB)
@@ -68,13 +71,15 @@ func (c DistributedConfig) Validate() error {
}
// Check for negative durations
for name, d := range map[string]time.Duration{
"mcp-tool-timeout": c.MCPToolTimeout,
"mcp-discovery-timeout": c.MCPDiscoveryTimeout,
"worker-wait-timeout": c.WorkerWaitTimeout,
"drain-timeout": c.DrainTimeout,
"health-check-interval": c.HealthCheckInterval,
"stale-node-threshold": c.StaleNodeThreshold,
"mcp-ci-job-timeout": c.MCPCIJobTimeout,
"mcp-tool-timeout": c.MCPToolTimeout,
"mcp-discovery-timeout": c.MCPDiscoveryTimeout,
"worker-wait-timeout": c.WorkerWaitTimeout,
"drain-timeout": c.DrainTimeout,
"health-check-interval": c.HealthCheckInterval,
"stale-node-threshold": c.StaleNodeThreshold,
"mcp-ci-job-timeout": c.MCPCIJobTimeout,
"backend-install-timeout": c.BackendInstallTimeout,
"backend-upgrade-timeout": c.BackendUpgradeTimeout,
} {
if d < 0 {
return fmt.Errorf("%s must not be negative", name)
@@ -137,24 +142,48 @@ func WithStorageSecretKey(key string) AppOption {
}
}
func WithBackendInstallTimeout(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.BackendInstallTimeout = d
}
}
func WithBackendUpgradeTimeout(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.BackendUpgradeTimeout = d
}
}
var EnableAutoApproveNodes = func(o *ApplicationConfig) {
o.Distributed.AutoApproveNodes = true
}
// Defaults for distributed timeouts.
const (
DefaultMCPToolTimeout = 360 * time.Second
DefaultMCPDiscoveryTimeout = 60 * time.Second
DefaultWorkerWaitTimeout = 5 * time.Minute
DefaultDrainTimeout = 30 * time.Second
DefaultHealthCheckInterval = 15 * time.Second
DefaultStaleNodeThreshold = 60 * time.Second
DefaultMCPCIJobTimeout = 10 * time.Minute
DefaultMCPToolTimeout = 360 * time.Second
DefaultMCPDiscoveryTimeout = 60 * time.Second
DefaultWorkerWaitTimeout = 5 * time.Minute
DefaultDrainTimeout = 30 * time.Second
DefaultHealthCheckInterval = 15 * time.Second
DefaultStaleNodeThreshold = 60 * time.Second
DefaultMCPCIJobTimeout = 10 * time.Minute
DefaultBackendInstallTimeout = 15 * time.Minute
DefaultBackendUpgradeTimeout = 15 * time.Minute
)
// DefaultMaxUploadSize is the default maximum upload body size (50 GB).
const DefaultMaxUploadSize int64 = 50 << 30
// BackendInstallTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendInstallTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendInstallTimeout, DefaultBackendInstallTimeout)
}
// BackendUpgradeTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendUpgradeTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendUpgradeTimeout, DefaultBackendUpgradeTimeout)
}
// MCPToolTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) MCPToolTimeoutOrDefault() time.Duration {
return cmp.Or(c.MCPToolTimeout, DefaultMCPToolTimeout)

View File

@@ -0,0 +1,36 @@
package config_test
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
)
var _ = Describe("DistributedConfig backend NATS timeouts", func() {
Context("BackendInstallTimeoutOrDefault", func() {
It("returns 15 minutes when unset", func() {
c := config.DistributedConfig{}
Expect(c.BackendInstallTimeoutOrDefault()).To(Equal(15 * time.Minute))
})
It("returns the configured value when set", func() {
c := config.DistributedConfig{BackendInstallTimeout: 42 * time.Minute}
Expect(c.BackendInstallTimeoutOrDefault()).To(Equal(42 * time.Minute))
})
})
Context("BackendUpgradeTimeoutOrDefault", func() {
It("returns 15 minutes when unset", func() {
c := config.DistributedConfig{}
Expect(c.BackendUpgradeTimeoutOrDefault()).To(Equal(15 * time.Minute))
})
It("returns the configured value when set", func() {
c := config.DistributedConfig{BackendUpgradeTimeout: 30 * time.Minute}
Expect(c.BackendUpgradeTimeoutOrDefault()).To(Equal(30 * time.Minute))
})
})
})

View File

@@ -136,36 +136,4 @@ var _ = Describe("Backend hooks and parser defaults", func() {
Expect(cfg.EngineArgs["enable_chunked_prefill"]).To(Equal(true))
})
})
Context("PromptCacheAll default", func() {
It("defaults to true when omitted from YAML", func() {
cfg := &ModelConfig{}
cfg.SetDefaults()
Expect(cfg.PromptCacheAll).NotTo(BeNil())
Expect(*cfg.PromptCacheAll).To(BeTrue())
})
It("preserves an explicit false from YAML", func() {
falseV := false
cfg := &ModelConfig{
LLMConfig: LLMConfig{PromptCacheAll: &falseV},
}
cfg.SetDefaults()
Expect(cfg.PromptCacheAll).NotTo(BeNil())
Expect(*cfg.PromptCacheAll).To(BeFalse())
})
It("preserves an explicit true from YAML", func() {
trueV := true
cfg := &ModelConfig{
LLMConfig: LLMConfig{PromptCacheAll: &trueV},
}
cfg.SetDefaults()
Expect(cfg.PromptCacheAll).NotTo(BeNil())
Expect(*cfg.PromptCacheAll).To(BeTrue())
})
})
})

View File

@@ -209,7 +209,7 @@ type LLMConfig struct {
RMSNormEps float32 `yaml:"rms_norm_eps,omitempty" json:"rms_norm_eps,omitempty"`
NGQA int32 `yaml:"ngqa,omitempty" json:"ngqa,omitempty"`
PromptCachePath string `yaml:"prompt_cache_path,omitempty" json:"prompt_cache_path,omitempty"`
PromptCacheAll *bool `yaml:"prompt_cache_all,omitempty" json:"prompt_cache_all,omitempty"`
PromptCacheAll bool `yaml:"prompt_cache_all,omitempty" json:"prompt_cache_all,omitempty"`
PromptCacheRO bool `yaml:"prompt_cache_ro,omitempty" json:"prompt_cache_ro,omitempty"`
MirostatETA *float64 `yaml:"mirostat_eta,omitempty" json:"mirostat_eta,omitempty"`
MirostatTAU *float64 `yaml:"mirostat_tau,omitempty" json:"mirostat_tau,omitempty"`
@@ -494,13 +494,6 @@ func (cfg *ModelConfig) SetDefaults(opts ...ConfigLoaderOption) {
cfg.Reranking = &falseV
}
if cfg.PromptCacheAll == nil {
// Match upstream llama.cpp's default (common/common.h: cache_prompt = true)
// and let cache_idle_slots / kv_unified actually do useful work; users can
// opt out with an explicit `prompt_cache_all: false` in the model YAML.
cfg.PromptCacheAll = &trueV
}
if threads == 0 {
// Threads can't be 0
threads = 4

View File

@@ -649,6 +649,7 @@
align-items: center;
gap: var(--spacing-md);
padding: var(--spacing-xs) 0;
flex-wrap: wrap;
}
.operation-info {
@@ -739,6 +740,110 @@
color: var(--color-error);
}
/* Operations bar: per-node breakdown (multi-worker installs) */
.operation-expand {
background: none;
border: none;
color: var(--color-text-muted);
cursor: pointer;
padding: 0 var(--spacing-xs);
font-size: var(--text-xs);
display: inline-flex;
align-items: center;
gap: 0.25rem;
}
.operation-expand:hover {
color: var(--color-text-primary);
}
.operation-expand-label {
font-size: var(--text-xs);
}
.operation-nodes-list {
list-style: none;
margin: var(--spacing-xs) 0 0;
padding: var(--spacing-xs) 0 0;
border-top: 1px solid var(--color-border-subtle);
flex-basis: 100%;
width: 100%;
}
.operation-node {
display: flex;
align-items: center;
gap: var(--spacing-sm);
padding: var(--spacing-xs) 0;
font-size: var(--text-xs);
color: var(--color-text-muted);
flex-wrap: wrap;
}
.operation-node-status {
padding: 2px 6px;
border-radius: var(--radius-md);
font-size: 0.65rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.025em;
white-space: nowrap;
}
.operation-node-status-success {
background: var(--color-success-light);
color: var(--color-success);
}
.operation-node-status-error {
background: var(--color-error-light);
color: var(--color-error);
}
.operation-node-status-queued {
background: var(--color-bg-tertiary);
color: var(--color-text-muted);
}
.operation-node-status-running_on_worker {
background: var(--color-warning-light);
color: var(--color-warning);
}
.operation-node-status-downloading {
background: var(--color-primary-light);
color: var(--color-primary);
}
.operation-node-name {
font-weight: 500;
color: var(--color-text-secondary);
}
.operation-node-file {
font-family: var(--font-mono);
color: var(--color-text-tertiary);
overflow: hidden;
text-overflow: ellipsis;
max-width: 30ch;
white-space: nowrap;
}
.operation-node-bytes {
font-variant-numeric: tabular-nums;
color: var(--color-text-tertiary);
}
.operation-node-pct {
font-variant-numeric: tabular-nums;
color: var(--color-primary);
font-weight: 500;
}
.operation-node-error {
color: var(--color-error);
}
.operation-node-bar-container {
flex-basis: 100%;
height: 3px;
background: var(--color-surface-sunken);
border-radius: var(--radius-full);
overflow: hidden;
margin-top: 0.25rem;
}
.operation-node-bar {
height: 100%;
background: var(--color-primary);
border-radius: var(--radius-full);
transition: width var(--duration-slow, 0.3s) var(--ease-spring, ease);
}
/* Toast */
.toast-container {
position: fixed;

View File

@@ -1,14 +1,33 @@
import { useState } from 'react'
import { useOperations } from '../hooks/useOperations'
const nodeStatusLabels = {
success: 'Done',
error: 'Failed',
queued: 'Queued',
running_on_worker: 'Worker busy',
downloading: 'Downloading',
}
const runningOnWorkerTooltip = 'NATS round-trip timed out, but the worker is still installing in the background. The reconciler will confirm completion.'
export default function OperationsBar() {
const { operations, cancelOperation, dismissFailedOp } = useOperations()
const [expanded, setExpanded] = useState({})
if (operations.length === 0) return null
const toggle = (key) => setExpanded((m) => ({ ...m, [key]: !m[key] }))
return (
<div className="operations-bar">
{operations.map(op => (
<div key={op.jobID || op.id} className="operation-item">
{operations.map(op => {
const key = op.jobID || op.id
const nodes = Array.isArray(op.nodes) ? op.nodes : []
const canExpand = nodes.length > 1
const isOpen = !!expanded[key]
return (
<div key={key} className="operation-item">
<div className="operation-info">
{op.error ? (
<i className="fas fa-circle-exclamation" style={{ color: 'var(--color-error)', marginRight: 'var(--spacing-xs)' }} />
@@ -80,8 +99,55 @@ export default function OperationsBar() {
<i className="fas fa-xmark" />
</button>
) : null}
{canExpand && (
<button
type="button"
className="operation-expand"
onClick={() => toggle(key)}
aria-expanded={isOpen}
title={isOpen ? 'Hide per-node detail' : `Show ${nodes.length} nodes`}
>
<i className={`fas fa-chevron-${isOpen ? 'up' : 'down'}`} />
<span className="operation-expand-label">{nodes.length} nodes</span>
</button>
)}
{canExpand && isOpen && (
<ul className="operation-nodes-list">
{nodes.map((n) => (
<li key={n.node_id} className={`operation-node operation-node-${n.status}`}>
<span
className={`operation-node-status operation-node-status-${n.status}`}
title={n.status === 'running_on_worker' ? runningOnWorkerTooltip : undefined}
>
{nodeStatusLabels[n.status] || n.status}
</span>
<span className="operation-node-name">{n.node_name || n.node_id}</span>
{n.file_name && <span className="operation-node-file">{n.file_name}</span>}
{(n.current || n.total) && (
<span className="operation-node-bytes">
{n.current || '?'} / {n.total || '?'}
</span>
)}
{n.percentage > 0 && (
<span className="operation-node-pct">{Math.round(n.percentage)}%</span>
)}
{n.error && (
<span className="operation-node-error" title={n.error}>
{n.error.length > 80 ? n.error.slice(0, 80) + '...' : n.error}
</span>
)}
{n.percentage > 0 && n.percentage < 100 && (
<div className="operation-node-bar-container">
<div className="operation-node-bar" style={{ width: `${n.percentage}%` }} />
</div>
)}
</li>
))}
</ul>
)}
</div>
))}
)
})}
</div>
)
}

View File

@@ -1,10 +1,9 @@
import { useState, useEffect, useCallback, useRef, useMemo } from 'react'
import { useParams, useSearchParams, useOutletContext, Link, Navigate } from 'react-router-dom'
import { backendLogsApi, nodesApi } from '../utils/api'
import { useParams, useSearchParams, useOutletContext, Link } from 'react-router-dom'
import { backendLogsApi } from '../utils/api'
import { formatTimestamp } from '../utils/format'
import { apiUrl } from '../utils/basePath'
import LoadingSpinner from '../components/LoadingSpinner'
import { useDistributedMode } from '../hooks/useDistributedMode'
function wsUrl(path) {
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
@@ -275,158 +274,11 @@ function BackendLogsDetail({ modelId }) {
)
}
// DistributedBackendLogsResolver runs only in distributed mode. The local
// /api/backend-logs WebSocket has no backend behind it here (inference lives
// on workers), so we resolve modelId → hosting node(s) and forward to the
// per-node logs page. One hit redirects automatically; multiple hits render
// a picker so the operator can pick which worker's logs to inspect.
function DistributedBackendLogsResolver({ modelId, fromTimestamp }) {
const [hits, setHits] = useState(null) // [{ node, model }] once resolved
const [error, setError] = useState(null)
useEffect(() => {
let cancelled = false
;(async () => {
try {
const nodes = await nodesApi.list()
const nodeList = Array.isArray(nodes) ? nodes : []
// Fan out to each node and collect entries that match this model.
// Per-node failures are tolerated — a single offline worker shouldn't
// hide logs available on its peers.
const perNode = await Promise.all(nodeList.map(async (node) => {
try {
const models = await nodesApi.getModels(node.id)
const matches = (Array.isArray(models) ? models : []).filter(m => m.model_name === modelId)
return matches.map(m => ({ node, model: m }))
} catch {
return []
}
}))
if (cancelled) return
setHits(perNode.flat())
} catch (err) {
if (!cancelled) setError(err)
}
})()
return () => { cancelled = true }
}, [modelId])
if (error) {
return (
<div className="page page--wide">
<div className="empty-state">
<div className="empty-state-icon"><i className="fas fa-exclamation-triangle" /></div>
<h2 className="empty-state-title">Failed to resolve hosting nodes</h2>
<p className="empty-state-text">{error.message}</p>
</div>
</div>
)
}
if (hits === null) {
return (
<div style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
<LoadingSpinner size="lg" />
</div>
)
}
if (hits.length === 0) {
return (
<div className="page page--wide">
<div className="empty-state">
<div className="empty-state-icon"><i className="fas fa-terminal" /></div>
<h2 className="empty-state-title">Model not loaded on any worker</h2>
<p className="empty-state-text">
<span style={{ fontFamily: 'var(--font-mono)' }}>{modelId}</span> isn't currently loaded on any node in the cluster.
Check the <Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
</p>
</div>
</div>
)
}
// Bare model name aggregates this node's replicas via the worker's log
// store; preserve ?from= so the deep-link from a trace still scrolls to
// the right line on arrival.
const buildHref = (nodeId) => {
const base = `/app/node-backend-logs/${nodeId}/${encodeURIComponent(modelId)}`
return fromTimestamp ? `${base}?from=${encodeURIComponent(fromTimestamp)}` : base
}
if (hits.length === 1) {
return <Navigate to={buildHref(hits[0].node.id)} replace />
}
// Multiple workers host this model — let the operator pick.
return (
<div className="page page--wide">
<div className="page-header">
<div>
<h1 className="page-title" style={{ marginBottom: 0 }}>
<i className="fas fa-terminal" style={{ fontSize: '0.8em', marginRight: 'var(--spacing-sm)' }} />
{modelId}
</h1>
<p className="page-subtitle" style={{ marginTop: 'var(--spacing-xs)' }}>
Hosted on {hits.length} workers — pick one to view its logs.
</p>
</div>
</div>
<div style={{ display: 'flex', flexDirection: 'column', gap: 'var(--spacing-xs)' }}>
{hits.map(({ node, model }) => (
<Link
key={`${node.id}#${model.replica_index ?? 0}`}
to={buildHref(node.id)}
style={{
display: 'flex', alignItems: 'center', justifyContent: 'space-between',
padding: 'var(--spacing-sm) var(--spacing-md)',
background: 'var(--color-bg-primary)', border: '1px solid var(--color-border)',
borderRadius: 'var(--radius-md)', textDecoration: 'none', color: 'inherit',
}}
>
<div>
<div style={{ fontWeight: 500 }}>{node.name || node.id}</div>
<div style={{ fontSize: '0.75rem', color: 'var(--color-text-secondary)', fontFamily: 'var(--font-mono)' }}>
{node.id}{model.replica_index ? ` · replica ${model.replica_index}` : ''} · {model.state}
</div>
</div>
<i className="fas fa-chevron-right" style={{ color: 'var(--color-text-muted)' }} />
</Link>
))}
</div>
</div>
)
}
// BackendLogsRouter picks between the local WebSocket view (standalone) and
// the distributed resolver. The probe runs once via useDistributedMode so a
// 503 from /api/nodes (the canonical "distributed disabled" signal) keeps the
// existing standalone path intact.
function BackendLogsRouter({ modelId }) {
const [searchParams] = useSearchParams()
const fromTimestamp = searchParams.get('from')
const { enabled: distributedMode, loading } = useDistributedMode()
if (loading) {
return (
<div style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
<LoadingSpinner size="lg" />
</div>
)
}
if (distributedMode) {
return <DistributedBackendLogsResolver modelId={modelId} fromTimestamp={fromTimestamp} />
}
return <BackendLogsDetail modelId={modelId} />
}
export default function BackendLogs() {
const { modelId } = useParams()
if (modelId) {
return <BackendLogsRouter modelId={decodeURIComponent(modelId)} />
return <BackendLogsDetail modelId={decodeURIComponent(modelId)} />
}
// No model specified — redirect to System page

View File

@@ -660,7 +660,8 @@ export default function Manage() {
{ key: 'edit', icon: 'fa-pen-to-square', label: 'Edit configuration',
onClick: () => navigate(`/app/model-editor/${encodeURIComponent(model.id)}`) },
{ key: 'logs', icon: 'fa-terminal', label: 'Backend logs',
onClick: () => navigate(`/app/backend-logs/${encodeURIComponent(model.id)}`) },
onClick: () => navigate(`/app/backend-logs/${encodeURIComponent(model.id)}`),
hidden: distributedMode },
{ divider: true },
{ key: 'delete', icon: 'fa-trash', label: 'Delete model', danger: true,
onClick: () => handleDeleteModel(model.id) },

View File

@@ -220,10 +220,7 @@ function BackendTraceDetail({ trace }) {
</div>
)}
{/* Backend logs link — /app/backend-logs/:modelId is the unified entry
point: in standalone mode it streams local logs, in distributed mode
it resolves the model to the host worker(s) and either redirects to
/app/node-backend-logs/<nodeId>/<modelId> or shows a node picker. */}
{/* Backend logs link */}
{trace.model_name && (
<div style={{ marginBottom: 'var(--spacing-md)' }}>
<a

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"slices"
"sort"
"strconv"
"strings"
"time"
@@ -57,7 +58,6 @@ var usecaseFilters = map[string]config.ModelConfigUsecase{
config.UsecaseRealtimeAudio: config.FLAG_REALTIME_AUDIO,
}
// extractHFRepo tries to find a HuggingFace repo ID from model overrides or URLs.
func extractHFRepo(overrides map[string]any, urls []string) string {
if overrides != nil {
@@ -257,6 +257,44 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
if status != nil && status.Error != nil {
opData["error"] = status.Error.Error()
}
// Expose the per-node breakdown when the Phase 4 progress sink
// has populated OpStatus.Nodes (distributed backend installs).
// We sort by node_name for stable UI rendering across polls;
// the underlying slice is order-dependent on UpdateNodeProgress
// arrival order, which the UI must not depend on. Single-node
// ops and model installs leave Nodes empty so this block emits
// no key, preserving the legacy payload shape.
if status != nil && len(status.Nodes) > 0 {
nodes := make([]map[string]any, 0, len(status.Nodes))
for _, n := range status.Nodes {
entry := map[string]any{
"node_id": n.NodeID,
"node_name": n.NodeName,
"status": n.Status,
"percentage": n.Percentage,
}
if n.FileName != "" {
entry["file_name"] = n.FileName
}
if n.Current != "" {
entry["current"] = n.Current
}
if n.Total != "" {
entry["total"] = n.Total
}
if n.Phase != "" {
entry["phase"] = n.Phase
}
if n.Error != "" {
entry["error"] = n.Error
}
nodes = append(nodes, entry)
}
sort.SliceStable(nodes, func(i, j int) bool {
return fmt.Sprintf("%v", nodes[i]["node_name"]) < fmt.Sprintf("%v", nodes[j]["node_name"])
})
opData["nodes"] = nodes
}
operations = append(operations, opData)
}
@@ -557,11 +595,11 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
NodeStatus string `json:"node_status"`
}
type modelCapability struct {
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
// LoadedOn is populated only when the node registry is active
// (distributed mode). Lets the UI show "loaded on worker-1" without
// the operator having to expand every node manually. An empty slice
@@ -1159,17 +1197,17 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
}
return c.JSON(200, map[string]any{
"backends": backendsJSON,
"repositories": appConfig.BackendGalleries,
"allTags": tags,
"processingBackends": processingBackendsData,
"taskTypes": taskTypes,
"availableBackends": totalBackends,
"installedBackends": installedBackendsCount,
"currentPage": pageNum,
"totalPages": totalPages,
"prevPage": prevPage,
"nextPage": nextPage,
"backends": backendsJSON,
"repositories": appConfig.BackendGalleries,
"allTags": tags,
"processingBackends": processingBackendsData,
"taskTypes": taskTypes,
"availableBackends": totalBackends,
"installedBackends": installedBackendsCount,
"currentPage": pageNum,
"totalPages": totalPages,
"prevPage": prevPage,
"nextPage": nextPage,
"systemCapability": detectedCapability,
"preferDevelopmentBackends": appConfig.PreferDevelopmentBackends,
})
@@ -1599,4 +1637,3 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
app.DELETE("/api/branding/asset/:kind", localai.DeleteBrandingAssetEndpoint(appConfig), adminMiddleware)
}

View File

@@ -62,6 +62,63 @@ var _ = Describe("/api/operations with node-scoped backend ops", func() {
Expect(found["isBackend"]).To(Equal(true))
})
It("surfaces per-node OpStatus entries on /api/operations", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
jobID := "test-op-nodes-1"
// Register a backend op so the handler treats this as a backend
// install (no need to consult the gallery during the test).
opcache.SetBackend("vllm", jobID)
// Populate per-node entries via the P4.2 helper. The helper also
// allocates an OpStatus under jobID, which the handler will read.
galleryService.UpdateNodeProgress(jobID, "node-b", galleryop.NodeProgress{
NodeID: "node-b", NodeName: "worker-b", Status: "running_on_worker",
})
galleryService.UpdateNodeProgress(jobID, "node-a", galleryop.NodeProgress{
NodeID: "node-a", NodeName: "worker-a", Status: "downloading", Percentage: 30, FileName: "vllm.tar",
})
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == jobID {
found = op
break
}
}
Expect(found).ToNot(BeNil(), "operation should appear in /api/operations")
nodes, ok := found["nodes"].([]any)
Expect(ok).To(BeTrue(), "operation should have a nodes array")
Expect(nodes).To(HaveLen(2))
// Stable sort by node_name: "worker-a" comes before "worker-b"
// even though UpdateNodeProgress was called in reverse order.
first := nodes[0].(map[string]any)
Expect(first["node_name"]).To(Equal("worker-a"))
Expect(first["status"]).To(Equal("downloading"))
Expect(first["file_name"]).To(Equal("vllm.tar"))
Expect(first["percentage"]).To(Equal(30.0))
second := nodes[1].(map[string]any)
Expect(second["node_name"]).To(Equal("worker-b"))
Expect(second["status"]).To(Equal("running_on_worker"))
})
It("does not emit nodeID for non-node-scoped backend ops", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)

View File

@@ -91,6 +91,21 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
})
return err
}
if errors.Is(err, ErrWorkerStillInstalling) {
// Soft failure: at least one worker timed out replying but is
// still running the install in the background. Mark the op as
// processed with a non-error message so the admin UI shows a
// yellow in-progress state rather than red. The reconciler's
// next pass will reconcile the actual outcome via backend.list.
xlog.Info("worker still installing in background", "backend", op.GalleryElementName, "error", err)
g.UpdateStatus(op.ID, &OpStatus{
Processed: true,
GalleryElementName: op.GalleryElementName,
Message: fmt.Sprintf("backend %s: worker still installing in background; reconciler will confirm completion (%v)", op.GalleryElementName, err),
Cancellable: false,
})
return nil
}
xlog.Error("error installing backend", "error", err, "backend", op.GalleryElementName)
if !op.Delete {
// If we didn't install the backend, we need to make sure we don't have a leftover directory

View File

@@ -0,0 +1,13 @@
package galleryop
import "errors"
// ErrWorkerStillInstalling indicates a distributed backend install
// timed out at the NATS round-trip layer but the worker is most likely
// still pulling the OCI image in the background. Producers
// (DistributedBackendManager) wrap this when the round-trip times out;
// consumers (backendHandler) use errors.Is(err, ErrWorkerStillInstalling)
// to surface a yellow "in progress" OpStatus instead of a red error,
// leaving the pending_backend_ops row in place for the reconciler to
// confirm via backend.list.
var ErrWorkerStillInstalling = errors.New("worker did not reply in time; install may still be running in the background")

View File

@@ -0,0 +1,93 @@
package galleryop_test
import (
"encoding/json"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
)
var _ = Describe("OpStatus.Nodes", func() {
It("defaults to empty on a fresh OpStatus", func() {
os := &galleryop.OpStatus{}
Expect(os.Nodes).To(BeEmpty())
})
It("JSON round-trips with all NodeProgress fields", func() {
os := &galleryop.OpStatus{
Nodes: []galleryop.NodeProgress{
{
NodeID: "node-1",
NodeName: "worker-a",
Status: "running_on_worker",
FileName: "vllm.tar.zst",
Current: "412 MB",
Total: "2.1 GB",
Percentage: 19.6,
Phase: "downloading",
Error: "",
},
},
}
raw, err := json.Marshal(os)
Expect(err).ToNot(HaveOccurred())
got := &galleryop.OpStatus{}
Expect(json.Unmarshal(raw, got)).To(Succeed())
Expect(got.Nodes).To(HaveLen(1))
Expect(got.Nodes[0]).To(Equal(os.Nodes[0]))
})
})
var _ = Describe("GalleryService.UpdateNodeProgress", func() {
var svc *galleryop.GalleryService
BeforeEach(func() {
// UpdateNodeProgress + GetStatus only touch the in-memory statuses
// map. A zero-value ApplicationConfig is enough to get past the
// LocalModelManager / LocalBackendManager constructors.
svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
})
It("creates a node entry on first call", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0,
})
st := svc.GetStatus("op1")
Expect(st).ToNot(BeNil())
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].NodeID).To(Equal("n1"))
Expect(st.Nodes[0].Percentage).To(Equal(12.0))
})
It("merges subsequent updates into the same NodeID entry, not appending", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 48.0, FileName: "vllm.tar"})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].Percentage).To(Equal(48.0))
Expect(st.Nodes[0].FileName).To(Equal("vllm.tar"))
})
It("appends a new entry for a different NodeID", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: "queued"})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(2))
})
It("mirrors the latest tick into the aggregate OpStatus fields", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: "downloading",
Percentage: 33.0, FileName: "vllm.tar", Current: "330 MB", Total: "1 GB",
})
st := svc.GetStatus("op1")
Expect(st.Progress).To(Equal(33.0))
Expect(st.FileName).To(Equal("vllm.tar"))
Expect(st.DownloadedFileSize).To(Equal("330 MB"))
Expect(st.TotalFileSize).To(Equal("1 GB"))
})
})

View File

@@ -53,6 +53,30 @@ type OpStatus struct {
GalleryElementName string `json:"gallery_element_name"`
Cancelled bool `json:"cancelled"` // Cancelled is true if the operation was cancelled
Cancellable bool `json:"cancellable"` // Cancellable is true if the operation can be cancelled
// Nodes is the per-node breakdown for a fanned-out backend install.
// Populated by DistributedBackendManager (per-node terminal status)
// and by the Phase 2 progress bridge (per-byte ticks). The
// /api/operations handler surfaces this so the UI can render an
// expandable per-node view of an in-flight install.
Nodes []NodeProgress `json:"nodes,omitempty"`
}
// NodeProgress is a single node's contribution to a backend install
// operation. Populated by DistributedBackendManager (per-node terminal
// status) and by the Phase 2 progress bridge (per-byte ticks). Read by
// the /api/operations handler so the UI can render an expandable
// per-node breakdown.
type NodeProgress struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"` // "queued" | "running_on_worker" | "success" | "error" | "downloading"
FileName string `json:"file_name,omitempty"`
Current string `json:"current,omitempty"`
Total string `json:"total,omitempty"`
Percentage float64 `json:"percentage"`
Phase string `json:"phase,omitempty"`
Error string `json:"error,omitempty"`
}
type OpCache struct {

View File

@@ -135,6 +135,47 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
}
}
// UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes,
// keyed by nodeID, and mirrors the latest values into the aggregate
// Progress / FileName / DownloadedFileSize / TotalFileSize / Message
// fields so the legacy single-bar OperationsBar view keeps working
// unchanged alongside the new per-node breakdown.
//
// We deliberately do NOT delegate the aggregate mirror to UpdateStatus
// here: UpdateStatus overwrites the entire OpStatus, which would clobber
// the Nodes slice we just merged into. Doing the merge + mirror under a
// single lock keeps both views consistent and concurrent-safe.
func (g *GalleryService) UpdateNodeProgress(opID, nodeID string, np NodeProgress) {
g.Lock()
defer g.Unlock()
status := g.statuses[opID]
if status == nil {
status = &OpStatus{}
g.statuses[opID] = status
}
merged := false
for i := range status.Nodes {
if status.Nodes[i].NodeID == nodeID {
status.Nodes[i] = np
merged = true
break
}
}
if !merged {
status.Nodes = append(status.Nodes, np)
}
// Mirror the latest tick into the legacy aggregate fields so the
// existing single-bar UI keeps rendering meaningful progress.
status.FileName = np.FileName
status.Progress = np.Percentage
status.DownloadedFileSize = np.Current
status.TotalFileSize = np.Total
if np.Phase != "" {
status.Message = np.Phase
}
}
func (g *GalleryService) GetStatus(s string) *OpStatus {
g.Lock()
defer g.Unlock()

View File

@@ -0,0 +1,23 @@
package messaging
// BackendInstallProgressEvent is the wire payload published by a worker to
// nodes.<nodeID>.backend.install.<opID>.progress while a long-running install
// is in flight. Transient: dropped events are acceptable, the master relies
// on BackendInstallReply for ground truth on success/failure.
type BackendInstallProgressEvent struct {
OpID string `json:"op_id"`
NodeID string `json:"node_id"`
Backend string `json:"backend"`
FileName string `json:"file_name,omitempty"`
Current string `json:"current,omitempty"` // human-readable size, e.g. "412 MB"
Total string `json:"total,omitempty"` // human-readable size, e.g. "2.1 GB"
Percentage float64 `json:"percentage"`
Phase string `json:"phase,omitempty"` // "resolving" | "downloading" | "extracting" | "starting"
}
// SubjectNodeBackendInstallProgress returns the NATS subject for transient
// progress events emitted by a worker during a single backend.install run.
// Per-op so multiple concurrent installs on the same node never alias.
func SubjectNodeBackendInstallProgress(nodeID, opID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.install." + sanitizeSubjectToken(opID) + ".progress"
}

View File

@@ -0,0 +1,51 @@
package messaging_test
import (
"encoding/json"
"strings"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("BackendInstallProgress", func() {
Context("SubjectNodeBackendInstallProgress", func() {
It("composes the per-op progress subject", func() {
Expect(messaging.SubjectNodeBackendInstallProgress("node-abc", "op-123")).
To(Equal("nodes.node-abc.backend.install.op-123.progress"))
})
It("sanitizes NATS-reserved characters in node and op tokens", func() {
// '.' is the NATS hierarchy delimiter, '*' and '>' are wildcards,
// and whitespace must be stripped - sanitizeSubjectToken replaces
// all of them with '-'. The resulting subject must still parse as
// exactly six hierarchy segments: nodes/<node>/backend/install/<op>/progress.
subj := messaging.SubjectNodeBackendInstallProgress("a.b c", "x.y z")
Expect(subj).ToNot(ContainSubstring(" "))
Expect(strings.Count(subj, ".")).To(Equal(5))
})
})
Context("BackendInstallProgressEvent", func() {
It("JSON round-trips with all known fields", func() {
ev := messaging.BackendInstallProgressEvent{
OpID: "op-123",
NodeID: "node-abc",
Backend: "vllm",
FileName: "vllm-cpu.tar.zst",
Current: "412 MB",
Total: "2.1 GB",
Percentage: 19.6,
Phase: "downloading",
}
raw, err := json.Marshal(ev)
Expect(err).ToNot(HaveOccurred())
var got messaging.BackendInstallProgressEvent
Expect(json.Unmarshal(raw, &got)).To(Succeed())
Expect(got).To(Equal(ev))
})
})
})

View File

@@ -144,6 +144,12 @@ type BackendInstallRequest struct {
// worker still works (the master's install fallback path also uses this
// when backend.upgrade returns nats.ErrNoResponders).
Force bool `json:"force,omitempty"`
// OpID identifies the admin-side operation. When non-empty the worker
// publishes BackendInstallProgressEvent values to
// SubjectNodeBackendInstallProgress(nodeID, OpID) while the install is
// running, debounced to roughly 250ms. Empty means the caller is a
// reconciler-driven retry that does not need progress streamed.
OpID string `json:"op_id,omitempty"`
}
// BackendInstallReply is the response from a backend.install NATS request.

View File

@@ -0,0 +1,120 @@
package nodes
import (
"sync"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
)
// DebouncedInstallProgressPublisher buffers backend-install download ticks
// and publishes them to the per-op NATS progress subject at most once per
// `interval`. Always publishes the final event on Flush so the UI sees the
// terminal percentage.
//
// Behavior: leading-edge debounce. The first OnDownload after a quiet window
// publishes immediately; subsequent ticks within `interval` only buffer the
// latest event, which is then emitted via a single trailing timer. This
// keeps the wire chatter bounded (~4 events per second at 250ms) while
// still surfacing every meaningful percentage jump.
//
// Lock ordering: never hold p.mu across a Publish call. Publish hits the
// NATS client which may block on a slow link, and we don't want a stalled
// network to stall the underlying gallery download loop.
type DebouncedInstallProgressPublisher struct {
mu sync.Mutex
client messaging.MessagingClient
subject string
nodeID string
opID string
backend string
interval time.Duration
lastPublishedAt time.Time
pending *messaging.BackendInstallProgressEvent
timer *time.Timer
}
// NewDebouncedInstallProgressPublisher constructs a publisher for one
// install operation. interval is the leading-edge debounce window
// (~250ms in production).
func NewDebouncedInstallProgressPublisher(client messaging.MessagingClient, nodeID, opID, backend string, interval time.Duration) *DebouncedInstallProgressPublisher {
return &DebouncedInstallProgressPublisher{
client: client,
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
nodeID: nodeID,
opID: opID,
backend: backend,
interval: interval,
}
}
// OnDownload is the callback shape gallery.InstallBackendFromGallery and
// galleryop.InstallExternalBackend pass into the worker. Each invocation
// represents a single tick from the underlying io.Reader copy loop.
func (p *DebouncedInstallProgressPublisher) OnDownload(file, current, total string, percentage float64) {
ev := messaging.BackendInstallProgressEvent{
OpID: p.opID,
NodeID: p.nodeID,
Backend: p.backend,
FileName: file,
Current: current,
Total: total,
Percentage: percentage,
Phase: "downloading",
}
p.mu.Lock()
now := time.Now()
if p.lastPublishedAt.IsZero() || now.Sub(p.lastPublishedAt) >= p.interval {
// Leading edge: publish immediately.
p.lastPublishedAt = now
p.pending = nil
p.mu.Unlock()
_ = p.client.Publish(p.subject, ev)
return
}
// Within the window: buffer the latest event and arm a trailing
// publish. If a timer is already armed, we just overwrite p.pending so
// the trailing publish carries the freshest data.
p.pending = &ev
if p.timer == nil {
delay := p.interval - now.Sub(p.lastPublishedAt)
p.timer = time.AfterFunc(delay, p.flushPending)
}
p.mu.Unlock()
}
// flushPending is the trailing-edge publisher fired by the AfterFunc timer.
// It clears the pending slot under the lock, then publishes outside the
// lock so Publish never blocks an in-progress OnDownload call.
func (p *DebouncedInstallProgressPublisher) flushPending() {
p.mu.Lock()
p.timer = nil
pending := p.pending
p.pending = nil
if pending != nil {
p.lastPublishedAt = time.Now()
}
p.mu.Unlock()
if pending != nil {
_ = p.client.Publish(p.subject, *pending)
}
}
// Flush publishes any pending buffered event synchronously and stops the
// pending timer. Safe to call multiple times. Callers MUST defer Flush
// after constructing the publisher so the terminal percentage reaches the
// master even on error returns.
func (p *DebouncedInstallProgressPublisher) Flush() {
p.mu.Lock()
if p.timer != nil {
p.timer.Stop()
p.timer = nil
}
pending := p.pending
p.pending = nil
p.mu.Unlock()
if pending != nil {
_ = p.client.Publish(p.subject, *pending)
}
}

View File

@@ -0,0 +1,48 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("DebouncedInstallProgressPublisher", func() {
It("publishes the first event immediately and debounces subsequent ones within the window", func() {
mc := newScriptedMessagingClient()
pub := NewDebouncedInstallProgressPublisher(mc, "n1", "op1", "vllm", 50*time.Millisecond)
// Three rapid-fire ticks within the debounce window.
pub.OnDownload("vllm.tar.zst", "100 MB", "1 GB", 10.0)
pub.OnDownload("vllm.tar.zst", "200 MB", "1 GB", 20.0)
pub.OnDownload("vllm.tar.zst", "300 MB", "1 GB", 30.0)
pub.Flush()
// First event publishes immediately; the others coalesce; Flush guarantees a final.
// So we expect at least 2 publishes and at most 4 (lead + final + any window-bounded).
Eventually(func() int {
return len(mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1")))
}, "1s").Should(BeNumerically(">=", 2))
calls := mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1"))
Expect(len(calls)).To(BeNumerically("<=", 4),
"three ticks within the debounce window should produce at most ~4 publishes")
})
It("publishes the final event after Flush with the latest percentage", func() {
mc := newScriptedMessagingClient()
pub := NewDebouncedInstallProgressPublisher(mc, "n1", "op1", "vllm", 50*time.Millisecond)
pub.OnDownload("vllm.tar.zst", "1 GB", "1 GB", 100.0)
pub.Flush()
Eventually(func() float64 {
calls := mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1"))
if len(calls) == 0 {
return -1
}
return calls[len(calls)-1].Percentage
}, "1s").Should(Equal(100.0))
})
})

View File

@@ -10,6 +10,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
@@ -48,6 +49,13 @@ func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryo
return d.local.InstallModel(ctx, op, progressCb)
}
// nodeProgressSink is the narrow interface DistributedBackendManager uses to
// publish per-node progress without dragging in the full *GalleryService.
// nil means "no sink, skip per-node writes" (used by single-node tests).
type nodeProgressSink interface {
UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress)
}
// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
@@ -56,16 +64,20 @@ type DistributedBackendManager struct {
registry *NodeRegistry
backendGalleries []config.Gallery
systemState *system.SystemState
progressSink nodeProgressSink
}
// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
// progressSink may be nil to disable per-node OpStatus writes (single-node
// tests don't need it).
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry, progressSink nodeProgressSink) *DistributedBackendManager {
return &DistributedBackendManager{
local: galleryop.NewLocalBackendManager(appConfig, ml),
adapter: adapter,
registry: registry,
backendGalleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
progressSink: progressSink,
}
}
@@ -75,7 +87,7 @@ func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model
type NodeOpStatus struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"` // "success" | "queued" | "error"
Status string `json:"status"` // "success" | "queued" | "error" | "running_on_worker"
Error string `json:"error,omitempty"`
}
@@ -116,25 +128,48 @@ func (r BackendOpResult) Err() error {
// when the node returns.
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it such requests fail at the
// never had the backend installed to "upgrade" it - such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
//
// opID is the gallery operation identifier; when non-empty and progressSink is
// set, every per-node terminal status appended to BackendOpResult is also
// mirrored into the sink so the UI's per-node OpStatus.Nodes view stays in
// lockstep with the manager's view. opID may be empty for ops that aren't
// gallery-tracked (e.g. DeleteBackend's plain code path).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, opID, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
// emitNodeProgress is a small helper that funnels every NodeOpStatus we
// append to result.Nodes into the per-node OpStatus sink (when configured
// and opID is known). Keeping it inline avoids drift between the
// BackendOpResult view and the sink view - they're written from the same
// code path on the same terminal statuses.
emitNodeProgress := func(node BackendNode, status, errMsg string) {
if d.progressSink == nil || opID == "" {
return
}
d.progressSink.UpdateNodeProgress(opID, node.ID, galleryop.NodeProgress{
NodeID: node.ID,
NodeName: node.Name,
Status: status,
Error: errMsg,
})
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet no intent to apply.
// Pending nodes haven't been approved yet - no intent to apply.
if node.Status == StatusPending {
continue
}
// Backend lifecycle ops only make sense on backend-type workers.
// Agent workers don't subscribe to backend.install/delete/list, so
// enqueueing for them guarantees a forever-retrying row that the
// reconciler can never drain. Silently skip they aren't consumers.
// reconciler can never drain. Silently skip - they aren't consumers.
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
@@ -143,19 +178,23 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
errMsg := fmt.Sprintf("enqueue failed: %v", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
Error: errMsg,
})
emitNodeProgress(node, "error", errMsg)
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
errMsg := fmt.Sprintf("node %s, will retry when healthy", node.Status)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
Error: errMsg,
})
emitNodeProgress(node, "queued", errMsg)
continue
}
@@ -169,12 +208,31 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "success",
})
emitNodeProgress(node, "success", "")
continue
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL mark unhealthy so the router stops picking it too.
// gone AWOL - mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
// Worker-still-installing is a "soft" failure: the worker is most
// likely still pulling the OCI image. Keep the row, push NextRetryAt
// out so the reconciler does not immediately re-fire another install
// while the worker is still busy, and report the in-progress state
// to the caller. The next reconciler pass / backend.list confirms
// the actual outcome.
if errors.Is(applyErr, galleryop.ErrWorkerStillInstalling) {
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpInFlight(ctx, id, errMsg, d.adapter.InstallTimeout())
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "running_on_worker", Error: errMsg,
})
emitNodeProgress(node, "running_on_worker", errMsg)
continue
}
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
@@ -185,6 +243,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
})
emitNodeProgress(node, "error", errMsg)
}
return result, nil
}
@@ -226,7 +285,11 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error {
}
ctx := context.Background()
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
// Empty opID: plain DeleteBackend isn't gallery-tracked the same way as
// Install/Upgrade (no progress dialog), so we skip the per-node sink
// writes here. DeleteBackendDetailed is the HTTP path that surfaces
// per-node results in its own response.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -249,7 +312,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
return d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -324,9 +387,60 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
result[b.Name] = entry
}
}
// Proactively clear pending_backend_ops install rows whose intent is now
// satisfied: the backend is reported installed on its target node. Without
// this, the row sits in the queue until next_retry_at expires (up to the
// install timeout, default 15m) and the operator UI shows the install as
// "still installing in background" for that whole window even though the
// worker has actually been ready for minutes. We only clear install rows;
// upgrade and delete rows have presence-based semantics that do NOT match
// backend.list confirmation.
d.clearSatisfiedInstallRows(context.Background(), result)
return result, nil
}
// clearSatisfiedInstallRows removes pending_backend_ops install rows whose
// (nodeID, backend) pair now appears in the cluster-wide backend listing.
// Called by ListBackends after fan-out so the proactive clear sees every
// node's report. Best-effort: a DB failure is logged and the row stays for
// the reconciler to drain via its slower path.
func (d *DistributedBackendManager) clearSatisfiedInstallRows(ctx context.Context, backends gallery.SystemBackends) {
rows, err := d.registry.ListPendingBackendOps(ctx)
if err != nil {
xlog.Debug("clearSatisfiedInstallRows: failed to list pending ops", "error", err)
return
}
if len(rows) == 0 {
return
}
// Build a (nodeID, backend) presence set from the listing.
present := make(map[string]map[string]bool, len(backends))
for name, b := range backends {
for _, ref := range b.Nodes {
if present[ref.NodeID] == nil {
present[ref.NodeID] = make(map[string]bool)
}
present[ref.NodeID][name] = true
}
}
for _, row := range rows {
if row.Op != OpBackendInstall {
continue
}
if !present[row.NodeID][row.Backend] {
continue
}
if err := d.registry.DeletePendingBackendOp(ctx, row.ID); err != nil {
xlog.Debug("clearSatisfiedInstallRows: delete failed",
"id", row.ID, "node", row.NodeID, "backend", row.Backend, "error", err)
continue
}
xlog.Info("Reconciler: pending install row satisfied by backend.list",
"node", row.NodeID, "backend", row.Backend)
}
}
// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
@@ -345,11 +459,41 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
targetNodeIDs = map[string]bool{op.TargetNodeID: true}
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, op.ID, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// onProgress fans each BackendInstallProgressEvent into two
// observers: the legacy single-bar progressCb (kept so callers
// that only consume the aggregate view keep working) and the
// per-node sink (so OpStatus.Nodes gets a "downloading" tick
// per file/percentage with node attribution). Defined inside the
// loop so each node captures its own node.Name into the closure.
onProgress := func(ev messaging.BackendInstallProgressEvent) {
if progressCb != nil {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
if d.progressSink != nil && op.ID != "" {
d.progressSink.UpdateNodeProgress(op.ID, ev.NodeID, galleryop.NodeProgress{
NodeID: ev.NodeID,
NodeName: node.Name,
Status: "downloading",
FileName: ev.FileName,
Current: ev.Current,
Total: ev.Total,
Percentage: ev.Percentage,
Phase: ev.Phase,
})
}
}
// nil-callback shortcut: when there is nothing to deliver to,
// hand the adapter a nil onProgress so it skips the per-op NATS
// subscription. Matches the pre-Phase-4 bridgeProgressCb semantics.
var onProgressArg func(messaging.BackendInstallProgressEvent)
if progressCb != nil || d.progressSink != nil {
onProgressArg = onProgress
}
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, onProgressArg)
if err != nil {
return err
}
@@ -361,7 +505,19 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
if err != nil {
return err
}
return result.Err()
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// No hard failures, but if at least one node reported running_on_worker,
// surface a wrapped ErrWorkerStillInstalling so galleryop can render a
// yellow in-progress state instead of green success. The reconciler
// will confirm the actual outcome on its next pass via backend.list.
for _, n := range result.Nodes {
if n.Status == "running_on_worker" {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
}
// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
@@ -392,7 +548,11 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
targetNodeIDs[n.NodeID] = true
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Empty opID: the caller (galleryop) doesn't thread an op ID into
// UpgradeBackend today, so we can't tag per-node sink writes with the
// right OpStatus key. Until the upgrade path takes a ManagementOp the
// way InstallBackend does, the sink stays no-op here.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
if err != nil {
// Rolling-update fallback: an older worker doesn't know
@@ -417,7 +577,18 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
if err != nil {
return err
}
return result.Err()
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// Same in-progress surfacing as InstallBackend: a long-running worker
// upgrade that timed out the NATS round-trip must not be reported as
// green success.
for _, n := range result.Nodes {
if n.Status == "running_on_worker" {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
}
// IsDistributed reports that installs from this manager fan out across the
@@ -443,3 +614,16 @@ func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[stri
// it used to come from the empty frontend filesystem.
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
// summarizeRunningOnWorker builds a short human-readable summary of which
// nodes are still installing in the background, for inclusion in the
// wrapped ErrWorkerStillInstalling error.
func summarizeRunningOnWorker(nodes []NodeOpStatus) string {
var names []string
for _, n := range nodes {
if n.Status == "running_on_worker" {
names = append(names, n.NodeName)
}
}
return strings.Join(names, ", ")
}

View File

@@ -3,6 +3,7 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"runtime"
"sync"
"time"
@@ -12,6 +13,7 @@ import (
. "github.com/onsi/gomega"
"gorm.io/gorm"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
@@ -22,11 +24,35 @@ import (
// (or error). Used so each fan-out request can simulate a different worker
// outcome without spinning up real NATS.
type scriptedMessagingClient struct {
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
publishes []progressPublishCall
scheduledProgressPublishes []scheduledProgressPublish
subscribes []string
}
// progressPublishCall records a single Publish invocation. The progress
// publisher tests assert on the sequence of BackendInstallProgressEvent
// values written to a per-op subject, so we capture both subject and the
// decoded event. Named to avoid clashing with the simpler `publishCall`
// already defined in unloader_test.go (which stores raw JSON bytes for
// non-progress assertions).
type progressPublishCall struct {
Subject string
Event messaging.BackendInstallProgressEvent
}
// scheduledProgressPublish queues a batch of BackendInstallProgressEvent
// values to be delivered the next time Subscribe is called with the matching
// subject. This lets master-side tests assert that the adapter installs its
// handler BEFORE publishing the install request, by scripting events to be
// delivered as soon as the subscription appears.
type scheduledProgressPublish struct {
subject string
events []messaging.BackendInstallProgressEvent
}
// matchedReply lets a test script a canned reply that only fires when the
@@ -98,10 +124,10 @@ func (s *scriptedMessagingClient) scriptReplyMatching(subject string, pred func(
})
}
func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
func (s *scriptedMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.calls = append(s.calls, requestCall{Subject: subject, Data: data})
s.calls = append(s.calls, requestCall{Subject: subject, Data: data, Timeout: timeout})
// Predicate-matched replies take precedence over flat scriptReply.
if matchers, ok := s.matchedReplies[subject]; ok {
@@ -135,8 +161,88 @@ func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Du
return nil, &fakeNoRespondersErr{}
}
func (s *scriptedMessagingClient) Publish(_ string, _ any) error { return nil }
func (s *scriptedMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) {
// Publish records each call so progress-publisher tests can assert on the
// stream of events written to a subject. The real messaging.Client JSON
// encodes the payload before sending, but our publisher hands a typed
// struct directly, so we handle both shapes.
func (s *scriptedMessagingClient) Publish(subject string, data any) error {
s.mu.Lock()
defer s.mu.Unlock()
switch ev := data.(type) {
case messaging.BackendInstallProgressEvent:
s.publishes = append(s.publishes, progressPublishCall{Subject: subject, Event: ev})
case []byte:
var e messaging.BackendInstallProgressEvent
_ = json.Unmarshal(ev, &e)
s.publishes = append(s.publishes, progressPublishCall{Subject: subject, Event: e})
}
return nil
}
// publishCalls returns every BackendInstallProgressEvent that was published
// to `subject`, in order. Lets tests assert on debounce behavior without
// depending on internal Publish timing.
func (s *scriptedMessagingClient) publishCalls(subject string) []messaging.BackendInstallProgressEvent {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]messaging.BackendInstallProgressEvent, 0)
for _, c := range s.publishes {
if c.Subject != subject {
continue
}
out = append(out, c.Event)
}
return out
}
// scheduleProgressPublish queues a set of BackendInstallProgressEvent values
// to be delivered on the next Subscribe call matching the per-op progress
// subject. A short delay before delivery gives the subscriber time to install
// its message handler before the events arrive.
func (s *scriptedMessagingClient) scheduleProgressPublish(nodeID, opID string, events []messaging.BackendInstallProgressEvent) {
s.mu.Lock()
defer s.mu.Unlock()
s.scheduledProgressPublishes = append(s.scheduledProgressPublishes, scheduledProgressPublish{
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
events: events,
})
}
// subscribeCalls returns the subjects on which Subscribe was invoked.
// Used to confirm the master skipped subscription when onProgress was nil.
func (s *scriptedMessagingClient) subscribeCalls() []string {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]string, len(s.subscribes))
copy(out, s.subscribes)
return out
}
func (s *scriptedMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
s.mu.Lock()
s.subscribes = append(s.subscribes, subject)
matched := []scheduledProgressPublish{}
remaining := s.scheduledProgressPublishes[:0]
for _, sp := range s.scheduledProgressPublishes {
if sp.subject == subject {
matched = append(matched, sp)
} else {
remaining = append(remaining, sp)
}
}
s.scheduledProgressPublishes = remaining
s.mu.Unlock()
go func() {
time.Sleep(20 * time.Millisecond)
for _, sp := range matched {
for _, ev := range sp.events {
raw, _ := json.Marshal(ev)
handler(raw)
}
}
}()
return &fakeSubscription{}, nil
}
func (s *scriptedMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) {
@@ -151,8 +257,43 @@ func (s *scriptedMessagingClient) SubscribeReply(_ string, _ func([]byte, func([
func (s *scriptedMessagingClient) IsConnected() bool { return true }
func (s *scriptedMessagingClient) Close() {}
// recordingNodeCall captures a single UpdateNodeProgress invocation so
// per-node OpStatus tests can assert on the sequence of writes the
// DistributedBackendManager fans out into the sink.
type recordingNodeCall struct {
OpID string
NodeID string
Progress galleryop.NodeProgress
}
// recordingProgressSink is a test-only nodeProgressSink that just records
// every call. Used by the per-node OpStatus specs below to assert the
// manager wrote the expected terminal and downloading entries.
type recordingProgressSink struct {
mu sync.Mutex
calls []recordingNodeCall
}
func (r *recordingProgressSink) UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, recordingNodeCall{OpID: opID, NodeID: nodeID, Progress: np})
}
func (r *recordingProgressSink) callsFor(opID, nodeID string) []galleryop.NodeProgress {
r.mu.Lock()
defer r.mu.Unlock()
out := []galleryop.NodeProgress{}
for _, c := range r.calls {
if c.OpID == opID && c.NodeID == nodeID {
out = append(out, c.Progress)
}
}
return out
}
// fakeNoRespondersErr is the unscripted-subject default. It matches
// nats.ErrNoResponders by string only used when a test forgets to script
// nats.ErrNoResponders by string only - used when a test forgets to script
// a node so the failure is loud but doesn't tickle errors.Is(...) sentinel
// paths the test wasn't deliberately exercising. Tests that DO want the
// real sentinel (e.g. to drive the manager's NoResponders fallback) call
@@ -204,7 +345,7 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(err).ToNot(HaveOccurred())
mc = newScriptedMessagingClient()
adapter = NewRemoteUnloaderAdapter(nil, mc)
adapter = NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
mgr = &DistributedBackendManager{
local: stubLocalBackendManager{},
adapter: adapter,
@@ -352,6 +493,263 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(mc.calls).To(BeEmpty())
})
})
Context("when InstallBackend times out on a worker", func() {
It("returns galleryop.ErrWorkerStillInstalling and keeps the queue row with NextRetryAt pushed out", func() {
n := registerHealthyBackend("slow", "10.0.0.1:50051")
// Script a NATS timeout on the install subject. The adapter
// wraps this into galleryop.ErrWorkerStillInstalling, which
// the manager should treat as a soft failure.
mc.scriptErr(messaging.SubjectNodeBackendInstall(n.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, err := registry.ListPendingBackendOps(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
Expect(rows[0].Backend).To(Equal("vllm"))
// The adapter is configured with a 3m install timeout in this
// suite (NewRemoteUnloaderAdapter above). NextRetryAt should
// be ~now+3m; a > now+2m bound is safe-but-tight enough to
// catch the buggy short default (30s exponential backoff).
Expect(rows[0].NextRetryAt).To(BeTemporally(">", time.Now().Add(2*time.Minute)),
"NextRetryAt should be pushed to ~now+installTimeout, not the short default")
})
})
Context("end-to-end: timeout then successful reconcile via backend.list", func() {
It("surfaces the install in ListBackends after the worker finishes", func() {
// Use the same node-registration helper the Task 5 test uses
// so the test fixture is identical to the prior context.
node := registerHealthyBackend("jetson", "10.0.0.2:50051")
// First install attempt: NATS times out. The adapter wraps
// this as galleryop.ErrWorkerStillInstalling and the manager
// keeps the pending_backend_ops row alive with NextRetryAt
// pushed out (asserted in the previous context).
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, listErr := registry.ListPendingBackendOps(ctx)
Expect(listErr).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
// The worker finished installing in the background. Script
// backend.list on the same scriptedMessagingClient so the
// manager's ListBackends fan-out reports the backend.
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
backends, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
Expect(backends).To(HaveKey("vllm"))
Expect(backends["vllm"].Nodes).To(HaveLen(1))
Expect(backends["vllm"].Nodes[0].NodeID).To(Equal(node.ID))
// Phase 1b shipped: ListBackends proactively clears install rows
// whose intent is now satisfied by backend.list confirmation. The
// operator UI clears immediately instead of waiting for the next
// reconciler tick after NextRetryAt.
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(BeEmpty(),
"install row should clear once backend.list confirms presence on the target node")
})
})
Context("ListBackends clears confirmed install rows", func() {
It("deletes the pending_backend_ops install row when the backend is reported installed on its target node", func() {
node := registerHealthyBackend("worker-a", "10.0.0.5:50051")
// Pre-stage: simulate an admin install that timed out at the NATS
// round-trip, leaving an install row in the queue.
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue())
rows, _ := registry.ListPendingBackendOps(ctx)
Expect(rows).To(HaveLen(1))
// Worker finishes installing in the background. backend.list now
// confirms presence; ListBackends should proactively clear the row.
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
backends, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
Expect(backends).To(HaveKey("vllm"))
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(BeEmpty(),
"ListBackends should clear install rows whose intent is now satisfied by backend.list")
})
It("does NOT clear an upgrade row even if the backend is reported installed", func() {
node := registerHealthyBackend("worker-b", "10.0.0.6:50051")
Expect(registry.UpsertPendingBackendOp(ctx, node.ID, "vllm", OpBackendUpgrade, []byte("[]"))).To(Succeed())
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
_, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(HaveLen(1), "upgrade rows must not be cleared by backend.list presence")
})
})
Context("InstallBackend streams progress events to the caller's progressCb", func() {
It("invokes progressCb once per worker-published progress event", func() {
node := registerHealthyBackend("worker-prog", "10.0.0.7:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.7:50051"})
mc.scheduleProgressPublish(node.ID, "op-prog-1", []messaging.BackendInstallProgressEvent{
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100},
})
type tick struct {
FileName, Current, Total string
Percentage float64
}
var (
pcCalls []tick
mu sync.Mutex
)
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
pcCalls = append(pcCalls, tick{file, current, total, pct})
}
opVal := op("vllm")
opVal.ID = "op-prog-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(pcCalls)
}, "1s").Should(Equal(2))
mu.Lock()
defer mu.Unlock()
// The adapter dispatches each progress event to its own goroutine
// (see unloader.go: `go onProgress(ev)`) so two events emitted back
// to back can land at the bridge in either order. Assert the set of
// percentages observed contains both ticks, rather than depending
// on goroutine scheduling for ordering.
pcts := []float64{pcCalls[0].Percentage, pcCalls[1].Percentage}
Expect(pcts).To(ConsistOf(10.0, 100.0))
})
})
Context("InstallBackend tolerates silent (pre-Phase-2) workers", func() {
It("completes successfully even when no progress events are ever published", func() {
node := registerHealthyBackend("worker-silent", "10.0.0.8:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.8:50051"})
// NO scheduleProgressPublish call - silent worker.
var ticks int
var mu sync.Mutex
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
ticks++
}
opVal := op("vllm")
opVal.ID = "op-silent-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Consistently(func() int {
mu.Lock()
defer mu.Unlock()
return ticks
}, "200ms").Should(Equal(0))
})
})
Context("populates per-node OpStatus entries", func() {
var sink *recordingProgressSink
BeforeEach(func() {
// Reconstruct mgr with the recording sink so the new code
// path (per-node OpStatus writes) is exercised. The default
// mgr in the outer BeforeEach has progressSink=nil so the
// pre-existing specs keep verifying the no-sink behavior.
sink = &recordingProgressSink{}
appCfg := &config.ApplicationConfig{}
mgr = NewDistributedBackendManager(appCfg, nil, adapter, registry, sink)
// stubLocalBackendManager mirrors the production behaviour
// where the frontend node rarely has the backend installed
// locally - the NATS fan-out is what these specs verify.
mgr.local = stubLocalBackendManager{}
})
It("emits a success entry for each healthy node visited", func() {
node := registerHealthyBackend("worker-ok", "10.0.0.9:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true, Address: "10.0.0.9:50051"})
opVal := op("vllm")
opVal.ID = "op-node-success"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
calls := sink.callsFor("op-node-success", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal("success"))
Expect(calls[len(calls)-1].NodeName).To(Equal("worker-ok"))
})
It("emits a running_on_worker entry when NATS times out", func() {
node := registerHealthyBackend("worker-slow", "10.0.0.10:50051")
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
opVal := op("vllm")
opVal.ID = "op-node-slow"
// Soft failure: returns wrapped ErrWorkerStillInstalling.
_ = mgr.InstallBackend(ctx, opVal, nil)
calls := sink.callsFor("op-node-slow", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal("running_on_worker"))
})
It("emits downloading entries from progress events", func() {
node := registerHealthyBackend("worker-dl", "10.0.0.11:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true})
mc.scheduleProgressPublish(node.ID, "op-node-dl", []messaging.BackendInstallProgressEvent{
{OpID: "op-node-dl", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100, Phase: "downloading"},
})
opVal := op("vllm")
opVal.ID = "op-node-dl"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
Eventually(func() bool {
for _, np := range sink.callsFor("op-node-dl", node.ID) {
if np.Status == "downloading" && np.Percentage == 100.0 {
return true
}
}
return false
}, "1s").Should(BeTrue())
})
})
})
Describe("UpgradeBackend", func() {

View File

@@ -68,9 +68,9 @@ type ModelScheduler interface {
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
type ReplicaReconcilerOptions struct {
Registry *NodeRegistry
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
Unloader NodeCommandSender
// Adapter is the NATS sender used to retry pending backend ops. When nil,
// the state-reconciler pending-drain pass is a no-op (single-node mode).
Adapter *RemoteUnloaderAdapter
@@ -78,7 +78,7 @@ type ReplicaReconcilerOptions struct {
// addresses. Matches the worker's token so HealthCheck auth succeeds.
RegistrationToken string
// Prober overrides the default gRPC health probe (used by tests).
Prober ModelProber
Prober ModelProber
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
@@ -191,7 +191,7 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// Pending-op drain for admin install — not a per-replica load.
// Replica 0 is the conventional admin slot. Install is idempotent:
// the worker short-circuits if the backend is already running.
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, "", nil)
if err != nil {
applyErr = err
} else if !reply.Success {

View File

@@ -17,24 +17,24 @@ import (
// Workers are generic — they don't have a fixed backend type.
// The SmartRouter dynamically installs backends via NATS backend.install events.
type BackendNode struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
// ReservedVRAM is a soft, in-tick reservation deducted by the scheduler when
// it picks this node to load a model. Workers reset it back to 0 on each
// heartbeat (the worker is the source of truth for actual free VRAM); the
// reservation is only here to keep two scheduling decisions within the
// same heartbeat window from over-committing the same node.
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
// MaxReplicasPerModel caps how many replicas of any one model can run on
// this node concurrently. Default 1 preserves the historical "one
// (node, model)" assumption; set higher (via worker --max-replicas-per-model)
@@ -44,12 +44,12 @@ type BackendNode struct {
// admin override. When true, the worker's CLI value is ignored on
// re-registration so the override survives worker restarts. Cleared
// by an explicit "reset to worker default" action.
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
const (
@@ -79,17 +79,17 @@ const (
// gRPC Address (each replica is a separate worker process on its own port),
// and its own InFlight counter.
type NodeModel struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
@@ -1287,7 +1287,7 @@ func (r *NodeRegistry) UpdateMaxReplicasPerModel(ctx context.Context, nodeID str
res := r.db.WithContext(ctx).Model(&BackendNode{}).
Where("id = ?", nodeID).
Updates(map[string]any{
ColMaxReplicasPerModel: n,
ColMaxReplicasPerModel: n,
"max_replicas_per_model_manually_set": true,
})
if res.Error != nil {
@@ -1460,7 +1460,7 @@ func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backe
NextRetryAt: time.Now(),
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
}).Create(&row).Error
}
@@ -1515,6 +1515,27 @@ func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uin
})
}
// RecordPendingBackendOpInFlight is the "soft failure" cousin of
// RecordPendingBackendOpFailure. Used when a NATS install round-trip timed
// out but the worker is still installing in the background. Stores the
// message in LastError and pushes NextRetryAt out by `retryDelay` (typically
// the install timeout) so the reconciler does not immediately re-fire
// another install while the worker is still busy.
//
// Attempts is intentionally NOT incremented: an in-flight timeout is not a
// failed attempt, it is a still-in-progress one. Incrementing it would let a
// genuinely-progressing slow install (e.g. 30 GB CUDA image on Wi-Fi) trip
// the maxPendingBackendOpAttempts cap in the reconciler and dead-letter the
// row while the worker is still legitimately working.
func (r *NodeRegistry) RecordPendingBackendOpInFlight(ctx context.Context, id uint, lastError string, retryDelay time.Duration) error {
return r.db.WithContext(ctx).Model(&PendingBackendOp{}).
Where("id = ?", id).
Updates(map[string]any{
"last_error": lastError,
"next_retry_at": time.Now().Add(retryDelay),
}).Error
}
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
// reconciler tick is 30s so anything shorter would just re-fire immediately.
func backoffForAttempt(attempts int) time.Duration {

View File

@@ -688,7 +688,7 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
v, err, _ := r.installFlight.Do(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
if err != nil {
return "", err
}

View File

@@ -330,7 +330,7 @@ type upgradeCall struct {
replica int
}
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, _ string, _ func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) {
// installHook intentionally runs OUTSIDE the mutex: the hook may block
// on a channel and we don't want to serialize concurrent callers,
// which would defeat the singleflight-overlap test.

View File

@@ -2,9 +2,15 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/xlog"
)
@@ -28,7 +34,7 @@ type backendStopRequest struct {
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error)
@@ -43,18 +49,33 @@ type NodeCommandSender interface {
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
registry ModelLocator
nats messaging.MessagingClient
registry ModelLocator
nats messaging.MessagingClient
installTimeout time.Duration
upgradeTimeout time.Duration
}
// NewRemoteUnloaderAdapter creates a new adapter.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
// NewRemoteUnloaderAdapter creates a new adapter. installTimeout and
// upgradeTimeout govern the NATS request-reply deadlines for backend.install
// and backend.upgrade respectively. Use
// DistributedConfig.BackendInstallTimeoutOrDefault() /
// BackendUpgradeTimeoutOrDefault() at construction.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient, installTimeout, upgradeTimeout time.Duration) *RemoteUnloaderAdapter {
return &RemoteUnloaderAdapter{
registry: registry,
nats: nats,
registry: registry,
nats: nats,
installTimeout: installTimeout,
upgradeTimeout: upgradeTimeout,
}
}
// InstallTimeout returns the configured backend.install round-trip timeout.
// Used by DistributedBackendManager to push NextRetryAt out by this duration
// when a worker times out replying but is still installing in the background.
func (a *RemoteUnloaderAdapter) InstallTimeout() time.Duration {
return a.installTimeout
}
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.
@@ -87,18 +108,59 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
//
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
// case while still failing fast enough to surface real worker hangs.
// Timeout: configured via DistributedConfig.BackendInstallTimeoutOrDefault
// (default 15m). Most calls return in under 2 seconds (process already
// running). The 15-minute ceiling covers the cold-binary spawn-after-download
// case on slow links (Jetson Wi-Fi, multi-GB CUDA images) while still
// failing fast enough to surface real worker hangs.
//
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead -
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
func (a *RemoteUnloaderAdapter) InstallBackend(
nodeID, backendType, modelID, galleriesJSON, uri, name, alias string,
replicaIndex int,
opID string,
onProgress func(messaging.BackendInstallProgressEvent),
) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID)
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
// Subscribe to the per-op progress subject BEFORE publishing the install
// request so we don't miss early events. When onProgress is nil OR opID
// is empty (the reconciler-driven retry path), skip subscription entirely:
// silent installs cost nothing extra.
var sub messaging.Subscription
if onProgress != nil && opID != "" {
progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID)
s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) {
var ev messaging.BackendInstallProgressEvent
if err := json.Unmarshal(raw, &ev); err != nil {
xlog.Debug("malformed install progress event", "subject", progressSubject, "error", err)
return
}
// Goroutine guard: a slow onProgress callback must not stall
// the NATS reader thread.
//
// NOTE: events spawn one goroutine each, so ordering at the
// consumer is best-effort. In practice the worker debounces to
// ~250ms which is far larger than goroutine scheduling jitter,
// so reordering is rare. The worker's final Flush() event is
// intended to win as the terminal tick. A future hardening pass
// could add a Seq uint64 field to BackendInstallProgressEvent
// and drop stale-by-seq at the bridge if reordering becomes a
// real UX issue.
go onProgress(ev)
})
if subErr != nil {
xlog.Warn("Failed to subscribe to install progress subject; proceeding without progress streaming",
"subject", progressSubject, "error", subErr)
} else {
sub = s
}
}
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
ModelID: modelID,
BackendGalleries: galleriesJSON,
@@ -106,29 +168,46 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 3*time.Minute)
OpID: opID,
}, a.installTimeout)
if sub != nil {
_ = sub.Unsubscribe()
}
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary upgrade itself does not start a process.
// binary - upgrade itself does not start a process.
//
// Timeout: 15 minutes. Real-world worst case observed: 810 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
// Timeout: configured via DistributedConfig.BackendUpgradeTimeoutOrDefault
// (default 15m). Real-world worst case observed: 8-10 minutes for large
// CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
reply, err := messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 15*time.Minute)
}, a.upgradeTimeout)
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}
// installWithForceFallback is the rolling-update fallback used by
@@ -141,7 +220,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
@@ -149,7 +228,12 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: true,
}, 15*time.Minute)
}, a.upgradeTimeout)
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}
// ListBackends queries a worker node for its installed backends via NATS request-reply.
@@ -228,3 +312,14 @@ func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
subject := messaging.SubjectNodeStop(nodeID)
return a.nats.Publish(subject, nil)
}
// isNATSTimeout returns true if err looks like a NATS request-reply timeout.
// nats.ErrTimeout is the canonical sentinel; context.DeadlineExceeded can
// also surface depending on the client's path; we accept both, plus a
// string-match fallback for clients that return a bare error.
func isNATSTimeout(err error) bool {
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
return true
}
return err != nil && strings.Contains(err.Error(), "nats: timeout")
}

View File

@@ -3,13 +3,16 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/nats-io/nats.go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
)
@@ -60,6 +63,7 @@ type publishCall struct {
type requestCall struct {
Subject string
Data []byte
Timeout time.Duration
}
func (f *fakeMessagingClient) Publish(subject string, data any) error {
@@ -93,10 +97,10 @@ func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
func (f *fakeMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data})
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data, Timeout: timeout})
return f.requestReply, f.requestErr
}
@@ -119,7 +123,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
BeforeEach(func() {
locator = &fakeModelLocator{}
mc = &fakeMessagingClient{}
adapter = NewRemoteUnloaderAdapter(locator, mc)
adapter = NewRemoteUnloaderAdapter(locator, mc, 3*time.Minute, 15*time.Minute)
})
Describe("UnloadRemoteModel", func() {
@@ -154,7 +158,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
}
// Use a messaging client that fails the first Publish call only.
failOnce := &failOnceMessagingClient{inner: mc, failOn: 0}
adapter = NewRemoteUnloaderAdapter(locator, failOnce)
adapter = NewRemoteUnloaderAdapter(locator, failOnce, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())
@@ -259,3 +263,96 @@ func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout t
func (f *failOnceMessagingClient) IsConnected() bool { return true }
func (f *failOnceMessagingClient) Close() {}
var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
It("passes the configured install timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(7 * time.Minute))
})
It("passes the configured upgrade timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute))
})
})
var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
It("wraps nats.ErrTimeout from InstallBackend in galleryop.ErrWorkerStillInstalling", func() {
mc := newScriptedMessagingClient()
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
})
It("does NOT wrap non-timeout errors", func() {
mc := newScriptedMessagingClient()
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse())
Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue())
})
})
var _ = Describe("RemoteUnloaderAdapter install progress streaming", func() {
It("forwards BackendInstallProgressEvent values into the onProgress callback when the worker publishes them", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
mc.scheduleProgressPublish("n1", "op-abc", []messaging.BackendInstallProgressEvent{
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "500 MB", Total: "1 GB", Percentage: 50},
})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
var (
received []messaging.BackendInstallProgressEvent
mu sync.Mutex
)
onProgress := func(ev messaging.BackendInstallProgressEvent) {
mu.Lock()
defer mu.Unlock()
received = append(received, ev)
}
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "op-abc", onProgress)
Expect(err).ToNot(HaveOccurred())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(received)
}, "1s").Should(Equal(2))
})
It("does NOT subscribe when onProgress is nil (reconciler retry path)", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.subscribeCalls()).To(BeEmpty(),
"reconciler-driven retries must not subscribe to the per-op progress subject")
})
})

View File

@@ -1,6 +1,8 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -15,7 +17,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc)
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
@@ -24,7 +26,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
It("returns the underlying error when the subject has no responders", func() {
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
adapter := NewRemoteUnloaderAdapter(nil, mc)
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
Expect(err).To(HaveOccurred())
})

View File

@@ -7,14 +7,22 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/xlog"
)
// installProgressDebounce is the leading-edge window the worker uses when
// streaming download progress to the master. 250ms caps wire chatter at
// ~4 events/sec per in-flight install while still surfacing every
// meaningful percentage jump.
const installProgressDebounce = 250 * time.Millisecond
// buildProcessKey is the supervisor's stable identifier for a backend gRPC
// process. It includes the replica index so the same model can run multiple
// processes on a worker simultaneously without colliding on the same map slot
@@ -100,6 +108,20 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
}
}
// When the master tagged this install with an OpID, stream the
// gallery download progress back to it on the per-op NATS subject.
// Old masters that omit OpID stay on the silent path so they keep
// working without changes. The publisher releases its mutex before
// every Publish so a slow link never stalls the download loop, and
// the deferred Flush guarantees a terminal-percentage event reaches
// the master even when the install errors out.
var downloadCb func(file, current, total string, percentage float64)
if req.OpID != "" && s.nats != nil {
publisher := nodes.NewDebouncedInstallProgressPublisher(s.nats, s.nodeID, req.OpID, req.Backend, installProgressDebounce)
downloadCb = publisher.OnDownload
defer publisher.Flush()
}
// On upgrade, run the gallery install path even if the binary already
// exists on disk: findBackend would otherwise short-circuit and we'd
// restart the same stale binary. The force flag passed to
@@ -112,14 +134,14 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
if req.URI != "" {
xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}
} else {
xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", force)
if err := gallery.InstallBackendFromGallery(
context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, force, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, req.Backend, downloadCb, force, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}

View File

@@ -86,6 +86,8 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These
| `--auto-approve-nodes` | `LOCALAI_AUTO_APPROVE_NODES` | `false` | Auto-approve new worker nodes (skip admin approval) |
| `--auth` | `LOCALAI_AUTH` | `false` | **Must be `true`** for distributed mode |
| `--auth-database-url` | `LOCALAI_AUTH_DATABASE_URL` | *(required)* | PostgreSQL connection URL |
| `--backend-install-timeout` | `LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT` | `15m` | NATS round-trip timeout for `backend.install` requests sent to worker nodes. Raise on slow links pulling multi-GB OCI images (e.g. Jetson over Wi-Fi). If the round-trip times out but the worker is still installing in the background, the admin UI shows the operation as `still installing in background` rather than failed, and the reconciler confirms completion via the next `backend.list` poll. |
| `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | NATS round-trip timeout for `backend.upgrade` requests (force-reinstall path). Same semantics as install timeout. |
### Optional: S3 Object Storage
@@ -103,6 +105,46 @@ When S3 is not configured, model files are transferred directly from the fronten
For high-throughput or very large model files, S3 can be more efficient since it avoids streaming through the frontend.
### Install Progress Streaming
While a worker is pulling an OCI image for a backend install, it publishes
debounced progress events (~250ms) on `nodes.<nodeID>.backend.install.<opID>.progress`.
The frontend subscribes for the duration of the install request and forwards each
event into the operation status so the admin UI surfaces per-file byte progress
and percentage in real time, the same way local-mode installs already do.
The NATS reply for `backend.install` is still the source of truth for the
final success/failure; dropped progress events are acceptable and the install
completes regardless.
**Mixed-version clusters:** Workers running pre-2026-05-22 code do not publish
on the new progress subject. New frontends tolerate that silently. The install
still completes via the reply; the UI keeps showing the message from the
install-timeout fallback path (`still installing in background`) until the
pending operation row clears.
#### Per-Node Operations Breakdown
When an admin backend install fans out to more than one worker, the
**Operations Bar** at the top of the admin UI shows a per-node breakdown.
Click the "N nodes" chevron on the operation row to expand a list with one
entry per target node, each carrying:
- A color-coded status pill: queued (gray), downloading (blue), worker
busy / running on worker (yellow), done (green), failed (red).
- The current file being pulled, current/total bytes, percentage.
- A thin per-node progress bar.
- Any error message returned by the worker for that node.
The yellow "Worker busy" pill appears when the NATS round-trip to a node
timed out but the worker is still installing in the background. Hover the
pill for the full tooltip.
The breakdown is driven by the `nodes` array on the `/api/operations`
response, which the frontend polls every second. Single-node installs and
model installs render the same single-line card as before: the per-node
section only appears when more than one node is involved.
## Worker Configuration
Workers are started with the `worker` subcommand. Each worker is generic — it doesn't need a backend type at startup:

View File

@@ -225,7 +225,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
// newTestSmartRouter creates a SmartRouter with NATS wired up and a mock
// backend.install handler that always replies success for all registered nodes.
newTestSmartRouter := func(reg *nodes.NodeRegistry, extraOpts ...nodes.SmartRouterOptions) *nodes.SmartRouter {
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC, 3*time.Minute, 15*time.Minute)
opts := nodes.SmartRouterOptions{
Unloader: unloader,
@@ -395,7 +395,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
Expect(err).ToNot(HaveOccurred())
// Create RemoteUnloaderAdapter and unload model
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
err = unloader.UnloadRemoteModel("old-model")
Expect(err).ToNot(HaveOccurred())

View File

@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
@@ -175,7 +176,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedModelManager(appCfg, ml, adapter)
err = distMgr.DeleteModel("big-model")
@@ -251,8 +252,8 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
err = distMgr.DeleteBackend("my-backend")
Expect(err).ToNot(HaveOccurred())
@@ -298,8 +299,8 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
// Should NOT return an error even though the backend doesn't exist locally
err = distMgr.DeleteBackend("remote-only-backend")

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -56,8 +57,8 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeTrue())
})
@@ -77,8 +78,8 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeFalse())
Expect(installReply.Error).To(ContainSubstring("backend not found"))
@@ -103,7 +104,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
// Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed())
Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1)))
@@ -133,14 +134,14 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter.UnloadRemoteModel("shared-model")
Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2)))
})
It("should be no-op for models not on any node", func() {
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed())
})
})
@@ -161,7 +162,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.StopNode(node.ID)).To(Succeed())
Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1)))

View File

@@ -3,6 +3,7 @@ package distributed_test
import (
"context"
"encoding/json"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -78,7 +79,7 @@ var _ = Describe("SmartRouter trackingKey", Label("Distributed"), func() {
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
nodeID = node.ID
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
router = nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{
Unloader: unloader,
})