Compare commits

...

10 Commits

Author SHA1 Message Date
LocalAI [bot]
a95f4e63e0 chore: ⬆️ Update ikawrakow/ik_llama.cpp to 642c038ccdf3dd08e6d9ac6fdc3b1c311ebd8a02 (#9966)
⬆️ Update ikawrakow/ik_llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 23:52:51 +02:00
LocalAI [bot]
dfd19a3f88 chore: ⬆️ Update ggml-org/llama.cpp to c0c7e147e7efa6c5858754b47259ba4880f8a906 (#9963)
⬆️ Update ggml-org/llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 23:52:36 +02:00
LocalAI [bot]
d7387c725c feat(swagger): update swagger (#9962)
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 23:52:10 +02:00
LocalAI [bot]
63d84a5705 chore: ⬆️ Update antirez/ds4 to 444afce822057d87f14c4dec307dce24fd49b3ee (#9964)
⬆️ Update antirez/ds4

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 23:51:53 +02:00
LocalAI [bot]
1198d10b58 fix(traces): cap backend trace Data to keep admin UI responsive (#9960)
* fix(traces): cap backend trace Data field so the admin UI stays responsive

The previous fix (#9946) capped API trace bodies but missed backend traces,
which carry the same blast radius:

  - LLM backend traces store the full chat messages JSON, full response, and
    full streaming deltas. Every agent-pool reasoning step ships the full
    RAG-augmented history (50-500 KiB per trace, often 100+ traces queued).
  - TTS / audio_transform / transcript traces embed a 30s audio snippet as
    base64, around 1.3 MiB per trace.

Both blow the /api/backend-traces JSON past tens of MiB. The admin Traces
page then keeps re-downloading and re-parsing the buffer faster than the
5s auto-refresh and stays in the loading state forever, the same symptom
the API-side fix addressed.

Apply two complementary caps, both honoring LOCALAI_TRACING_MAX_BODY_BYTES:

Option A (safety net in core/trace): RecordBackendTrace walks the Data map
recursively and replaces any string value larger than the cap with
"<truncated: N bytes>". Catches anything a future producer forgets.

Option B (head-preserving at the producer):
  - core/backend/llm.go: TruncateToBytes on messages, response, and
    chat_deltas content/reasoning_content so the leading content stays
    readable in the UI.
  - core/trace/audio_snippet.go: omit audio_wav_base64 when the encoded
    blob would exceed the cap (truncated base64 is undecodable). The
    quality metrics still ship and the UI's WaveformPlayer simply skips
    when the field is absent.

TruncateToBytes is bounded to <= maxBytes so Option A leaves the producer's
head-preserving output alone instead of replacing it with the bare marker.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7

* fix(react-ui): expose tracing_max_body_bytes in Settings and Traces panels

The setting was already plumbed through env (LOCALAI_TRACING_MAX_BODY_BYTES),
CLI flag, and the runtime_settings.json GET/PUT schema, but neither the main
Settings page nor the inline Traces panel offered an input for it. Admins
hitting the "Traces UI stuck loading" symptom had to know to set an env var
or PUT raw JSON to /api/settings to dial the cap.

Add a "Max Body Bytes" row next to "Max Items" in both places. Same input
type, same disabled-when-tracing-off semantics, placeholder shows the 65536
default so users see what they're inheriting.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7

* test(react-ui): disambiguate Max Items locator after adding Max Body Bytes

The Tracing settings panel now has two number inputs. The previous spec
matched 'input[type="number"]' which became ambiguous and triggered a
Playwright strict-mode violation in CI. Switch to getByPlaceholder('100')
for Max Items and add a parallel spec for the new Max Body Bytes field
using getByPlaceholder('65536').

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-23 14:50:40 +02:00
LocalAI [bot]
a0f3e26245 fix(distributed): make admin backend installs resilient and observable (#9958)
* feat(distributed): add configurable NATS backend install/upgrade timeouts

Adds BackendInstallTimeout and BackendUpgradeTimeout to DistributedConfig
with 15m defaults, following the existing MCPToolTimeout / WorkerWaitTimeout
pattern. These will replace the hardcoded literals in RemoteUnloaderAdapter
so admin-driven backend installs across the cluster survive long OCI image
pulls that previously timed out at 3m.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* style(distributed): gofmt alignment after timeout fields

Re-aligns the Validate() negative-duration map and the Default* const
block so the new BackendInstall/UpgradeTimeout entries do not leave
the surrounding columns mis-padded.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(cli): surface LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT and _UPGRADE_TIMEOUT

Parses the two new env vars on the run CLI and threads them through the
existing AppOption builder so DistributedConfig picks them up. Invalid
duration strings now fail loudly at startup rather than silently falling
back to the default.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): inject NATS install/upgrade timeouts into RemoteUnloaderAdapter

Removes the hardcoded 3m / 15m literals from RemoteUnloaderAdapter and
threads in DistributedConfig.BackendInstallTimeoutOrDefault() and
BackendUpgradeTimeoutOrDefault() at construction. Install now defaults
to 15m (was 3m); cold OCI image pulls on Jetson Wi-Fi routinely blew
past the old ceiling. Scripted messaging client captures the timeout
so tests can assert the configured value actually reaches the NATS
request.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): introduce galleryop.ErrWorkerStillInstalling sentinel

When the NATS request-reply for backend.install (or .upgrade) times out
the worker is almost always still pulling the OCI image. Wrap the timeout
in a typed sentinel so the manager above can distinguish "worker hung"
from "worker still working" and leave the pending_backend_ops row in
place for the reconciler to confirm via backend.list.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): treat NATS install timeout as in-progress, not failure

When a worker times out replying to backend.install but the install is
still running on the worker, enqueueAndDrainBackendOp now reports a
running_on_worker status and pushes NextRetryAt out by the install
timeout so the reconciler does not immediately re-fire another install
while the worker is still pulling the image. The pending_backend_ops
row stays in place for the next reconciler pass to confirm via
backend.list.

InstallBackend wraps the result in galleryop.ErrWorkerStillInstalling
so callers can branch (galleryop renders yellow in-progress instead of
red error). UpgradeBackend uses the same wrap.

Adds RemoteUnloaderAdapter.InstallTimeout() so the manager can push
NextRetryAt by the configured timeout without reaching into a private
field, and NodeRegistry.RecordPendingBackendOpInFlight as the soft
cousin of RecordPendingBackendOpFailure.

Also includes incidental gofmt-driven struct-field alignment in
registry.go on lines unrelated to the change (touched files are
re-formatted to canonical form per project policy).

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed): don't increment Attempts on in-flight install timeout

An in-flight timeout (worker still pulling the OCI image) is not a
failed attempt, it's a delayed one. Incrementing Attempts let
genuinely-progressing slow installs (e.g. 30 GB CUDA images on Wi-Fi)
trip the reconciler's maxPendingBackendOpAttempts cap and dead-letter
the queue row while the worker was still legitimately working.

RecordPendingBackendOpInFlight now only updates LastError and NextRetryAt.
Also documents "running_on_worker" in the NodeOpStatus.Status enum
comment so Task 6 implementers see the full surface.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): surface ErrWorkerStillInstalling as non-error OpStatus

When the distributed backend manager returns an error that wraps
ErrWorkerStillInstalling, backendHandler now completes the op with a
"still installing in background" message rather than marking it as a
red failure. Admin UI sees a yellow in-progress state; reconciler
confirms completion on its next pass.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): end-to-end install-timeout-then-reconcile

Wires Task 1-6 end-to-end so any seam mismatch surfaces in CI rather
than during a real cluster install. NATS times out, the queue row
stays alive with running_on_worker status, the worker eventually
reports the backend installed via backend.list, the manager surfaces
it via ListBackends.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): document LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT / _UPGRADE_TIMEOUT

Add the two new operator-tunable env vars to the Frontend Configuration
table in the distributed-mode docs. Explains the 15m default, when to
raise it (slow links pulling multi-GB OCI images), and the new
"still installing in background" admin-UI state when the round-trip
times out but the worker is still working.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): clear pending install rows when backend.list confirms

DistributedBackendManager.ListBackends now proactively clears
pending_backend_ops install rows whose (nodeID, backend) is reported
installed by backend.list. Operator UI updates immediately instead of
waiting up to installTimeout (default 15m) for the next reconciler
tick after NextRetryAt.

Only install rows are cleared; upgrade and delete intents are not
satisfied by presence in backend.list and continue to drain through
their normal reconciler paths.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(messaging): add BackendInstallProgressEvent wire type and subject

New NATS subject nodes.<nodeID>.backend.install.<opID>.progress lets the
worker publish transient progress events (file, current/total bytes,
percentage, phase) while a long-running install pulls its OCI image.
BackendInstallRequest gains an optional OpID field so the worker knows
which subject to publish on.

Transient pub/sub (not JetStream): the install reply remains ground
truth for success/failure; dropped progress events are tolerable.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* style(messaging): drop em-dash from BackendInstallProgress test comment

Per project convention (no em-dashes anywhere). Comment substance is
unchanged.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): worker publishes debounced install progress over NATS

When BackendInstallRequest.OpID is set, the worker's backend.install
handler wires a debounced publisher (250ms window) into the gallery
download callback. Each tick becomes a BackendInstallProgressEvent on
nodes.<nodeID>.backend.install.<opID>.progress; the publisher always
emits a final event on Flush so the UI sees the terminal percentage.

Old masters that do not set OpID continue to run silent installs: no
behavior change for them. Lock ordering: the publisher releases its
mutex before calling messaging.Publish so a slow network never stalls
the install loop.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): RemoteUnloaderAdapter subscribes to install progress

InstallBackend gains opID + onProgress parameters. When both are set,
the adapter subscribes to nodes.<nodeID>.backend.install.<opID>.progress
BEFORE publishing the install request, decodes each message into the
caller's onProgress callback in a goroutine (so a slow callback never
stalls the NATS reader thread), and unsubscribes after RequestJSON
returns.

When onProgress is nil OR opID is empty (the reconciler retry path),
subscription is skipped entirely - silent installs cost nothing extra.

Subscribe failure is logged at Warn and the install proceeds without
progress streaming; the NATS round-trip still owns terminal status.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): forward backend install progress into galleryop OpStatus

DistributedBackendManager.InstallBackend now passes the gallery op ID
and a progress bridge into the adapter call. Each
BackendInstallProgressEvent from the worker becomes a
galleryop.ProgressCallback tick - which the existing backendHandler
already turns into OpStatus.UpdateStatus, so the admin UI/SSE polling
sees per-byte progress for distributed installs without any UI-side
change.

UpgradeBackend is intentionally left silent for now: its wire request
(BackendUpgradeRequest) does not carry OpID, and rolling-update
fallback is the rarer path. Will be picked up in a follow-up if the
worker upgrade path also gets a progress channel.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): InstallBackend tolerates silent (pre-Phase-2) workers

A worker on pre-Phase-2 code never publishes progress events. The new
master subscribes optimistically; this spec pins that a silent worker
still produces a green install with no progressCb ticks. The install
reply is the source of truth for terminal state; the progress stream
is a best-effort UX enrichment.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): document install progress streaming

Note the new nodes.<nodeID>.backend.install.<opID>.progress subject and
the silent-worker compatibility behavior so operators know to expect
real-time progress and what happens on a mixed-version cluster.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): note progress-event ordering trade-off in InstallBackend

Document near the goroutine dispatch why ordering at the consumer is
best-effort, why it rarely matters in practice (worker debounce >>
goroutine jitter), and what a future hardening pass would look like
(Seq field + stale-by-seq drop). Stops the next reader from accidentally
"fixing" the goroutine pool away.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): add NodeProgress + OpStatus.Nodes for per-node breakdown

Adds the data model the UI needs to render an expandable per-node
breakdown of a fanned-out backend install. NodeProgress carries node
identity (ID + name), per-node status (queued / running_on_worker /
success / error / downloading), the current file + bytes + percentage
from the Phase 2 progress stream, and any per-node error.

OpStatus.Nodes is the slice the /api/operations handler will surface
in a follow-up.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): UpdateNodeProgress merges per-node ticks by NodeID

GalleryService.UpdateNodeProgress(opID, nodeID, np) merges a NodeProgress
into OpStatus.Nodes (keyed by NodeID, no duplicates) and mirrors the
latest tick into the aggregate Progress / FileName /
DownloadedFileSize / TotalFileSize fields so the legacy single-bar
OperationsBar view keeps working unchanged alongside the new per-node
breakdown.

Concurrent-safe via the existing g.Mutex.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): write per-node OpStatus entries during install fan-out

DistributedBackendManager now accepts a nodeProgressSink and feeds it
two streams:

1. enqueueAndDrainBackendOp emits a per-node terminal entry on each
   status it appends to BackendOpResult (queued, success, error,
   running_on_worker). The opID is threaded through the function so
   the sink gets the right gallery op identity.

2. The install apply closure fans each BackendInstallProgressEvent
   into the sink as a downloading entry, alongside the legacy
   progressCb path so the aggregate single-bar view stays correct.

Production wiring passes the GalleryService (which implements
UpdateNodeProgress via Task 2) as the sink. Single-node tests pass
nil. DeleteBackend and UpgradeBackend pass an empty opID so the
sink path no-ops for ops that aren't gallery-tracked the same way
as Install.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(operations): expose per-node breakdown on /api/operations

When an operation's OpStatus has Nodes entries (populated by the
Phase 4 progress sink wiring), surface them as a "nodes" array on the
/api/operations response, sorted by node_name for stable rendering.

Backward compatible: legacy clients ignore the field; ops without any
node entries (single-node mode, model installs) omit the array entirely
thanks to the empty-slice guard.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ui): per-node breakdown in OperationsBar

When an install op fans out to more than one worker, the operations
bar now shows a "N nodes" chevron that expands into a per-node list.
Each row carries the node's status (color-coded pill), the current
file being downloaded, byte counts, percentage, and a thin per-node
progress bar. Yellow "Worker busy" pill marks running_on_worker
status with a tooltip explaining the NATS round-trip timed out but
the worker is still installing in the background.

Backward compatible: ops without a nodes field (legacy or single-node
mode) render as before. State for expand/collapse is local to the
component, keyed by jobID/id - reload starts collapsed.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): document per-node breakdown in the operations bar

Adds a short subsection covering the expandable "N nodes" chevron in
the OperationsBar admin UI, the meaning of each status pill, and
how it relates to the /api/operations nodes array.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(galleryop): UpdateStatus preserves Nodes when caller sends none

Real-world bug surfaced by the Phase 4 multi-worker smoke test: the
nodes[] array in /api/operations flickered between a single node at a
time on a 2-worker install. Root cause: the Phase 2 progress bridge
also calls the legacy progressCb -> UpdateStatus(&OpStatus{...}) on
every tick. UpdateStatus then overwrote the entire status pointer,
wiping the Nodes slice that UpdateNodeProgress had just merged in.

Fix: in UpdateStatus, if the incoming op has an empty Nodes slice,
carry forward the previous status's Nodes before storing. Callers
that explicitly populate Nodes still win (their slice replaces the
prior one, no merge across the two code paths).

Two regression specs added pinning both directions of the contract.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): strip implementation details from user-facing docs

Trim the new install/upgrade timeout rows and the install-progress
sections to focus on what the operator sees and tunes. Drops:

- the NATS subject names and pub/sub mechanics
- "round-trip" / reconciler / backend.list jargon
- /api/operations polling cadence
- "pre-2026-05-22" version references

Reframes the breakdown text around the admin UI (Operations Bar,
chevron, status pills, "Worker busy" tooltip). Implementation context
lives in the agent notes and code comments.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(config): move DistributedConfig.Validate flag names to constants

The negative-duration check map was a wall of literal kebab-case
strings that had to stay in sync with the kong-derived CLI flag names
manually. Move them to a Flag* const block alongside the existing
Default* block so a rename of either the Go field or the CLI naming
convention forces a compile error rather than silent drift.

Sole consumer today is Validate; the constants are exported so future
operator-facing surfaces (e.g. error messages on other validation
paths) can reference them by name instead of repeating the literals.

Tests pin both the literal values (so a future "let's just rename
this" doesn't accidentally regress the CLI flag) and the negative-
duration error message for the new BackendInstall / BackendUpgrade
fields.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed): extract NodeStatus and Phase enums to constants

Sweep for the same literal-string-as-identifier pattern called out on
the Validate flag names: the per-node install status enum
("queued" | "downloading" | "running_on_worker" | "success" | "error")
appeared as raw literals across managers_distributed.go (10+ sites,
including 3 separate `n.Status == "running_on_worker"` checks),
operation.go, and the test suite. Same shape for the Phase enum
("resolving" | "downloading" | "extracting" | "starting") in the
worker-side progress publisher.

Promote both to exported const blocks:

- galleryop.NodeStatus{Queued,Downloading,RunningOnWorker,Success,Error}
  shared between galleryop.NodeProgress.Status (the wire field) and
  nodes.NodeOpStatus.Status (the in-process per-node summary)
- messaging.Phase{Resolving,Downloading,Extracting,Starting}
  shared between the worker publisher and any future consumer that
  needs to switch on phase

Tests pin both the literal values (so a future "let's just rename" doesn't
silently change the JSON wire) and use the constants in setup (so the
producer side stays drift-protected). Wire-format assertions on the
/api/operations JSON output keep their literals deliberately, so the
constant value can never silently diverge from what the UI receives.

Out of scope for this PR (separate cleanup): the finetune and
quantization job-status enums have the same anti-pattern with 14+
literal sites each, but predate this PR's work.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-23 12:35:44 +02:00
LocalAI [bot]
e4cc1f11f3 chore: ⬆️ Update ggml-org/llama.cpp to 1acee6bf8939948f9bcbf4b14034e4b475f06069 (#9952)
⬆️ Update ggml-org/llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 08:38:29 +02:00
LocalAI [bot]
6ed269d0b9 chore: ⬆️ Update ggml-org/whisper.cpp to 0ccd896f5b882628e1c077f9769735ef4ce52860 (#9954)
⬆️ Update ggml-org/whisper.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 08:37:26 +02:00
LocalAI [bot]
5756fb046d chore: ⬆️ Update leejet/stable-diffusion.cpp to 0baf721215f45335a5df8caf0ecb34e870c956e7 (#9955)
⬆️ Update leejet/stable-diffusion.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 08:37:10 +02:00
Copilot
7980629bc5 Fix backend manifest merge signing on current cosign releases (#9957)
* Initial plan

* fix: remove deprecated cosign bundle flag from backend merge workflow

Agent-Logs-Url: https://github.com/mudler/LocalAI/sessions/4207dabc-14ec-4655-9594-487338977fcf

Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-23 00:20:28 +02:00
69 changed files with 2482 additions and 223 deletions

View File

@@ -16,7 +16,8 @@ side (`pkg/oci/cosignverify` plus the gallery YAML).
per-arch manifest before checking signatures.
- **Storage:** Signatures are written as OCI 1.1 referrers
(`--registry-referrers-mode=oci-1-1`) in the new Sigstore bundle format
(`--new-bundle-format`). No `:sha256-<hex>.sig` tag clutter.
(current cosign releases do this by default; no `--new-bundle-format`
flag). No `:sha256-<hex>.sig` tag clutter.
- **Consumer:** `pkg/oci/cosignverify` discovers the bundle via the
referrers API, hands it to `sigstore-go`, and verifies it against the
policy declared in the gallery YAML (`Gallery.Verification`).
@@ -33,15 +34,14 @@ to sign. The job needs:
- `permissions: { id-token: write, contents: read }` at the job level so
the runner can exchange its GitHub OIDC token for a Fulcio cert.
- `sigstore/cosign-installer@v3` step (cosign ≥ 2.2 for
`--new-bundle-format`).
- `sigstore/cosign-installer@v3` step (current cosign releases already
default to the new bundle format).
- After each `docker buildx imagetools create`, resolve the resulting
list digest with `docker buildx imagetools inspect <tag> --format
'{{.Manifest.Digest}}'` and sign:
```sh
cosign sign --yes --recursive \
--new-bundle-format \
--registry-referrers-mode=oci-1-1 \
"${REGISTRY_REPO}@${DIGEST}"
```

View File

@@ -66,7 +66,8 @@ jobs:
# cosign signs each pushed manifest list with --recursive so the
# index and every per-arch entry get an attached Sigstore bundle.
# 2.2+ is required for --new-bundle-format.
# Recent cosign releases always emit the new bundle format, so
# there's no extra CLI flag to opt into it.
- name: Install cosign
if: github.event_name != 'pull_request'
uses: sigstore/cosign-installer@v3
@@ -153,7 +154,6 @@ jobs:
# manifest before checking signatures need the per-arch
# signatures, not just the list-level one.
cosign sign --yes --recursive \
--new-bundle-format \
--registry-referrers-mode=oci-1-1 \
"quay.io/go-skynet/local-ai-backends@${digest}"
@@ -180,7 +180,6 @@ jobs:
' <<< "$DOCKER_METADATA_OUTPUT_JSON")
digest=$(docker buildx imagetools inspect "$first_tag" --format '{{.Manifest.Digest}}')
cosign sign --yes --recursive \
--new-bundle-format \
--registry-referrers-mode=oci-1-1 \
"localai/localai-backends@${digest}"

View File

@@ -1,10 +1,10 @@
# ds4 backend Makefile.
#
# Upstream pin lives below as DS4_VERSION?=8d576642c39b9a2d782a80159ba84ef5a81c0b81
# Upstream pin lives below as DS4_VERSION?=444afce822057d87f14c4dec307dce24fd49b3ee
# (.github/bump_deps.sh) can find and update it - matches the
# llama-cpp / ik-llama-cpp / turboquant convention.
DS4_VERSION?=8d576642c39b9a2d782a80159ba84ef5a81c0b81
DS4_VERSION?=444afce822057d87f14c4dec307dce24fd49b3ee
DS4_REPO?=https://github.com/antirez/ds4
CURRENT_MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=b3d39cff8bffbd67296d6badd4076a1486a0715c
IK_LLAMA_VERSION?=642c038ccdf3dd08e6d9ac6fdc3b1c311ebd8a02
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=bb28c1fe246b72276ee1d00ce89306be7b865766
LLAMA_VERSION?=c0c7e147e7efa6c5858754b47259ba4880f8a906
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=3a8788cb7d74f185d6b18688e9563015524ecaf5
STABLEDIFFUSION_GGML_VERSION?=0baf721215f45335a5df8caf0ecb34e870c956e7
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=8443cf05e3fa8ce1b32348e1bcbcf8fc31f7f3ae
WHISPER_CPP_VERSION?=0ccd896f5b882628e1c077f9769735ef4ce52860
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -233,7 +233,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("File stager initialized (HTTP direct transfer)")
}
// Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go
remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient)
remoteUnloader := nodes.NewRemoteUnloaderAdapter(
registry,
natsClient,
cfg.Distributed.BackendInstallTimeoutOrDefault(),
cfg.Distributed.BackendUpgradeTimeoutOrDefault(),
)
// All dependencies ready — build SmartRouter with all options at once
var conflictResolver nodes.ConcurrencyConflictResolver

View File

@@ -17,9 +17,9 @@ import (
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/vram"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/sanitize"
@@ -200,7 +200,7 @@ func New(opts ...config.AppOption) (*Application, error) {
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
)
application.galleryService.SetBackendManager(
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry),
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService),
)
}
}

View File

@@ -78,7 +78,7 @@ func ModelAudioTransform(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}
@@ -104,7 +104,7 @@ func ModelAudioTransform(
data["sample_rate"] = res.SampleRate
data["samples"] = res.Samples
data["reference_provided"] = res.ReferenceProvided
if snippet := trace.AudioSnippet(dst); snippet != nil {
if snippet := trace.AudioSnippet(dst, appConfig.TracingMaxBodyBytes); snippet != nil {
maps.Copy(data, snippet)
}
}

View File

@@ -35,7 +35,7 @@ func Detection(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -67,7 +67,7 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, modelConf
}
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
traceData := map[string]any{
"input_text": trace.TruncateString(s, 1000),

View File

@@ -32,7 +32,7 @@ func FaceAnalyze(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -32,7 +32,7 @@ func FaceVerify(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -41,7 +41,7 @@ func ImageGeneration(height, width, step, seed int, positive_prompt, negative_pr
}
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
traceData := map[string]any{
"positive_prompt": positive_prompt,

View File

@@ -305,7 +305,7 @@ func ModelInference(ctx context.Context, s string, messages schema.Messages, ima
}
if o.EnableTracing {
trace.InitBackendTracingIfEnabled(o.TracingMaxItems)
trace.InitBackendTracingIfEnabled(o.TracingMaxItems, o.TracingMaxBodyBytes)
traceData := map[string]any{
"chat_template": c.TemplateConfig.Chat,
@@ -316,9 +316,13 @@ func ModelInference(ctx context.Context, s string, messages schema.Messages, ima
"audios_count": len(audios),
}
// Cap the captured fields up front: agent-pool LLM calls embed the
// full augmented chat history in messages and the full reply in
// response, so without a per-field cap a single trace can dwarf the
// rest of the buffer. The cap matches the API-trace body cap.
if len(messages) > 0 {
if msgJSON, err := json.Marshal(messages); err == nil {
traceData["messages"] = string(msgJSON)
traceData["messages"] = trace.TruncateToBytes(string(msgJSON), o.TracingMaxBodyBytes)
}
}
if reasoningJSON, err := json.Marshal(c.ReasoningConfig); err == nil {
@@ -337,7 +341,7 @@ func ModelInference(ctx context.Context, s string, messages schema.Messages, ima
resp, err := originalFn()
duration := time.Since(startTime)
traceData["response"] = resp.Response
traceData["response"] = trace.TruncateToBytes(resp.Response, o.TracingMaxBodyBytes)
traceData["token_usage"] = map[string]any{
"prompt": resp.Usage.Prompt,
"completion": resp.Usage.Completion,
@@ -359,10 +363,10 @@ func ModelInference(ctx context.Context, s string, messages schema.Messages, ima
toolCallCount += len(d.ToolCalls)
}
if len(contentParts) > 0 {
chatDeltasInfo["content"] = strings.Join(contentParts, "")
chatDeltasInfo["content"] = trace.TruncateToBytes(strings.Join(contentParts, ""), o.TracingMaxBodyBytes)
}
if len(reasoningParts) > 0 {
chatDeltasInfo["reasoning_content"] = strings.Join(reasoningParts, "")
chatDeltasInfo["reasoning_content"] = trace.TruncateToBytes(strings.Join(reasoningParts, ""), o.TracingMaxBodyBytes)
}
if toolCallCount > 0 {
chatDeltasInfo["tool_call_count"] = toolCallCount

View File

@@ -21,7 +21,7 @@ func recordModelLoadFailure(appConfig *config.ApplicationConfig, modelName, back
if !appConfig.EnableTracing {
return
}
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: time.Now(),
Type: trace.BackendTraceModelLoad,

View File

@@ -25,7 +25,7 @@ func Rerank(ctx context.Context, request *proto.RerankRequest, loader *model.Mod
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -98,7 +98,7 @@ func SoundGeneration(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -27,7 +27,7 @@ func ModelTokenize(s string, loader *model.ModelLoader, modelConfig config.Model
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -76,10 +76,10 @@ func ModelTranscriptionWithOptions(ctx context.Context, req TranscriptionRequest
var startTime time.Time
var audioSnippet map[string]any
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
// Capture audio before the backend call — the backend may delete the file.
audioSnippet = trace.AudioSnippet(req.Audio)
audioSnippet = trace.AudioSnippet(req.Audio, appConfig.TracingMaxBodyBytes)
}
r, err := transcriptionModel.AudioTranscription(ctx, req.toProto(uint32(*modelConfig.Threads)))

View File

@@ -67,7 +67,7 @@ func ModelTTS(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}
@@ -93,7 +93,7 @@ func ModelTTS(
"language": language,
}
if err == nil && res.Success {
if snippet := trace.AudioSnippet(filePath); snippet != nil {
if snippet := trace.AudioSnippet(filePath, appConfig.TracingMaxBodyBytes); snippet != nil {
maps.Copy(data, snippet)
}
}
@@ -161,7 +161,7 @@ func ModelTTSStream(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}
@@ -260,7 +260,7 @@ func ModelTTSStream(
"streaming": true,
}
if resultErr == nil && len(snippetPCM) > 0 {
if snippet := trace.AudioSnippetFromPCM(snippetPCM, int(sampleRate), totalPCMBytes); snippet != nil {
if snippet := trace.AudioSnippetFromPCM(snippetPCM, int(sampleRate), totalPCMBytes, appConfig.TracingMaxBodyBytes); snippet != nil {
maps.Copy(data, snippet)
}
}

View File

@@ -42,7 +42,7 @@ func VideoGeneration(height, width int32, prompt, negativePrompt, startImage, en
}
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
traceData := map[string]any{
"prompt": prompt,

View File

@@ -31,7 +31,7 @@ func VoiceAnalyze(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -34,7 +34,7 @@ func VoiceEmbed(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -32,7 +32,7 @@ func VoiceVerify(
var startTime time.Time
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems, appConfig.TracingMaxBodyBytes)
startTime = time.Now()
}

View File

@@ -39,19 +39,19 @@ type RunCMD struct {
LocalaiConfigDir string `env:"LOCALAI_CONFIG_DIR" type:"path" default:"${basepath}/configuration" help:"Directory for dynamic loading of certain configuration files (currently api_keys.json and external_backends.json)" group:"storage"`
LocalaiConfigDirPollInterval time.Duration `env:"LOCALAI_CONFIG_DIR_POLL_INTERVAL" help:"Typically the config path picks up changes automatically, but if your system has broken fsnotify events, set this to an interval to poll the LocalAI Config Dir (example: 1m)" group:"storage"`
// The alias on this option is there to preserve functionality with the old `--config-file` parameter
ModelsConfigFile string `env:"LOCALAI_MODELS_CONFIG_FILE,CONFIG_FILE" aliases:"config-file" help:"YAML file containing a list of model backend configs" group:"storage"`
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
ModelsConfigFile string `env:"LOCALAI_MODELS_CONFIG_FILE,CONFIG_FILE" aliases:"config-file" help:"YAML file containing a list of model backend configs" group:"storage"`
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
AutoUpgradeBackends bool `env:"LOCALAI_AUTO_UPGRADE_BACKENDS,AUTO_UPGRADE_BACKENDS" help:"Automatically upgrade backends when new versions are detected" group:"backends" default:"false"`
PreferDevelopmentBackends bool `env:"LOCALAI_PREFER_DEV_BACKENDS,PREFER_DEV_BACKENDS" help:"Prefer development backend versions (shows development backends by default in UI)" group:"backends" default:"false"`
PreloadModels string `env:"LOCALAI_PRELOAD_MODELS,PRELOAD_MODELS" help:"A List of models to apply in JSON at start" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
PreloadModelsConfig string `env:"LOCALAI_PRELOAD_MODELS_CONFIG,PRELOAD_MODELS_CONFIG" help:"A List of models to apply at startup. Path to a YAML config file" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
PreloadModelsConfig string `env:"LOCALAI_PRELOAD_MODELS_CONFIG,PRELOAD_MODELS_CONFIG" help:"A List of models to apply at startup. Path to a YAML config file" group:"models"`
F16 bool `name:"f16" env:"LOCALAI_F16,F16" help:"Enable GPU acceleration" group:"performance"`
Threads int `env:"LOCALAI_THREADS,THREADS" short:"t" help:"Number of threads used for parallel computation. Usage of the number of physical cores in the system is suggested" group:"performance"`
@@ -145,16 +145,18 @@ type RunCMD struct {
DefaultAPIKeyExpiry string `env:"LOCALAI_DEFAULT_API_KEY_EXPIRY" help:"Default expiry for API keys (e.g. 90d, 1y; empty = no expiry)" group:"auth"`
// Distributed / Horizontal Scaling
Distributed bool `env:"LOCALAI_DISTRIBUTED" default:"false" help:"Enable distributed mode (requires PostgreSQL + NATS)" group:"distributed"`
InstanceID string `env:"LOCALAI_INSTANCE_ID" help:"Unique instance ID for distributed mode (auto-generated UUID if empty)" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" help:"NATS server URL (e.g., nats://localhost:4222)" group:"distributed"`
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3-compatible storage endpoint URL (e.g., http://minio:9000)" group:"distributed"`
StorageBucket string `env:"LOCALAI_STORAGE_BUCKET" default:"localai" help:"S3 bucket name for object storage" group:"distributed"`
StorageRegion string `env:"LOCALAI_STORAGE_REGION" default:"us-east-1" help:"S3 region" group:"distributed"`
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
Distributed bool `env:"LOCALAI_DISTRIBUTED" default:"false" help:"Enable distributed mode (requires PostgreSQL + NATS)" group:"distributed"`
InstanceID string `env:"LOCALAI_INSTANCE_ID" help:"Unique instance ID for distributed mode (auto-generated UUID if empty)" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" help:"NATS server URL (e.g., nats://localhost:4222)" group:"distributed"`
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3-compatible storage endpoint URL (e.g., http://minio:9000)" group:"distributed"`
StorageBucket string `env:"LOCALAI_STORAGE_BUCKET" default:"localai" help:"S3 bucket name for object storage" group:"distributed"`
StorageRegion string `env:"LOCALAI_STORAGE_REGION" default:"us-east-1" help:"S3 region" group:"distributed"`
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
BackendInstallTimeout string `env:"LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT" help:"NATS round-trip timeout for backend.install requests sent to worker nodes (default 15m). Increase for slow links pulling multi-GB images." group:"distributed"`
BackendUpgradeTimeout string `env:"LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT" help:"NATS round-trip timeout for backend.upgrade requests (default 15m)." group:"distributed"`
Version bool
}
@@ -255,6 +257,20 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.StorageSecretKey != "" {
opts = append(opts, config.WithStorageSecretKey(r.StorageSecretKey))
}
if r.BackendInstallTimeout != "" {
d, err := time.ParseDuration(r.BackendInstallTimeout)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT %q: %w", r.BackendInstallTimeout, err)
}
opts = append(opts, config.WithBackendInstallTimeout(d))
}
if r.BackendUpgradeTimeout != "" {
d, err := time.ParseDuration(r.BackendUpgradeTimeout)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT %q: %w", r.BackendUpgradeTimeout, err)
}
opts = append(opts, config.WithBackendUpgradeTimeout(d))
}
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}

View File

@@ -40,7 +40,10 @@ type DistributedConfig struct {
// model-row cleanup on MarkUnhealthy / MarkDraining).
DisablePerModelHealthCheck bool
MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m)
MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m)
BackendInstallTimeout time.Duration // NATS round-trip timeout for backend.install (default 15m)
BackendUpgradeTimeout time.Duration // NATS round-trip timeout for backend.upgrade (default 15m)
MaxUploadSize int64 // Maximum upload body size in bytes (default 50 GB)
@@ -68,13 +71,15 @@ func (c DistributedConfig) Validate() error {
}
// Check for negative durations
for name, d := range map[string]time.Duration{
"mcp-tool-timeout": c.MCPToolTimeout,
"mcp-discovery-timeout": c.MCPDiscoveryTimeout,
"worker-wait-timeout": c.WorkerWaitTimeout,
"drain-timeout": c.DrainTimeout,
"health-check-interval": c.HealthCheckInterval,
"stale-node-threshold": c.StaleNodeThreshold,
"mcp-ci-job-timeout": c.MCPCIJobTimeout,
FlagMCPToolTimeout: c.MCPToolTimeout,
FlagMCPDiscoveryTimeout: c.MCPDiscoveryTimeout,
FlagWorkerWaitTimeout: c.WorkerWaitTimeout,
FlagDrainTimeout: c.DrainTimeout,
FlagHealthCheckInterval: c.HealthCheckInterval,
FlagStaleNodeThreshold: c.StaleNodeThreshold,
FlagMCPCIJobTimeout: c.MCPCIJobTimeout,
FlagBackendInstallTimeout: c.BackendInstallTimeout,
FlagBackendUpgradeTimeout: c.BackendUpgradeTimeout,
} {
if d < 0 {
return fmt.Errorf("%s must not be negative", name)
@@ -137,24 +142,66 @@ func WithStorageSecretKey(key string) AppOption {
}
}
func WithBackendInstallTimeout(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.BackendInstallTimeout = d
}
}
func WithBackendUpgradeTimeout(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.BackendUpgradeTimeout = d
}
}
var EnableAutoApproveNodes = func(o *ApplicationConfig) {
o.Distributed.AutoApproveNodes = true
}
// Flag names for distributed timeout / interval configuration. These are
// the kebab-case identifiers kong derives from the matching RunCMD struct
// fields; they appear in Validate error messages and any other operator-
// facing surface that needs to reference a specific knob by name. Keeping
// them as constants prevents the string from drifting from the actual
// flag a future rename would produce.
const (
FlagMCPToolTimeout = "mcp-tool-timeout"
FlagMCPDiscoveryTimeout = "mcp-discovery-timeout"
FlagWorkerWaitTimeout = "worker-wait-timeout"
FlagDrainTimeout = "drain-timeout"
FlagHealthCheckInterval = "health-check-interval"
FlagStaleNodeThreshold = "stale-node-threshold"
FlagMCPCIJobTimeout = "mcp-ci-job-timeout"
FlagBackendInstallTimeout = "backend-install-timeout"
FlagBackendUpgradeTimeout = "backend-upgrade-timeout"
)
// Defaults for distributed timeouts.
const (
DefaultMCPToolTimeout = 360 * time.Second
DefaultMCPDiscoveryTimeout = 60 * time.Second
DefaultWorkerWaitTimeout = 5 * time.Minute
DefaultDrainTimeout = 30 * time.Second
DefaultHealthCheckInterval = 15 * time.Second
DefaultStaleNodeThreshold = 60 * time.Second
DefaultMCPCIJobTimeout = 10 * time.Minute
DefaultMCPToolTimeout = 360 * time.Second
DefaultMCPDiscoveryTimeout = 60 * time.Second
DefaultWorkerWaitTimeout = 5 * time.Minute
DefaultDrainTimeout = 30 * time.Second
DefaultHealthCheckInterval = 15 * time.Second
DefaultStaleNodeThreshold = 60 * time.Second
DefaultMCPCIJobTimeout = 10 * time.Minute
DefaultBackendInstallTimeout = 15 * time.Minute
DefaultBackendUpgradeTimeout = 15 * time.Minute
)
// DefaultMaxUploadSize is the default maximum upload body size (50 GB).
const DefaultMaxUploadSize int64 = 50 << 30
// BackendInstallTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendInstallTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendInstallTimeout, DefaultBackendInstallTimeout)
}
// BackendUpgradeTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendUpgradeTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendUpgradeTimeout, DefaultBackendUpgradeTimeout)
}
// MCPToolTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) MCPToolTimeoutOrDefault() time.Duration {
return cmp.Or(c.MCPToolTimeout, DefaultMCPToolTimeout)

View File

@@ -0,0 +1,90 @@
package config_test
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
)
var _ = Describe("DistributedConfig backend NATS timeouts", func() {
Context("BackendInstallTimeoutOrDefault", func() {
It("returns 15 minutes when unset", func() {
c := config.DistributedConfig{}
Expect(c.BackendInstallTimeoutOrDefault()).To(Equal(15 * time.Minute))
})
It("returns the configured value when set", func() {
c := config.DistributedConfig{BackendInstallTimeout: 42 * time.Minute}
Expect(c.BackendInstallTimeoutOrDefault()).To(Equal(42 * time.Minute))
})
})
Context("BackendUpgradeTimeoutOrDefault", func() {
It("returns 15 minutes when unset", func() {
c := config.DistributedConfig{}
Expect(c.BackendUpgradeTimeoutOrDefault()).To(Equal(15 * time.Minute))
})
It("returns the configured value when set", func() {
c := config.DistributedConfig{BackendUpgradeTimeout: 30 * time.Minute}
Expect(c.BackendUpgradeTimeoutOrDefault()).To(Equal(30 * time.Minute))
})
})
})
var _ = Describe("DistributedConfig flag-name constants", func() {
// Pin the kebab-case strings so a rename of the Go field name (or a
// CLI flag naming convention change) forces the constant to update,
// keeping the Validate error messages and any future operator-facing
// surface in sync with the actual CLI flag.
DescribeTable("flag name constants",
func(actual, expected string) {
Expect(actual).To(Equal(expected))
},
Entry("MCP tool timeout", config.FlagMCPToolTimeout, "mcp-tool-timeout"),
Entry("MCP discovery timeout", config.FlagMCPDiscoveryTimeout, "mcp-discovery-timeout"),
Entry("worker wait timeout", config.FlagWorkerWaitTimeout, "worker-wait-timeout"),
Entry("drain timeout", config.FlagDrainTimeout, "drain-timeout"),
Entry("health check interval", config.FlagHealthCheckInterval, "health-check-interval"),
Entry("stale node threshold", config.FlagStaleNodeThreshold, "stale-node-threshold"),
Entry("MCP CI job timeout", config.FlagMCPCIJobTimeout, "mcp-ci-job-timeout"),
Entry("backend install timeout", config.FlagBackendInstallTimeout, "backend-install-timeout"),
Entry("backend upgrade timeout", config.FlagBackendUpgradeTimeout, "backend-upgrade-timeout"),
)
})
var _ = Describe("DistributedConfig.Validate negative-duration errors", func() {
It("rejects a negative BackendInstallTimeout with the flag name in the error", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
BackendInstallTimeout: -1 * time.Second,
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring(config.FlagBackendInstallTimeout))
Expect(err.Error()).To(ContainSubstring("must not be negative"))
})
It("rejects a negative BackendUpgradeTimeout with the flag name in the error", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
BackendUpgradeTimeout: -1 * time.Second,
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring(config.FlagBackendUpgradeTimeout))
})
It("accepts all-zero durations as valid (defaults apply)", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
}
Expect(c.Validate()).To(Succeed())
})
})

View File

@@ -52,11 +52,22 @@ test.describe('Traces Settings', () => {
await page.locator('button', { hasText: 'Tracing is' }).click()
await expect(page.locator('text=Enable Tracing')).toBeVisible()
const maxItemsInput = page.locator('input[type="number"]')
// The Tracing panel has two numeric inputs (Max Items and Max Body Bytes).
// Disambiguate by placeholder so adding a third field later doesn't break this.
const maxItemsInput = page.getByPlaceholder('100')
await maxItemsInput.fill('500')
await expect(maxItemsInput).toHaveValue('500')
})
test('set max body bytes value', async ({ page }) => {
await page.locator('button', { hasText: 'Tracing is' }).click()
await expect(page.locator('text=Enable Tracing')).toBeVisible()
const maxBodyBytesInput = page.getByPlaceholder('65536')
await maxBodyBytesInput.fill('16384')
await expect(maxBodyBytesInput).toHaveValue('16384')
})
test('save shows toast', async ({ page }) => {
// Expand settings
await page.locator('button', { hasText: 'Tracing is' }).click()

View File

@@ -649,6 +649,7 @@
align-items: center;
gap: var(--spacing-md);
padding: var(--spacing-xs) 0;
flex-wrap: wrap;
}
.operation-info {
@@ -739,6 +740,110 @@
color: var(--color-error);
}
/* Operations bar: per-node breakdown (multi-worker installs) */
.operation-expand {
background: none;
border: none;
color: var(--color-text-muted);
cursor: pointer;
padding: 0 var(--spacing-xs);
font-size: var(--text-xs);
display: inline-flex;
align-items: center;
gap: 0.25rem;
}
.operation-expand:hover {
color: var(--color-text-primary);
}
.operation-expand-label {
font-size: var(--text-xs);
}
.operation-nodes-list {
list-style: none;
margin: var(--spacing-xs) 0 0;
padding: var(--spacing-xs) 0 0;
border-top: 1px solid var(--color-border-subtle);
flex-basis: 100%;
width: 100%;
}
.operation-node {
display: flex;
align-items: center;
gap: var(--spacing-sm);
padding: var(--spacing-xs) 0;
font-size: var(--text-xs);
color: var(--color-text-muted);
flex-wrap: wrap;
}
.operation-node-status {
padding: 2px 6px;
border-radius: var(--radius-md);
font-size: 0.65rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.025em;
white-space: nowrap;
}
.operation-node-status-success {
background: var(--color-success-light);
color: var(--color-success);
}
.operation-node-status-error {
background: var(--color-error-light);
color: var(--color-error);
}
.operation-node-status-queued {
background: var(--color-bg-tertiary);
color: var(--color-text-muted);
}
.operation-node-status-running_on_worker {
background: var(--color-warning-light);
color: var(--color-warning);
}
.operation-node-status-downloading {
background: var(--color-primary-light);
color: var(--color-primary);
}
.operation-node-name {
font-weight: 500;
color: var(--color-text-secondary);
}
.operation-node-file {
font-family: var(--font-mono);
color: var(--color-text-tertiary);
overflow: hidden;
text-overflow: ellipsis;
max-width: 30ch;
white-space: nowrap;
}
.operation-node-bytes {
font-variant-numeric: tabular-nums;
color: var(--color-text-tertiary);
}
.operation-node-pct {
font-variant-numeric: tabular-nums;
color: var(--color-primary);
font-weight: 500;
}
.operation-node-error {
color: var(--color-error);
}
.operation-node-bar-container {
flex-basis: 100%;
height: 3px;
background: var(--color-surface-sunken);
border-radius: var(--radius-full);
overflow: hidden;
margin-top: 0.25rem;
}
.operation-node-bar {
height: 100%;
background: var(--color-primary);
border-radius: var(--radius-full);
transition: width var(--duration-slow, 0.3s) var(--ease-spring, ease);
}
/* Toast */
.toast-container {
position: fixed;

View File

@@ -1,14 +1,33 @@
import { useState } from 'react'
import { useOperations } from '../hooks/useOperations'
const nodeStatusLabels = {
success: 'Done',
error: 'Failed',
queued: 'Queued',
running_on_worker: 'Worker busy',
downloading: 'Downloading',
}
const runningOnWorkerTooltip = 'NATS round-trip timed out, but the worker is still installing in the background. The reconciler will confirm completion.'
export default function OperationsBar() {
const { operations, cancelOperation, dismissFailedOp } = useOperations()
const [expanded, setExpanded] = useState({})
if (operations.length === 0) return null
const toggle = (key) => setExpanded((m) => ({ ...m, [key]: !m[key] }))
return (
<div className="operations-bar">
{operations.map(op => (
<div key={op.jobID || op.id} className="operation-item">
{operations.map(op => {
const key = op.jobID || op.id
const nodes = Array.isArray(op.nodes) ? op.nodes : []
const canExpand = nodes.length > 1
const isOpen = !!expanded[key]
return (
<div key={key} className="operation-item">
<div className="operation-info">
{op.error ? (
<i className="fas fa-circle-exclamation" style={{ color: 'var(--color-error)', marginRight: 'var(--spacing-xs)' }} />
@@ -80,8 +99,55 @@ export default function OperationsBar() {
<i className="fas fa-xmark" />
</button>
) : null}
{canExpand && (
<button
type="button"
className="operation-expand"
onClick={() => toggle(key)}
aria-expanded={isOpen}
title={isOpen ? 'Hide per-node detail' : `Show ${nodes.length} nodes`}
>
<i className={`fas fa-chevron-${isOpen ? 'up' : 'down'}`} />
<span className="operation-expand-label">{nodes.length} nodes</span>
</button>
)}
{canExpand && isOpen && (
<ul className="operation-nodes-list">
{nodes.map((n) => (
<li key={n.node_id} className={`operation-node operation-node-${n.status}`}>
<span
className={`operation-node-status operation-node-status-${n.status}`}
title={n.status === 'running_on_worker' ? runningOnWorkerTooltip : undefined}
>
{nodeStatusLabels[n.status] || n.status}
</span>
<span className="operation-node-name">{n.node_name || n.node_id}</span>
{n.file_name && <span className="operation-node-file">{n.file_name}</span>}
{(n.current || n.total) && (
<span className="operation-node-bytes">
{n.current || '?'} / {n.total || '?'}
</span>
)}
{n.percentage > 0 && (
<span className="operation-node-pct">{Math.round(n.percentage)}%</span>
)}
{n.error && (
<span className="operation-node-error" title={n.error}>
{n.error.length > 80 ? n.error.slice(0, 80) + '...' : n.error}
</span>
)}
{n.percentage > 0 && n.percentage < 100 && (
<div className="operation-node-bar-container">
<div className="operation-node-bar" style={{ width: `${n.percentage}%` }} />
</div>
)}
</li>
))}
</ul>
)}
</div>
))}
)
})}
</div>
)
}

View File

@@ -435,6 +435,9 @@ export default function Settings() {
<SettingRow label="Max Items" description="Maximum number of trace items to retain (0 = unlimited)">
<input className="input" type="number" style={{ width: 120 }} value={settings.tracing_max_items ?? ''} onChange={(e) => update('tracing_max_items', parseInt(e.target.value) || 0)} placeholder="100" disabled={!settings.enable_tracing} />
</SettingRow>
<SettingRow label="Max Body Bytes" description="Per-field cap (bytes) for captured request/response bodies and backend trace Data fields. Prevents large LLM histories or TTS audio snippets from locking the Traces UI. 0 = uncapped.">
<input className="input" type="number" style={{ width: 120 }} value={settings.tracing_max_body_bytes ?? ''} onChange={(e) => update('tracing_max_body_bytes', parseInt(e.target.value) || 0)} placeholder="65536" disabled={!settings.enable_tracing} />
</SettingRow>
<SettingRow label="Enable Backend Logging" description="Capture backend process output per model (without requiring debug mode)">
<Toggle checked={settings.enable_backend_logging} onChange={(v) => update('enable_backend_logging', v)} />
</SettingRow>

View File

@@ -470,6 +470,17 @@ export default function Traces() {
disabled={!settings.enable_tracing}
/>
</SettingRow>
<SettingRow label="Max Body Bytes" description="Per-field cap for captured bodies and backend trace Data (0 = uncapped). Prevents oversized LLM histories or TTS snippets from locking this page in loading.">
<input
className="input"
type="number"
style={{ width: 120 }}
value={settings.tracing_max_body_bytes ?? ''}
onChange={(e) => setSettings(prev => ({ ...prev, tracing_max_body_bytes: parseInt(e.target.value) || 0 }))}
placeholder="65536"
disabled={!settings.enable_tracing}
/>
</SettingRow>
<SettingRow label="Enable Backend Logging" description="Capture backend process output per model (without requiring debug mode)">
<Toggle
checked={settings.enable_backend_logging}

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"slices"
"sort"
"strconv"
"strings"
"time"
@@ -57,7 +58,6 @@ var usecaseFilters = map[string]config.ModelConfigUsecase{
config.UsecaseRealtimeAudio: config.FLAG_REALTIME_AUDIO,
}
// extractHFRepo tries to find a HuggingFace repo ID from model overrides or URLs.
func extractHFRepo(overrides map[string]any, urls []string) string {
if overrides != nil {
@@ -257,6 +257,44 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
if status != nil && status.Error != nil {
opData["error"] = status.Error.Error()
}
// Expose the per-node breakdown when the Phase 4 progress sink
// has populated OpStatus.Nodes (distributed backend installs).
// We sort by node_name for stable UI rendering across polls;
// the underlying slice is order-dependent on UpdateNodeProgress
// arrival order, which the UI must not depend on. Single-node
// ops and model installs leave Nodes empty so this block emits
// no key, preserving the legacy payload shape.
if status != nil && len(status.Nodes) > 0 {
nodes := make([]map[string]any, 0, len(status.Nodes))
for _, n := range status.Nodes {
entry := map[string]any{
"node_id": n.NodeID,
"node_name": n.NodeName,
"status": n.Status,
"percentage": n.Percentage,
}
if n.FileName != "" {
entry["file_name"] = n.FileName
}
if n.Current != "" {
entry["current"] = n.Current
}
if n.Total != "" {
entry["total"] = n.Total
}
if n.Phase != "" {
entry["phase"] = n.Phase
}
if n.Error != "" {
entry["error"] = n.Error
}
nodes = append(nodes, entry)
}
sort.SliceStable(nodes, func(i, j int) bool {
return fmt.Sprintf("%v", nodes[i]["node_name"]) < fmt.Sprintf("%v", nodes[j]["node_name"])
})
opData["nodes"] = nodes
}
operations = append(operations, opData)
}
@@ -557,11 +595,11 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
NodeStatus string `json:"node_status"`
}
type modelCapability struct {
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
// LoadedOn is populated only when the node registry is active
// (distributed mode). Lets the UI show "loaded on worker-1" without
// the operator having to expand every node manually. An empty slice
@@ -1159,17 +1197,17 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
}
return c.JSON(200, map[string]any{
"backends": backendsJSON,
"repositories": appConfig.BackendGalleries,
"allTags": tags,
"processingBackends": processingBackendsData,
"taskTypes": taskTypes,
"availableBackends": totalBackends,
"installedBackends": installedBackendsCount,
"currentPage": pageNum,
"totalPages": totalPages,
"prevPage": prevPage,
"nextPage": nextPage,
"backends": backendsJSON,
"repositories": appConfig.BackendGalleries,
"allTags": tags,
"processingBackends": processingBackendsData,
"taskTypes": taskTypes,
"availableBackends": totalBackends,
"installedBackends": installedBackendsCount,
"currentPage": pageNum,
"totalPages": totalPages,
"prevPage": prevPage,
"nextPage": nextPage,
"systemCapability": detectedCapability,
"preferDevelopmentBackends": appConfig.PreferDevelopmentBackends,
})
@@ -1599,4 +1637,3 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
app.DELETE("/api/branding/asset/:kind", localai.DeleteBrandingAssetEndpoint(appConfig), adminMiddleware)
}

View File

@@ -62,6 +62,63 @@ var _ = Describe("/api/operations with node-scoped backend ops", func() {
Expect(found["isBackend"]).To(Equal(true))
})
It("surfaces per-node OpStatus entries on /api/operations", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
jobID := "test-op-nodes-1"
// Register a backend op so the handler treats this as a backend
// install (no need to consult the gallery during the test).
opcache.SetBackend("vllm", jobID)
// Populate per-node entries via the P4.2 helper. The helper also
// allocates an OpStatus under jobID, which the handler will read.
galleryService.UpdateNodeProgress(jobID, "node-b", galleryop.NodeProgress{
NodeID: "node-b", NodeName: "worker-b", Status: galleryop.NodeStatusRunningOnWorker,
})
galleryService.UpdateNodeProgress(jobID, "node-a", galleryop.NodeProgress{
NodeID: "node-a", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 30, FileName: "vllm.tar",
})
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == jobID {
found = op
break
}
}
Expect(found).ToNot(BeNil(), "operation should appear in /api/operations")
nodes, ok := found["nodes"].([]any)
Expect(ok).To(BeTrue(), "operation should have a nodes array")
Expect(nodes).To(HaveLen(2))
// Stable sort by node_name: "worker-a" comes before "worker-b"
// even though UpdateNodeProgress was called in reverse order.
first := nodes[0].(map[string]any)
Expect(first["node_name"]).To(Equal("worker-a"))
Expect(first["status"]).To(Equal("downloading"))
Expect(first["file_name"]).To(Equal("vllm.tar"))
Expect(first["percentage"]).To(Equal(30.0))
second := nodes[1].(map[string]any)
Expect(second["node_name"]).To(Equal("worker-b"))
Expect(second["status"]).To(Equal("running_on_worker"))
})
It("does not emit nodeID for non-node-scoped backend ops", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)

View File

@@ -91,6 +91,21 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
})
return err
}
if errors.Is(err, ErrWorkerStillInstalling) {
// Soft failure: at least one worker timed out replying but is
// still running the install in the background. Mark the op as
// processed with a non-error message so the admin UI shows a
// yellow in-progress state rather than red. The reconciler's
// next pass will reconcile the actual outcome via backend.list.
xlog.Info("worker still installing in background", "backend", op.GalleryElementName, "error", err)
g.UpdateStatus(op.ID, &OpStatus{
Processed: true,
GalleryElementName: op.GalleryElementName,
Message: fmt.Sprintf("backend %s: worker still installing in background; reconciler will confirm completion (%v)", op.GalleryElementName, err),
Cancellable: false,
})
return nil
}
xlog.Error("error installing backend", "error", err, "backend", op.GalleryElementName)
if !op.Delete {
// If we didn't install the backend, we need to make sure we don't have a leftover directory

View File

@@ -0,0 +1,13 @@
package galleryop
import "errors"
// ErrWorkerStillInstalling indicates a distributed backend install
// timed out at the NATS round-trip layer but the worker is most likely
// still pulling the OCI image in the background. Producers
// (DistributedBackendManager) wrap this when the round-trip times out;
// consumers (backendHandler) use errors.Is(err, ErrWorkerStillInstalling)
// to surface a yellow "in progress" OpStatus instead of a red error,
// leaving the pending_backend_ops row in place for the reconciler to
// confirm via backend.list.
var ErrWorkerStillInstalling = errors.New("worker did not reply in time; install may still be running in the background")

View File

@@ -0,0 +1,149 @@
package galleryop_test
import (
"encoding/json"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
)
var _ = Describe("NodeStatus constants", func() {
// Pin the wire-format string values. A future refactor that renames
// a constant must NOT silently change the JSON value the UI receives
// (or the cross-package contract with the nodes package, which
// reuses these constants for NodeOpStatus.Status).
DescribeTable("status constant",
func(actual, expected string) {
Expect(actual).To(Equal(expected))
},
Entry("queued", galleryop.NodeStatusQueued, "queued"),
Entry("downloading", galleryop.NodeStatusDownloading, "downloading"),
Entry("running on worker", galleryop.NodeStatusRunningOnWorker, "running_on_worker"),
Entry("success", galleryop.NodeStatusSuccess, "success"),
Entry("error", galleryop.NodeStatusError, "error"),
)
})
var _ = Describe("OpStatus.Nodes", func() {
It("defaults to empty on a fresh OpStatus", func() {
os := &galleryop.OpStatus{}
Expect(os.Nodes).To(BeEmpty())
})
It("JSON round-trips with all NodeProgress fields", func() {
os := &galleryop.OpStatus{
Nodes: []galleryop.NodeProgress{
{
NodeID: "node-1",
NodeName: "worker-a",
Status: galleryop.NodeStatusRunningOnWorker,
FileName: "vllm.tar.zst",
Current: "412 MB",
Total: "2.1 GB",
Percentage: 19.6,
Phase: "downloading", // literal pins the wire-format value
Error: "",
},
},
}
raw, err := json.Marshal(os)
Expect(err).ToNot(HaveOccurred())
got := &galleryop.OpStatus{}
Expect(json.Unmarshal(raw, got)).To(Succeed())
Expect(got.Nodes).To(HaveLen(1))
Expect(got.Nodes[0]).To(Equal(os.Nodes[0]))
})
})
var _ = Describe("GalleryService.UpdateNodeProgress", func() {
var svc *galleryop.GalleryService
BeforeEach(func() {
// UpdateNodeProgress + GetStatus only touch the in-memory statuses
// map. A zero-value ApplicationConfig is enough to get past the
// LocalModelManager / LocalBackendManager constructors.
svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
})
It("creates a node entry on first call", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 12.0,
})
st := svc.GetStatus("op1")
Expect(st).ToNot(BeNil())
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].NodeID).To(Equal("n1"))
Expect(st.Nodes[0].Percentage).To(Equal(12.0))
})
It("merges subsequent updates into the same NodeID entry, not appending", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 48.0, FileName: "vllm.tar"})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].Percentage).To(Equal(48.0))
Expect(st.Nodes[0].FileName).To(Equal("vllm.tar"))
})
It("appends a new entry for a different NodeID", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: galleryop.NodeStatusQueued})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(2))
})
It("mirrors the latest tick into the aggregate OpStatus fields", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading,
Percentage: 33.0, FileName: "vllm.tar", Current: "330 MB", Total: "1 GB",
})
st := svc.GetStatus("op1")
Expect(st.Progress).To(Equal(33.0))
Expect(st.FileName).To(Equal("vllm.tar"))
Expect(st.DownloadedFileSize).To(Equal("330 MB"))
Expect(st.TotalFileSize).To(Equal("1 GB"))
})
It("preserves accumulated Nodes when a subsequent UpdateStatus comes through the legacy path", func() {
// Regression: the Phase 2 progress bridge also calls the legacy
// progressCb -> UpdateStatus(opID, &OpStatus{...}) on every tick.
// Without preservation that overwrite would wipe the Nodes slice
// and the UI would flicker between one node and another on a
// multi-worker install. UpdateStatus must carry forward existing
// Nodes when the incoming op has none.
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusSuccess})
svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: galleryop.NodeStatusDownloading, Percentage: 30.0})
// Now simulate the legacy progressCb path: a fresh OpStatus
// pointer with no Nodes set, carrying only aggregate fields.
svc.UpdateStatus("op1", &galleryop.OpStatus{
Progress: 30.0,
Message: "downloading",
})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(2), "Nodes accumulated before the legacy UpdateStatus must be preserved")
ids := []string{st.Nodes[0].NodeID, st.Nodes[1].NodeID}
Expect(ids).To(ConsistOf("n1", "n2"))
})
It("allows an explicit empty-then-populated Nodes transition to win when caller sets Nodes", func() {
// If a caller explicitly passes a non-empty Nodes slice on the
// incoming op, that should replace the existing slice (no merge).
// Only an EMPTY incoming slice triggers the carry-forward.
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusSuccess})
svc.UpdateStatus("op1", &galleryop.OpStatus{
Progress: 100.0,
Nodes: []galleryop.NodeProgress{
{NodeID: "n9", NodeName: "worker-final", Status: galleryop.NodeStatusSuccess},
},
})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].NodeID).To(Equal("n9"))
})
})

View File

@@ -53,6 +53,45 @@ type OpStatus struct {
GalleryElementName string `json:"gallery_element_name"`
Cancelled bool `json:"cancelled"` // Cancelled is true if the operation was cancelled
Cancellable bool `json:"cancellable"` // Cancellable is true if the operation can be cancelled
// Nodes is the per-node breakdown for a fanned-out backend install.
// Populated by DistributedBackendManager (per-node terminal status)
// and by the Phase 2 progress bridge (per-byte ticks). The
// /api/operations handler surfaces this so the UI can render an
// expandable per-node view of an in-flight install.
Nodes []NodeProgress `json:"nodes,omitempty"`
}
// NodeStatus values shared between NodeProgress (per-node tick) and the
// NodeOpStatus surfaced by DistributedBackendManager's fan-out. Defined
// as exported constants so producers (the manager, the progress bridge)
// and consumers (the /api/operations handler, the React OperationsBar
// through its JSON contract) stay in sync via a single source of truth.
const (
NodeStatusQueued = "queued" // node accepted the intent but install has not started
NodeStatusDownloading = "downloading" // worker is actively pulling the OCI image
NodeStatusRunningOnWorker = "running_on_worker" // NATS round-trip timed out but worker is still installing
NodeStatusSuccess = "success" // install completed on this node
NodeStatusError = "error" // install failed on this node
)
// NodeProgress is a single node's contribution to a backend install
// operation. Populated by DistributedBackendManager (per-node terminal
// status) and by the Phase 2 progress bridge (per-byte ticks). Read by
// the /api/operations handler so the UI can render an expandable
// per-node breakdown.
//
// Status holds one of the NodeStatus* constants above.
type NodeProgress struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"`
FileName string `json:"file_name,omitempty"`
Current string `json:"current,omitempty"`
Total string `json:"total,omitempty"`
Percentage float64 `json:"percentage"`
Phase string `json:"phase,omitempty"`
Error string `json:"error,omitempty"`
}
type OpCache struct {

View File

@@ -110,6 +110,18 @@ func (g *GalleryService) DeleteBackend(name string) error {
func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
g.Lock()
defer g.Unlock()
// Preserve any per-node entries already accumulated by UpdateNodeProgress:
// the legacy progressCb path (used by the Phase 2 install bridge) calls
// UpdateStatus with a fresh *OpStatus on every tick, which would otherwise
// wipe the Nodes slice and leave the UI flickering between one node and
// another. If the caller explicitly populates Nodes on the incoming op,
// that wins; an empty Nodes slice on the incoming op is treated as "no
// new per-node data" and the previous Nodes are carried forward.
if op != nil && len(op.Nodes) == 0 {
if prev := g.statuses[s]; prev != nil && len(prev.Nodes) > 0 {
op.Nodes = prev.Nodes
}
}
g.statuses[s] = op
// Persist to PostgreSQL in distributed mode
@@ -135,6 +147,47 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
}
}
// UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes,
// keyed by nodeID, and mirrors the latest values into the aggregate
// Progress / FileName / DownloadedFileSize / TotalFileSize / Message
// fields so the legacy single-bar OperationsBar view keeps working
// unchanged alongside the new per-node breakdown.
//
// We deliberately do NOT delegate the aggregate mirror to UpdateStatus
// here: UpdateStatus overwrites the entire OpStatus, which would clobber
// the Nodes slice we just merged into. Doing the merge + mirror under a
// single lock keeps both views consistent and concurrent-safe.
func (g *GalleryService) UpdateNodeProgress(opID, nodeID string, np NodeProgress) {
g.Lock()
defer g.Unlock()
status := g.statuses[opID]
if status == nil {
status = &OpStatus{}
g.statuses[opID] = status
}
merged := false
for i := range status.Nodes {
if status.Nodes[i].NodeID == nodeID {
status.Nodes[i] = np
merged = true
break
}
}
if !merged {
status.Nodes = append(status.Nodes, np)
}
// Mirror the latest tick into the legacy aggregate fields so the
// existing single-bar UI keeps rendering meaningful progress.
status.FileName = np.FileName
status.Progress = np.Percentage
status.DownloadedFileSize = np.Current
status.TotalFileSize = np.Total
if np.Phase != "" {
status.Message = np.Phase
}
}
func (g *GalleryService) GetStatus(s string) *OpStatus {
g.Lock()
defer g.Unlock()

View File

@@ -0,0 +1,36 @@
package messaging
// Phase values published on the BackendInstallProgressEvent.Phase field.
// Defined as exported constants so producer (worker install handler) and
// consumer (master bridge into OpStatus) share a single source of truth
// instead of two copies of the literal string.
const (
PhaseResolving = "resolving" // worker is locating the gallery / image manifest
PhaseDownloading = "downloading" // worker is actively pulling layers
PhaseExtracting = "extracting" // worker is unpacking the downloaded archive
PhaseStarting = "starting" // worker is spawning the gRPC backend process
)
// BackendInstallProgressEvent is the wire payload published by a worker to
// nodes.<nodeID>.backend.install.<opID>.progress while a long-running install
// is in flight. Transient: dropped events are acceptable, the master relies
// on BackendInstallReply for ground truth on success/failure.
//
// Phase holds one of the Phase* constants above.
type BackendInstallProgressEvent struct {
OpID string `json:"op_id"`
NodeID string `json:"node_id"`
Backend string `json:"backend"`
FileName string `json:"file_name,omitempty"`
Current string `json:"current,omitempty"` // human-readable size, e.g. "412 MB"
Total string `json:"total,omitempty"` // human-readable size, e.g. "2.1 GB"
Percentage float64 `json:"percentage"`
Phase string `json:"phase,omitempty"`
}
// SubjectNodeBackendInstallProgress returns the NATS subject for transient
// progress events emitted by a worker during a single backend.install run.
// Per-op so multiple concurrent installs on the same node never alias.
func SubjectNodeBackendInstallProgress(nodeID, opID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.install." + sanitizeSubjectToken(opID) + ".progress"
}

View File

@@ -0,0 +1,66 @@
package messaging_test
import (
"encoding/json"
"strings"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("Phase constants", func() {
// Pin the wire-format string values. A future refactor that renames
// a constant must NOT silently change the JSON value the master
// receives or break consumers that switch on Phase.
DescribeTable("phase constant",
func(actual, expected string) {
Expect(actual).To(Equal(expected))
},
Entry("resolving", messaging.PhaseResolving, "resolving"),
Entry("downloading", messaging.PhaseDownloading, "downloading"),
Entry("extracting", messaging.PhaseExtracting, "extracting"),
Entry("starting", messaging.PhaseStarting, "starting"),
)
})
var _ = Describe("BackendInstallProgress", func() {
Context("SubjectNodeBackendInstallProgress", func() {
It("composes the per-op progress subject", func() {
Expect(messaging.SubjectNodeBackendInstallProgress("node-abc", "op-123")).
To(Equal("nodes.node-abc.backend.install.op-123.progress"))
})
It("sanitizes NATS-reserved characters in node and op tokens", func() {
// '.' is the NATS hierarchy delimiter, '*' and '>' are wildcards,
// and whitespace must be stripped - sanitizeSubjectToken replaces
// all of them with '-'. The resulting subject must still parse as
// exactly six hierarchy segments: nodes/<node>/backend/install/<op>/progress.
subj := messaging.SubjectNodeBackendInstallProgress("a.b c", "x.y z")
Expect(subj).ToNot(ContainSubstring(" "))
Expect(strings.Count(subj, ".")).To(Equal(5))
})
})
Context("BackendInstallProgressEvent", func() {
It("JSON round-trips with all known fields", func() {
ev := messaging.BackendInstallProgressEvent{
OpID: "op-123",
NodeID: "node-abc",
Backend: "vllm",
FileName: "vllm-cpu.tar.zst",
Current: "412 MB",
Total: "2.1 GB",
Percentage: 19.6,
Phase: "downloading",
}
raw, err := json.Marshal(ev)
Expect(err).ToNot(HaveOccurred())
var got messaging.BackendInstallProgressEvent
Expect(json.Unmarshal(raw, &got)).To(Succeed())
Expect(got).To(Equal(ev))
})
})
})

View File

@@ -144,6 +144,12 @@ type BackendInstallRequest struct {
// worker still works (the master's install fallback path also uses this
// when backend.upgrade returns nats.ErrNoResponders).
Force bool `json:"force,omitempty"`
// OpID identifies the admin-side operation. When non-empty the worker
// publishes BackendInstallProgressEvent values to
// SubjectNodeBackendInstallProgress(nodeID, OpID) while the install is
// running, debounced to roughly 250ms. Empty means the caller is a
// reconciler-driven retry that does not need progress streamed.
OpID string `json:"op_id,omitempty"`
}
// BackendInstallReply is the response from a backend.install NATS request.

View File

@@ -0,0 +1,120 @@
package nodes
import (
"sync"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
)
// DebouncedInstallProgressPublisher buffers backend-install download ticks
// and publishes them to the per-op NATS progress subject at most once per
// `interval`. Always publishes the final event on Flush so the UI sees the
// terminal percentage.
//
// Behavior: leading-edge debounce. The first OnDownload after a quiet window
// publishes immediately; subsequent ticks within `interval` only buffer the
// latest event, which is then emitted via a single trailing timer. This
// keeps the wire chatter bounded (~4 events per second at 250ms) while
// still surfacing every meaningful percentage jump.
//
// Lock ordering: never hold p.mu across a Publish call. Publish hits the
// NATS client which may block on a slow link, and we don't want a stalled
// network to stall the underlying gallery download loop.
type DebouncedInstallProgressPublisher struct {
mu sync.Mutex
client messaging.MessagingClient
subject string
nodeID string
opID string
backend string
interval time.Duration
lastPublishedAt time.Time
pending *messaging.BackendInstallProgressEvent
timer *time.Timer
}
// NewDebouncedInstallProgressPublisher constructs a publisher for one
// install operation. interval is the leading-edge debounce window
// (~250ms in production).
func NewDebouncedInstallProgressPublisher(client messaging.MessagingClient, nodeID, opID, backend string, interval time.Duration) *DebouncedInstallProgressPublisher {
return &DebouncedInstallProgressPublisher{
client: client,
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
nodeID: nodeID,
opID: opID,
backend: backend,
interval: interval,
}
}
// OnDownload is the callback shape gallery.InstallBackendFromGallery and
// galleryop.InstallExternalBackend pass into the worker. Each invocation
// represents a single tick from the underlying io.Reader copy loop.
func (p *DebouncedInstallProgressPublisher) OnDownload(file, current, total string, percentage float64) {
ev := messaging.BackendInstallProgressEvent{
OpID: p.opID,
NodeID: p.nodeID,
Backend: p.backend,
FileName: file,
Current: current,
Total: total,
Percentage: percentage,
Phase: messaging.PhaseDownloading,
}
p.mu.Lock()
now := time.Now()
if p.lastPublishedAt.IsZero() || now.Sub(p.lastPublishedAt) >= p.interval {
// Leading edge: publish immediately.
p.lastPublishedAt = now
p.pending = nil
p.mu.Unlock()
_ = p.client.Publish(p.subject, ev)
return
}
// Within the window: buffer the latest event and arm a trailing
// publish. If a timer is already armed, we just overwrite p.pending so
// the trailing publish carries the freshest data.
p.pending = &ev
if p.timer == nil {
delay := p.interval - now.Sub(p.lastPublishedAt)
p.timer = time.AfterFunc(delay, p.flushPending)
}
p.mu.Unlock()
}
// flushPending is the trailing-edge publisher fired by the AfterFunc timer.
// It clears the pending slot under the lock, then publishes outside the
// lock so Publish never blocks an in-progress OnDownload call.
func (p *DebouncedInstallProgressPublisher) flushPending() {
p.mu.Lock()
p.timer = nil
pending := p.pending
p.pending = nil
if pending != nil {
p.lastPublishedAt = time.Now()
}
p.mu.Unlock()
if pending != nil {
_ = p.client.Publish(p.subject, *pending)
}
}
// Flush publishes any pending buffered event synchronously and stops the
// pending timer. Safe to call multiple times. Callers MUST defer Flush
// after constructing the publisher so the terminal percentage reaches the
// master even on error returns.
func (p *DebouncedInstallProgressPublisher) Flush() {
p.mu.Lock()
if p.timer != nil {
p.timer.Stop()
p.timer = nil
}
pending := p.pending
p.pending = nil
p.mu.Unlock()
if pending != nil {
_ = p.client.Publish(p.subject, *pending)
}
}

View File

@@ -0,0 +1,48 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("DebouncedInstallProgressPublisher", func() {
It("publishes the first event immediately and debounces subsequent ones within the window", func() {
mc := newScriptedMessagingClient()
pub := NewDebouncedInstallProgressPublisher(mc, "n1", "op1", "vllm", 50*time.Millisecond)
// Three rapid-fire ticks within the debounce window.
pub.OnDownload("vllm.tar.zst", "100 MB", "1 GB", 10.0)
pub.OnDownload("vllm.tar.zst", "200 MB", "1 GB", 20.0)
pub.OnDownload("vllm.tar.zst", "300 MB", "1 GB", 30.0)
pub.Flush()
// First event publishes immediately; the others coalesce; Flush guarantees a final.
// So we expect at least 2 publishes and at most 4 (lead + final + any window-bounded).
Eventually(func() int {
return len(mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1")))
}, "1s").Should(BeNumerically(">=", 2))
calls := mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1"))
Expect(len(calls)).To(BeNumerically("<=", 4),
"three ticks within the debounce window should produce at most ~4 publishes")
})
It("publishes the final event after Flush with the latest percentage", func() {
mc := newScriptedMessagingClient()
pub := NewDebouncedInstallProgressPublisher(mc, "n1", "op1", "vllm", 50*time.Millisecond)
pub.OnDownload("vllm.tar.zst", "1 GB", "1 GB", 100.0)
pub.Flush()
Eventually(func() float64 {
calls := mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1"))
if len(calls) == 0 {
return -1
}
return calls[len(calls)-1].Percentage
}, "1s").Should(Equal(100.0))
})
})

View File

@@ -10,6 +10,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
@@ -48,6 +49,13 @@ func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryo
return d.local.InstallModel(ctx, op, progressCb)
}
// nodeProgressSink is the narrow interface DistributedBackendManager uses to
// publish per-node progress without dragging in the full *GalleryService.
// nil means "no sink, skip per-node writes" (used by single-node tests).
type nodeProgressSink interface {
UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress)
}
// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
@@ -56,26 +64,31 @@ type DistributedBackendManager struct {
registry *NodeRegistry
backendGalleries []config.Gallery
systemState *system.SystemState
progressSink nodeProgressSink
}
// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
// progressSink may be nil to disable per-node OpStatus writes (single-node
// tests don't need it).
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry, progressSink nodeProgressSink) *DistributedBackendManager {
return &DistributedBackendManager{
local: galleryop.NewLocalBackendManager(appConfig, ml),
adapter: adapter,
registry: registry,
backendGalleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
progressSink: progressSink,
}
}
// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
// Status holds one of the galleryop.NodeStatus* constants.
type NodeOpStatus struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"` // "success" | "queued" | "error"
Status string `json:"status"`
Error string `json:"error,omitempty"`
}
@@ -93,7 +106,7 @@ type BackendOpResult struct {
func (r BackendOpResult) Err() error {
var failures []string
for _, n := range r.Nodes {
if n.Status == "error" {
if n.Status == galleryop.NodeStatusError {
failures = append(failures, fmt.Sprintf("%s: %s", n.NodeName, n.Error))
}
}
@@ -116,25 +129,48 @@ func (r BackendOpResult) Err() error {
// when the node returns.
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it such requests fail at the
// never had the backend installed to "upgrade" it - such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
//
// opID is the gallery operation identifier; when non-empty and progressSink is
// set, every per-node terminal status appended to BackendOpResult is also
// mirrored into the sink so the UI's per-node OpStatus.Nodes view stays in
// lockstep with the manager's view. opID may be empty for ops that aren't
// gallery-tracked (e.g. DeleteBackend's plain code path).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, opID, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
// emitNodeProgress is a small helper that funnels every NodeOpStatus we
// append to result.Nodes into the per-node OpStatus sink (when configured
// and opID is known). Keeping it inline avoids drift between the
// BackendOpResult view and the sink view - they're written from the same
// code path on the same terminal statuses.
emitNodeProgress := func(node BackendNode, status, errMsg string) {
if d.progressSink == nil || opID == "" {
return
}
d.progressSink.UpdateNodeProgress(opID, node.ID, galleryop.NodeProgress{
NodeID: node.ID,
NodeName: node.Name,
Status: status,
Error: errMsg,
})
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet no intent to apply.
// Pending nodes haven't been approved yet - no intent to apply.
if node.Status == StatusPending {
continue
}
// Backend lifecycle ops only make sense on backend-type workers.
// Agent workers don't subscribe to backend.install/delete/list, so
// enqueueing for them guarantees a forever-retrying row that the
// reconciler can never drain. Silently skip they aren't consumers.
// reconciler can never drain. Silently skip - they aren't consumers.
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
@@ -143,19 +179,23 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
errMsg := fmt.Sprintf("enqueue failed: %v", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusError,
Error: errMsg,
})
emitNodeProgress(node, galleryop.NodeStatusError, errMsg)
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
errMsg := fmt.Sprintf("node %s, will retry when healthy", node.Status)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusQueued,
Error: errMsg,
})
emitNodeProgress(node, galleryop.NodeStatusQueued, errMsg)
continue
}
@@ -167,14 +207,33 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
xlog.Debug("Failed to clear pending backend op after success", "error", err)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "success",
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusSuccess,
})
emitNodeProgress(node, galleryop.NodeStatusSuccess, "")
continue
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL mark unhealthy so the router stops picking it too.
// gone AWOL - mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
// Worker-still-installing is a "soft" failure: the worker is most
// likely still pulling the OCI image. Keep the row, push NextRetryAt
// out so the reconciler does not immediately re-fire another install
// while the worker is still busy, and report the in-progress state
// to the caller. The next reconciler pass / backend.list confirms
// the actual outcome.
if errors.Is(applyErr, galleryop.ErrWorkerStillInstalling) {
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpInFlight(ctx, id, errMsg, d.adapter.InstallTimeout())
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusRunningOnWorker, Error: errMsg,
})
emitNodeProgress(node, galleryop.NodeStatusRunningOnWorker, errMsg)
continue
}
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
@@ -183,8 +242,9 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusError, Error: errMsg,
})
emitNodeProgress(node, galleryop.NodeStatusError, errMsg)
}
return result, nil
}
@@ -226,7 +286,11 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error {
}
ctx := context.Background()
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
// Empty opID: plain DeleteBackend isn't gallery-tracked the same way as
// Install/Upgrade (no progress dialog), so we skip the per-node sink
// writes here. DeleteBackendDetailed is the HTTP path that surfaces
// per-node results in its own response.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -249,7 +313,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
return d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -324,9 +388,60 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
result[b.Name] = entry
}
}
// Proactively clear pending_backend_ops install rows whose intent is now
// satisfied: the backend is reported installed on its target node. Without
// this, the row sits in the queue until next_retry_at expires (up to the
// install timeout, default 15m) and the operator UI shows the install as
// "still installing in background" for that whole window even though the
// worker has actually been ready for minutes. We only clear install rows;
// upgrade and delete rows have presence-based semantics that do NOT match
// backend.list confirmation.
d.clearSatisfiedInstallRows(context.Background(), result)
return result, nil
}
// clearSatisfiedInstallRows removes pending_backend_ops install rows whose
// (nodeID, backend) pair now appears in the cluster-wide backend listing.
// Called by ListBackends after fan-out so the proactive clear sees every
// node's report. Best-effort: a DB failure is logged and the row stays for
// the reconciler to drain via its slower path.
func (d *DistributedBackendManager) clearSatisfiedInstallRows(ctx context.Context, backends gallery.SystemBackends) {
rows, err := d.registry.ListPendingBackendOps(ctx)
if err != nil {
xlog.Debug("clearSatisfiedInstallRows: failed to list pending ops", "error", err)
return
}
if len(rows) == 0 {
return
}
// Build a (nodeID, backend) presence set from the listing.
present := make(map[string]map[string]bool, len(backends))
for name, b := range backends {
for _, ref := range b.Nodes {
if present[ref.NodeID] == nil {
present[ref.NodeID] = make(map[string]bool)
}
present[ref.NodeID][name] = true
}
}
for _, row := range rows {
if row.Op != OpBackendInstall {
continue
}
if !present[row.NodeID][row.Backend] {
continue
}
if err := d.registry.DeletePendingBackendOp(ctx, row.ID); err != nil {
xlog.Debug("clearSatisfiedInstallRows: delete failed",
"id", row.ID, "node", row.NodeID, "backend", row.Backend, "error", err)
continue
}
xlog.Info("Reconciler: pending install row satisfied by backend.list",
"node", row.NodeID, "backend", row.Backend)
}
}
// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
@@ -345,11 +460,41 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
targetNodeIDs = map[string]bool{op.TargetNodeID: true}
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, op.ID, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// onProgress fans each BackendInstallProgressEvent into two
// observers: the legacy single-bar progressCb (kept so callers
// that only consume the aggregate view keep working) and the
// per-node sink (so OpStatus.Nodes gets a "downloading" tick
// per file/percentage with node attribution). Defined inside the
// loop so each node captures its own node.Name into the closure.
onProgress := func(ev messaging.BackendInstallProgressEvent) {
if progressCb != nil {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
if d.progressSink != nil && op.ID != "" {
d.progressSink.UpdateNodeProgress(op.ID, ev.NodeID, galleryop.NodeProgress{
NodeID: ev.NodeID,
NodeName: node.Name,
Status: galleryop.NodeStatusDownloading,
FileName: ev.FileName,
Current: ev.Current,
Total: ev.Total,
Percentage: ev.Percentage,
Phase: ev.Phase,
})
}
}
// nil-callback shortcut: when there is nothing to deliver to,
// hand the adapter a nil onProgress so it skips the per-op NATS
// subscription. Matches the pre-Phase-4 bridgeProgressCb semantics.
var onProgressArg func(messaging.BackendInstallProgressEvent)
if progressCb != nil || d.progressSink != nil {
onProgressArg = onProgress
}
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, onProgressArg)
if err != nil {
return err
}
@@ -361,7 +506,19 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
if err != nil {
return err
}
return result.Err()
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// No hard failures, but if at least one node reported running_on_worker,
// surface a wrapped ErrWorkerStillInstalling so galleryop can render a
// yellow in-progress state instead of green success. The reconciler
// will confirm the actual outcome on its next pass via backend.list.
for _, n := range result.Nodes {
if n.Status == galleryop.NodeStatusRunningOnWorker {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
}
// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
@@ -392,7 +549,11 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
targetNodeIDs[n.NodeID] = true
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Empty opID: the caller (galleryop) doesn't thread an op ID into
// UpgradeBackend today, so we can't tag per-node sink writes with the
// right OpStatus key. Until the upgrade path takes a ManagementOp the
// way InstallBackend does, the sink stays no-op here.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
if err != nil {
// Rolling-update fallback: an older worker doesn't know
@@ -417,7 +578,18 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
if err != nil {
return err
}
return result.Err()
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// Same in-progress surfacing as InstallBackend: a long-running worker
// upgrade that timed out the NATS round-trip must not be reported as
// green success.
for _, n := range result.Nodes {
if n.Status == galleryop.NodeStatusRunningOnWorker {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
}
// IsDistributed reports that installs from this manager fan out across the
@@ -443,3 +615,16 @@ func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[stri
// it used to come from the empty frontend filesystem.
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
// summarizeRunningOnWorker builds a short human-readable summary of which
// nodes are still installing in the background, for inclusion in the
// wrapped ErrWorkerStillInstalling error.
func summarizeRunningOnWorker(nodes []NodeOpStatus) string {
var names []string
for _, n := range nodes {
if n.Status == galleryop.NodeStatusRunningOnWorker {
names = append(names, n.NodeName)
}
}
return strings.Join(names, ", ")
}

View File

@@ -3,6 +3,7 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"runtime"
"sync"
"time"
@@ -12,6 +13,7 @@ import (
. "github.com/onsi/gomega"
"gorm.io/gorm"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
@@ -22,11 +24,35 @@ import (
// (or error). Used so each fan-out request can simulate a different worker
// outcome without spinning up real NATS.
type scriptedMessagingClient struct {
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
publishes []progressPublishCall
scheduledProgressPublishes []scheduledProgressPublish
subscribes []string
}
// progressPublishCall records a single Publish invocation. The progress
// publisher tests assert on the sequence of BackendInstallProgressEvent
// values written to a per-op subject, so we capture both subject and the
// decoded event. Named to avoid clashing with the simpler `publishCall`
// already defined in unloader_test.go (which stores raw JSON bytes for
// non-progress assertions).
type progressPublishCall struct {
Subject string
Event messaging.BackendInstallProgressEvent
}
// scheduledProgressPublish queues a batch of BackendInstallProgressEvent
// values to be delivered the next time Subscribe is called with the matching
// subject. This lets master-side tests assert that the adapter installs its
// handler BEFORE publishing the install request, by scripting events to be
// delivered as soon as the subscription appears.
type scheduledProgressPublish struct {
subject string
events []messaging.BackendInstallProgressEvent
}
// matchedReply lets a test script a canned reply that only fires when the
@@ -98,10 +124,10 @@ func (s *scriptedMessagingClient) scriptReplyMatching(subject string, pred func(
})
}
func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
func (s *scriptedMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.calls = append(s.calls, requestCall{Subject: subject, Data: data})
s.calls = append(s.calls, requestCall{Subject: subject, Data: data, Timeout: timeout})
// Predicate-matched replies take precedence over flat scriptReply.
if matchers, ok := s.matchedReplies[subject]; ok {
@@ -135,8 +161,88 @@ func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Du
return nil, &fakeNoRespondersErr{}
}
func (s *scriptedMessagingClient) Publish(_ string, _ any) error { return nil }
func (s *scriptedMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) {
// Publish records each call so progress-publisher tests can assert on the
// stream of events written to a subject. The real messaging.Client JSON
// encodes the payload before sending, but our publisher hands a typed
// struct directly, so we handle both shapes.
func (s *scriptedMessagingClient) Publish(subject string, data any) error {
s.mu.Lock()
defer s.mu.Unlock()
switch ev := data.(type) {
case messaging.BackendInstallProgressEvent:
s.publishes = append(s.publishes, progressPublishCall{Subject: subject, Event: ev})
case []byte:
var e messaging.BackendInstallProgressEvent
_ = json.Unmarshal(ev, &e)
s.publishes = append(s.publishes, progressPublishCall{Subject: subject, Event: e})
}
return nil
}
// publishCalls returns every BackendInstallProgressEvent that was published
// to `subject`, in order. Lets tests assert on debounce behavior without
// depending on internal Publish timing.
func (s *scriptedMessagingClient) publishCalls(subject string) []messaging.BackendInstallProgressEvent {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]messaging.BackendInstallProgressEvent, 0)
for _, c := range s.publishes {
if c.Subject != subject {
continue
}
out = append(out, c.Event)
}
return out
}
// scheduleProgressPublish queues a set of BackendInstallProgressEvent values
// to be delivered on the next Subscribe call matching the per-op progress
// subject. A short delay before delivery gives the subscriber time to install
// its message handler before the events arrive.
func (s *scriptedMessagingClient) scheduleProgressPublish(nodeID, opID string, events []messaging.BackendInstallProgressEvent) {
s.mu.Lock()
defer s.mu.Unlock()
s.scheduledProgressPublishes = append(s.scheduledProgressPublishes, scheduledProgressPublish{
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
events: events,
})
}
// subscribeCalls returns the subjects on which Subscribe was invoked.
// Used to confirm the master skipped subscription when onProgress was nil.
func (s *scriptedMessagingClient) subscribeCalls() []string {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]string, len(s.subscribes))
copy(out, s.subscribes)
return out
}
func (s *scriptedMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
s.mu.Lock()
s.subscribes = append(s.subscribes, subject)
matched := []scheduledProgressPublish{}
remaining := s.scheduledProgressPublishes[:0]
for _, sp := range s.scheduledProgressPublishes {
if sp.subject == subject {
matched = append(matched, sp)
} else {
remaining = append(remaining, sp)
}
}
s.scheduledProgressPublishes = remaining
s.mu.Unlock()
go func() {
time.Sleep(20 * time.Millisecond)
for _, sp := range matched {
for _, ev := range sp.events {
raw, _ := json.Marshal(ev)
handler(raw)
}
}
}()
return &fakeSubscription{}, nil
}
func (s *scriptedMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) {
@@ -151,8 +257,43 @@ func (s *scriptedMessagingClient) SubscribeReply(_ string, _ func([]byte, func([
func (s *scriptedMessagingClient) IsConnected() bool { return true }
func (s *scriptedMessagingClient) Close() {}
// recordingNodeCall captures a single UpdateNodeProgress invocation so
// per-node OpStatus tests can assert on the sequence of writes the
// DistributedBackendManager fans out into the sink.
type recordingNodeCall struct {
OpID string
NodeID string
Progress galleryop.NodeProgress
}
// recordingProgressSink is a test-only nodeProgressSink that just records
// every call. Used by the per-node OpStatus specs below to assert the
// manager wrote the expected terminal and downloading entries.
type recordingProgressSink struct {
mu sync.Mutex
calls []recordingNodeCall
}
func (r *recordingProgressSink) UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, recordingNodeCall{OpID: opID, NodeID: nodeID, Progress: np})
}
func (r *recordingProgressSink) callsFor(opID, nodeID string) []galleryop.NodeProgress {
r.mu.Lock()
defer r.mu.Unlock()
out := []galleryop.NodeProgress{}
for _, c := range r.calls {
if c.OpID == opID && c.NodeID == nodeID {
out = append(out, c.Progress)
}
}
return out
}
// fakeNoRespondersErr is the unscripted-subject default. It matches
// nats.ErrNoResponders by string only used when a test forgets to script
// nats.ErrNoResponders by string only - used when a test forgets to script
// a node so the failure is loud but doesn't tickle errors.Is(...) sentinel
// paths the test wasn't deliberately exercising. Tests that DO want the
// real sentinel (e.g. to drive the manager's NoResponders fallback) call
@@ -204,7 +345,7 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(err).ToNot(HaveOccurred())
mc = newScriptedMessagingClient()
adapter = NewRemoteUnloaderAdapter(nil, mc)
adapter = NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
mgr = &DistributedBackendManager{
local: stubLocalBackendManager{},
adapter: adapter,
@@ -352,6 +493,263 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(mc.calls).To(BeEmpty())
})
})
Context("when InstallBackend times out on a worker", func() {
It("returns galleryop.ErrWorkerStillInstalling and keeps the queue row with NextRetryAt pushed out", func() {
n := registerHealthyBackend("slow", "10.0.0.1:50051")
// Script a NATS timeout on the install subject. The adapter
// wraps this into galleryop.ErrWorkerStillInstalling, which
// the manager should treat as a soft failure.
mc.scriptErr(messaging.SubjectNodeBackendInstall(n.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, err := registry.ListPendingBackendOps(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
Expect(rows[0].Backend).To(Equal("vllm"))
// The adapter is configured with a 3m install timeout in this
// suite (NewRemoteUnloaderAdapter above). NextRetryAt should
// be ~now+3m; a > now+2m bound is safe-but-tight enough to
// catch the buggy short default (30s exponential backoff).
Expect(rows[0].NextRetryAt).To(BeTemporally(">", time.Now().Add(2*time.Minute)),
"NextRetryAt should be pushed to ~now+installTimeout, not the short default")
})
})
Context("end-to-end: timeout then successful reconcile via backend.list", func() {
It("surfaces the install in ListBackends after the worker finishes", func() {
// Use the same node-registration helper the Task 5 test uses
// so the test fixture is identical to the prior context.
node := registerHealthyBackend("jetson", "10.0.0.2:50051")
// First install attempt: NATS times out. The adapter wraps
// this as galleryop.ErrWorkerStillInstalling and the manager
// keeps the pending_backend_ops row alive with NextRetryAt
// pushed out (asserted in the previous context).
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, listErr := registry.ListPendingBackendOps(ctx)
Expect(listErr).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
// The worker finished installing in the background. Script
// backend.list on the same scriptedMessagingClient so the
// manager's ListBackends fan-out reports the backend.
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
backends, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
Expect(backends).To(HaveKey("vllm"))
Expect(backends["vllm"].Nodes).To(HaveLen(1))
Expect(backends["vllm"].Nodes[0].NodeID).To(Equal(node.ID))
// Phase 1b shipped: ListBackends proactively clears install rows
// whose intent is now satisfied by backend.list confirmation. The
// operator UI clears immediately instead of waiting for the next
// reconciler tick after NextRetryAt.
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(BeEmpty(),
"install row should clear once backend.list confirms presence on the target node")
})
})
Context("ListBackends clears confirmed install rows", func() {
It("deletes the pending_backend_ops install row when the backend is reported installed on its target node", func() {
node := registerHealthyBackend("worker-a", "10.0.0.5:50051")
// Pre-stage: simulate an admin install that timed out at the NATS
// round-trip, leaving an install row in the queue.
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue())
rows, _ := registry.ListPendingBackendOps(ctx)
Expect(rows).To(HaveLen(1))
// Worker finishes installing in the background. backend.list now
// confirms presence; ListBackends should proactively clear the row.
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
backends, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
Expect(backends).To(HaveKey("vllm"))
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(BeEmpty(),
"ListBackends should clear install rows whose intent is now satisfied by backend.list")
})
It("does NOT clear an upgrade row even if the backend is reported installed", func() {
node := registerHealthyBackend("worker-b", "10.0.0.6:50051")
Expect(registry.UpsertPendingBackendOp(ctx, node.ID, "vllm", OpBackendUpgrade, []byte("[]"))).To(Succeed())
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
_, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(HaveLen(1), "upgrade rows must not be cleared by backend.list presence")
})
})
Context("InstallBackend streams progress events to the caller's progressCb", func() {
It("invokes progressCb once per worker-published progress event", func() {
node := registerHealthyBackend("worker-prog", "10.0.0.7:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.7:50051"})
mc.scheduleProgressPublish(node.ID, "op-prog-1", []messaging.BackendInstallProgressEvent{
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100},
})
type tick struct {
FileName, Current, Total string
Percentage float64
}
var (
pcCalls []tick
mu sync.Mutex
)
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
pcCalls = append(pcCalls, tick{file, current, total, pct})
}
opVal := op("vllm")
opVal.ID = "op-prog-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(pcCalls)
}, "1s").Should(Equal(2))
mu.Lock()
defer mu.Unlock()
// The adapter dispatches each progress event to its own goroutine
// (see unloader.go: `go onProgress(ev)`) so two events emitted back
// to back can land at the bridge in either order. Assert the set of
// percentages observed contains both ticks, rather than depending
// on goroutine scheduling for ordering.
pcts := []float64{pcCalls[0].Percentage, pcCalls[1].Percentage}
Expect(pcts).To(ConsistOf(10.0, 100.0))
})
})
Context("InstallBackend tolerates silent (pre-Phase-2) workers", func() {
It("completes successfully even when no progress events are ever published", func() {
node := registerHealthyBackend("worker-silent", "10.0.0.8:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.8:50051"})
// NO scheduleProgressPublish call - silent worker.
var ticks int
var mu sync.Mutex
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
ticks++
}
opVal := op("vllm")
opVal.ID = "op-silent-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Consistently(func() int {
mu.Lock()
defer mu.Unlock()
return ticks
}, "200ms").Should(Equal(0))
})
})
Context("populates per-node OpStatus entries", func() {
var sink *recordingProgressSink
BeforeEach(func() {
// Reconstruct mgr with the recording sink so the new code
// path (per-node OpStatus writes) is exercised. The default
// mgr in the outer BeforeEach has progressSink=nil so the
// pre-existing specs keep verifying the no-sink behavior.
sink = &recordingProgressSink{}
appCfg := &config.ApplicationConfig{}
mgr = NewDistributedBackendManager(appCfg, nil, adapter, registry, sink)
// stubLocalBackendManager mirrors the production behaviour
// where the frontend node rarely has the backend installed
// locally - the NATS fan-out is what these specs verify.
mgr.local = stubLocalBackendManager{}
})
It("emits a success entry for each healthy node visited", func() {
node := registerHealthyBackend("worker-ok", "10.0.0.9:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true, Address: "10.0.0.9:50051"})
opVal := op("vllm")
opVal.ID = "op-node-success"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
calls := sink.callsFor("op-node-success", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal(galleryop.NodeStatusSuccess))
Expect(calls[len(calls)-1].NodeName).To(Equal("worker-ok"))
})
It("emits a running_on_worker entry when NATS times out", func() {
node := registerHealthyBackend("worker-slow", "10.0.0.10:50051")
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
opVal := op("vllm")
opVal.ID = "op-node-slow"
// Soft failure: returns wrapped ErrWorkerStillInstalling.
_ = mgr.InstallBackend(ctx, opVal, nil)
calls := sink.callsFor("op-node-slow", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal(galleryop.NodeStatusRunningOnWorker))
})
It("emits downloading entries from progress events", func() {
node := registerHealthyBackend("worker-dl", "10.0.0.11:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true})
mc.scheduleProgressPublish(node.ID, "op-node-dl", []messaging.BackendInstallProgressEvent{
{OpID: "op-node-dl", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100, Phase: messaging.PhaseDownloading},
})
opVal := op("vllm")
opVal.ID = "op-node-dl"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
Eventually(func() bool {
for _, np := range sink.callsFor("op-node-dl", node.ID) {
if np.Status == galleryop.NodeStatusDownloading && np.Percentage == 100.0 {
return true
}
}
return false
}, "1s").Should(BeTrue())
})
})
})
Describe("UpgradeBackend", func() {

View File

@@ -68,9 +68,9 @@ type ModelScheduler interface {
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
type ReplicaReconcilerOptions struct {
Registry *NodeRegistry
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
Unloader NodeCommandSender
// Adapter is the NATS sender used to retry pending backend ops. When nil,
// the state-reconciler pending-drain pass is a no-op (single-node mode).
Adapter *RemoteUnloaderAdapter
@@ -78,7 +78,7 @@ type ReplicaReconcilerOptions struct {
// addresses. Matches the worker's token so HealthCheck auth succeeds.
RegistrationToken string
// Prober overrides the default gRPC health probe (used by tests).
Prober ModelProber
Prober ModelProber
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
@@ -191,7 +191,7 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// Pending-op drain for admin install — not a per-replica load.
// Replica 0 is the conventional admin slot. Install is idempotent:
// the worker short-circuits if the backend is already running.
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, "", nil)
if err != nil {
applyErr = err
} else if !reply.Success {

View File

@@ -17,24 +17,24 @@ import (
// Workers are generic — they don't have a fixed backend type.
// The SmartRouter dynamically installs backends via NATS backend.install events.
type BackendNode struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
// ReservedVRAM is a soft, in-tick reservation deducted by the scheduler when
// it picks this node to load a model. Workers reset it back to 0 on each
// heartbeat (the worker is the source of truth for actual free VRAM); the
// reservation is only here to keep two scheduling decisions within the
// same heartbeat window from over-committing the same node.
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
// MaxReplicasPerModel caps how many replicas of any one model can run on
// this node concurrently. Default 1 preserves the historical "one
// (node, model)" assumption; set higher (via worker --max-replicas-per-model)
@@ -44,12 +44,12 @@ type BackendNode struct {
// admin override. When true, the worker's CLI value is ignored on
// re-registration so the override survives worker restarts. Cleared
// by an explicit "reset to worker default" action.
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
const (
@@ -79,17 +79,17 @@ const (
// gRPC Address (each replica is a separate worker process on its own port),
// and its own InFlight counter.
type NodeModel struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
@@ -1287,7 +1287,7 @@ func (r *NodeRegistry) UpdateMaxReplicasPerModel(ctx context.Context, nodeID str
res := r.db.WithContext(ctx).Model(&BackendNode{}).
Where("id = ?", nodeID).
Updates(map[string]any{
ColMaxReplicasPerModel: n,
ColMaxReplicasPerModel: n,
"max_replicas_per_model_manually_set": true,
})
if res.Error != nil {
@@ -1460,7 +1460,7 @@ func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backe
NextRetryAt: time.Now(),
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
}).Create(&row).Error
}
@@ -1515,6 +1515,27 @@ func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uin
})
}
// RecordPendingBackendOpInFlight is the "soft failure" cousin of
// RecordPendingBackendOpFailure. Used when a NATS install round-trip timed
// out but the worker is still installing in the background. Stores the
// message in LastError and pushes NextRetryAt out by `retryDelay` (typically
// the install timeout) so the reconciler does not immediately re-fire
// another install while the worker is still busy.
//
// Attempts is intentionally NOT incremented: an in-flight timeout is not a
// failed attempt, it is a still-in-progress one. Incrementing it would let a
// genuinely-progressing slow install (e.g. 30 GB CUDA image on Wi-Fi) trip
// the maxPendingBackendOpAttempts cap in the reconciler and dead-letter the
// row while the worker is still legitimately working.
func (r *NodeRegistry) RecordPendingBackendOpInFlight(ctx context.Context, id uint, lastError string, retryDelay time.Duration) error {
return r.db.WithContext(ctx).Model(&PendingBackendOp{}).
Where("id = ?", id).
Updates(map[string]any{
"last_error": lastError,
"next_retry_at": time.Now().Add(retryDelay),
}).Error
}
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
// reconciler tick is 30s so anything shorter would just re-fire immediately.
func backoffForAttempt(attempts int) time.Duration {

View File

@@ -688,7 +688,7 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
v, err, _ := r.installFlight.Do(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
if err != nil {
return "", err
}

View File

@@ -330,7 +330,7 @@ type upgradeCall struct {
replica int
}
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, _ string, _ func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) {
// installHook intentionally runs OUTSIDE the mutex: the hook may block
// on a channel and we don't want to serialize concurrent callers,
// which would defeat the singleflight-overlap test.

View File

@@ -2,9 +2,15 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/xlog"
)
@@ -28,7 +34,7 @@ type backendStopRequest struct {
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error)
@@ -43,18 +49,33 @@ type NodeCommandSender interface {
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
registry ModelLocator
nats messaging.MessagingClient
registry ModelLocator
nats messaging.MessagingClient
installTimeout time.Duration
upgradeTimeout time.Duration
}
// NewRemoteUnloaderAdapter creates a new adapter.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
// NewRemoteUnloaderAdapter creates a new adapter. installTimeout and
// upgradeTimeout govern the NATS request-reply deadlines for backend.install
// and backend.upgrade respectively. Use
// DistributedConfig.BackendInstallTimeoutOrDefault() /
// BackendUpgradeTimeoutOrDefault() at construction.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient, installTimeout, upgradeTimeout time.Duration) *RemoteUnloaderAdapter {
return &RemoteUnloaderAdapter{
registry: registry,
nats: nats,
registry: registry,
nats: nats,
installTimeout: installTimeout,
upgradeTimeout: upgradeTimeout,
}
}
// InstallTimeout returns the configured backend.install round-trip timeout.
// Used by DistributedBackendManager to push NextRetryAt out by this duration
// when a worker times out replying but is still installing in the background.
func (a *RemoteUnloaderAdapter) InstallTimeout() time.Duration {
return a.installTimeout
}
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.
@@ -87,18 +108,59 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
//
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
// case while still failing fast enough to surface real worker hangs.
// Timeout: configured via DistributedConfig.BackendInstallTimeoutOrDefault
// (default 15m). Most calls return in under 2 seconds (process already
// running). The 15-minute ceiling covers the cold-binary spawn-after-download
// case on slow links (Jetson Wi-Fi, multi-GB CUDA images) while still
// failing fast enough to surface real worker hangs.
//
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead -
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
func (a *RemoteUnloaderAdapter) InstallBackend(
nodeID, backendType, modelID, galleriesJSON, uri, name, alias string,
replicaIndex int,
opID string,
onProgress func(messaging.BackendInstallProgressEvent),
) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID)
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
// Subscribe to the per-op progress subject BEFORE publishing the install
// request so we don't miss early events. When onProgress is nil OR opID
// is empty (the reconciler-driven retry path), skip subscription entirely:
// silent installs cost nothing extra.
var sub messaging.Subscription
if onProgress != nil && opID != "" {
progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID)
s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) {
var ev messaging.BackendInstallProgressEvent
if err := json.Unmarshal(raw, &ev); err != nil {
xlog.Debug("malformed install progress event", "subject", progressSubject, "error", err)
return
}
// Goroutine guard: a slow onProgress callback must not stall
// the NATS reader thread.
//
// NOTE: events spawn one goroutine each, so ordering at the
// consumer is best-effort. In practice the worker debounces to
// ~250ms which is far larger than goroutine scheduling jitter,
// so reordering is rare. The worker's final Flush() event is
// intended to win as the terminal tick. A future hardening pass
// could add a Seq uint64 field to BackendInstallProgressEvent
// and drop stale-by-seq at the bridge if reordering becomes a
// real UX issue.
go onProgress(ev)
})
if subErr != nil {
xlog.Warn("Failed to subscribe to install progress subject; proceeding without progress streaming",
"subject", progressSubject, "error", subErr)
} else {
sub = s
}
}
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
ModelID: modelID,
BackendGalleries: galleriesJSON,
@@ -106,29 +168,46 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 3*time.Minute)
OpID: opID,
}, a.installTimeout)
if sub != nil {
_ = sub.Unsubscribe()
}
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary upgrade itself does not start a process.
// binary - upgrade itself does not start a process.
//
// Timeout: 15 minutes. Real-world worst case observed: 810 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
// Timeout: configured via DistributedConfig.BackendUpgradeTimeoutOrDefault
// (default 15m). Real-world worst case observed: 8-10 minutes for large
// CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
reply, err := messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 15*time.Minute)
}, a.upgradeTimeout)
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}
// installWithForceFallback is the rolling-update fallback used by
@@ -141,7 +220,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
@@ -149,7 +228,12 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: true,
}, 15*time.Minute)
}, a.upgradeTimeout)
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}
// ListBackends queries a worker node for its installed backends via NATS request-reply.
@@ -228,3 +312,14 @@ func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
subject := messaging.SubjectNodeStop(nodeID)
return a.nats.Publish(subject, nil)
}
// isNATSTimeout returns true if err looks like a NATS request-reply timeout.
// nats.ErrTimeout is the canonical sentinel; context.DeadlineExceeded can
// also surface depending on the client's path; we accept both, plus a
// string-match fallback for clients that return a bare error.
func isNATSTimeout(err error) bool {
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
return true
}
return err != nil && strings.Contains(err.Error(), "nats: timeout")
}

View File

@@ -3,13 +3,16 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/nats-io/nats.go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
)
@@ -60,6 +63,7 @@ type publishCall struct {
type requestCall struct {
Subject string
Data []byte
Timeout time.Duration
}
func (f *fakeMessagingClient) Publish(subject string, data any) error {
@@ -93,10 +97,10 @@ func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
func (f *fakeMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data})
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data, Timeout: timeout})
return f.requestReply, f.requestErr
}
@@ -119,7 +123,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
BeforeEach(func() {
locator = &fakeModelLocator{}
mc = &fakeMessagingClient{}
adapter = NewRemoteUnloaderAdapter(locator, mc)
adapter = NewRemoteUnloaderAdapter(locator, mc, 3*time.Minute, 15*time.Minute)
})
Describe("UnloadRemoteModel", func() {
@@ -154,7 +158,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
}
// Use a messaging client that fails the first Publish call only.
failOnce := &failOnceMessagingClient{inner: mc, failOn: 0}
adapter = NewRemoteUnloaderAdapter(locator, failOnce)
adapter = NewRemoteUnloaderAdapter(locator, failOnce, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())
@@ -259,3 +263,96 @@ func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout t
func (f *failOnceMessagingClient) IsConnected() bool { return true }
func (f *failOnceMessagingClient) Close() {}
var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
It("passes the configured install timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(7 * time.Minute))
})
It("passes the configured upgrade timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute))
})
})
var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
It("wraps nats.ErrTimeout from InstallBackend in galleryop.ErrWorkerStillInstalling", func() {
mc := newScriptedMessagingClient()
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
})
It("does NOT wrap non-timeout errors", func() {
mc := newScriptedMessagingClient()
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse())
Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue())
})
})
var _ = Describe("RemoteUnloaderAdapter install progress streaming", func() {
It("forwards BackendInstallProgressEvent values into the onProgress callback when the worker publishes them", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
mc.scheduleProgressPublish("n1", "op-abc", []messaging.BackendInstallProgressEvent{
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "500 MB", Total: "1 GB", Percentage: 50},
})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
var (
received []messaging.BackendInstallProgressEvent
mu sync.Mutex
)
onProgress := func(ev messaging.BackendInstallProgressEvent) {
mu.Lock()
defer mu.Unlock()
received = append(received, ev)
}
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "op-abc", onProgress)
Expect(err).ToNot(HaveOccurred())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(received)
}, "1s").Should(Equal(2))
})
It("does NOT subscribe when onProgress is nil (reconciler retry path)", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.subscribeCalls()).To(BeEmpty(),
"reconciler-driven retries must not subscribe to the per-op progress subject")
})
})

View File

@@ -1,6 +1,8 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -15,7 +17,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc)
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
@@ -24,7 +26,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
It("returns the underlying error when the subject has no responders", func() {
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
adapter := NewRemoteUnloaderAdapter(nil, mc)
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
Expect(err).To(HaveOccurred())
})

View File

@@ -7,14 +7,22 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/xlog"
)
// installProgressDebounce is the leading-edge window the worker uses when
// streaming download progress to the master. 250ms caps wire chatter at
// ~4 events/sec per in-flight install while still surfacing every
// meaningful percentage jump.
const installProgressDebounce = 250 * time.Millisecond
// buildProcessKey is the supervisor's stable identifier for a backend gRPC
// process. It includes the replica index so the same model can run multiple
// processes on a worker simultaneously without colliding on the same map slot
@@ -100,6 +108,20 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
}
}
// When the master tagged this install with an OpID, stream the
// gallery download progress back to it on the per-op NATS subject.
// Old masters that omit OpID stay on the silent path so they keep
// working without changes. The publisher releases its mutex before
// every Publish so a slow link never stalls the download loop, and
// the deferred Flush guarantees a terminal-percentage event reaches
// the master even when the install errors out.
var downloadCb func(file, current, total string, percentage float64)
if req.OpID != "" && s.nats != nil {
publisher := nodes.NewDebouncedInstallProgressPublisher(s.nats, s.nodeID, req.OpID, req.Backend, installProgressDebounce)
downloadCb = publisher.OnDownload
defer publisher.Flush()
}
// On upgrade, run the gallery install path even if the binary already
// exists on disk: findBackend would otherwise short-circuit and we'd
// restart the same stale binary. The force flag passed to
@@ -112,14 +134,14 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
if req.URI != "" {
xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}
} else {
xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", force)
if err := gallery.InstallBackendFromGallery(
context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, force, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, req.Backend, downloadCb, force, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}

View File

@@ -16,8 +16,12 @@ const MaxSnippetSeconds = 30
// AudioSnippet captures the first MaxSnippetSeconds of a WAV file and computes
// quality metrics. The result is a map suitable for merging into a BackendTrace
// Data field.
func AudioSnippet(wavPath string) map[string]any {
// Data field. maxBytes caps the embedded base64 waveform so a single TTS or
// transcription trace cannot blow past the backend-trace body cap (~1.3 MiB
// of base64 per 30s of 16 kHz mono int16 PCM otherwise); when the encoded
// waveform would exceed the cap the audio_wav_base64 field is dropped and
// the rest of the metrics are returned. maxBytes <= 0 disables the cap.
func AudioSnippet(wavPath string, maxBytes int) map[string]any {
raw, err := os.ReadFile(wavPath)
if err != nil {
xlog.Warn("audio snippet: read failed", "path", wavPath, "error", err)
@@ -34,12 +38,14 @@ func AudioSnippet(wavPath string) map[string]any {
sampleRate = 16000
}
return AudioSnippetFromPCM(pcm, sampleRate, len(pcm))
return AudioSnippetFromPCM(pcm, sampleRate, len(pcm), maxBytes)
}
// AudioSnippetFromPCM builds an audio snippet from raw PCM bytes (int16 LE mono).
// totalPCMBytes is the full audio size before truncation (used to compute total duration).
func AudioSnippetFromPCM(pcm []byte, sampleRate int, totalPCMBytes int) map[string]any {
// totalPCMBytes is the full audio size before truncation (used to compute
// total duration). maxBytes caps the embedded base64 waveform as described
// on AudioSnippet.
func AudioSnippetFromPCM(pcm []byte, sampleRate, totalPCMBytes, maxBytes int) map[string]any {
if len(pcm) == 0 || len(pcm)%2 != 0 {
return nil
}
@@ -89,8 +95,7 @@ func AudioSnippetFromPCM(pcm []byte, sampleRate int, totalPCMBytes int) map[stri
}
buf.Write(snippetPCM)
return map[string]any{
"audio_wav_base64": base64.StdEncoding.EncodeToString(buf.Bytes()),
out := map[string]any{
"audio_duration_s": math.Round(durationS*100) / 100,
"audio_snippet_s": math.Round(snippetDuration*100) / 100,
"audio_sample_rate": sampleRate,
@@ -99,4 +104,15 @@ func AudioSnippetFromPCM(pcm []byte, sampleRate int, totalPCMBytes int) map[stri
"audio_peak_dbfs": math.Round(peakDBFS*10) / 10,
"audio_dc_offset": math.Round(dcOffset*10000) / 10000,
}
// Skip the embedded waveform when it would dominate the trace payload.
// Truncating mid-base64 produces an undecodable string, so the right
// move is to drop the field and let the UI render just the metrics.
encodedSize := base64.StdEncoding.EncodedLen(buf.Len())
if maxBytes <= 0 || encodedSize <= maxBytes {
out["audio_wav_base64"] = base64.StdEncoding.EncodeToString(buf.Bytes())
} else {
xlog.Debug("audio snippet: dropping audio_wav_base64", "encoded_bytes", encodedSize, "max_bytes", maxBytes)
out["audio_wav_base64_dropped_bytes"] = encodedSize
}
return out
}

View File

@@ -0,0 +1,49 @@
package trace_test
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/trace"
)
// One second of mono 16-bit PCM at 16 kHz: 32 KiB raw. After the 44-byte
// WAV header and base64 encoding the snippet runs ~42 KiB, which is well
// over the small caps used here and matches the smallest realistic TTS
// output size.
const (
snippetSampleRate = 16000
snippetSeconds = 1
)
func makePCM(seconds, sampleRate int) []byte {
return make([]byte, seconds*sampleRate*2) // int16 mono
}
var _ = Describe("AudioSnippetFromPCM byte cap", func() {
pcm := makePCM(snippetSeconds, snippetSampleRate)
totalPCM := len(pcm)
It("omits audio_wav_base64 when the encoded snippet would exceed the cap, keeping the metrics", func() {
out := trace.AudioSnippetFromPCM(pcm, snippetSampleRate, totalPCM, 1024)
Expect(out).ToNot(BeNil(), "metrics must still be returned even when the waveform is dropped")
Expect(out).ToNot(HaveKey("audio_wav_base64"), "oversized base64 must be dropped so the UI does not try to render invalid audio data")
Expect(out).To(HaveKey("audio_duration_s"))
Expect(out).To(HaveKey("audio_sample_rate"))
Expect(out).To(HaveKey("audio_rms_dbfs"))
})
It("includes audio_wav_base64 when the snippet fits under the cap", func() {
out := trace.AudioSnippetFromPCM(pcm, snippetSampleRate, totalPCM, 1024*1024)
Expect(out).To(HaveKey("audio_wav_base64"))
Expect(out["audio_wav_base64"]).ToNot(BeEmpty())
})
It("includes audio_wav_base64 when the cap is disabled (0)", func() {
out := trace.AudioSnippetFromPCM(pcm, snippetSampleRate, totalPCM, 0)
Expect(out).To(HaveKey("audio_wav_base64"))
})
})

View File

@@ -2,6 +2,7 @@ package trace
import (
"encoding/json"
"fmt"
"slices"
"sync"
"time"
@@ -49,13 +50,25 @@ var backendMu sync.Mutex
var backendLogChan = make(chan *BackendTrace, 100)
var backendInitOnce sync.Once
func InitBackendTracingIfEnabled(maxItems int) {
// backendMaxBodyBytes caps each captured string value in a BackendTrace.Data
// field to keep the /api/backend-traces JSON small enough for the admin UI to
// load on every 5s auto-refresh. Mirrors the API-trace body cap added in
// commit 61bf34ea: without it a chatty LLM workload (full message history per
// trace) or any TTS run (~1.3 MiB of audio_wav_base64 per trace) blows the
// payload past tens of MiB and locks the Traces page in a loading state.
//
// 0 disables the cap. Set on the first InitBackendTracingIfEnabled call only,
// matching the sync.Once-guarded maxItems semantics.
var backendMaxBodyBytes int
func InitBackendTracingIfEnabled(maxItems, maxBodyBytes int) {
backendInitOnce.Do(func() {
if maxItems <= 0 {
maxItems = 100
}
backendMu.Lock()
backendTraceBuffer = circularbuffer.New[*BackendTrace](maxItems)
backendMaxBodyBytes = maxBodyBytes
backendMu.Unlock()
go func() {
@@ -71,6 +84,9 @@ func InitBackendTracingIfEnabled(maxItems int) {
}
func RecordBackendTrace(t BackendTrace) {
if t.Data != nil && backendMaxBodyBytes > 0 {
t.Data = capDataStrings(t.Data, backendMaxBodyBytes)
}
select {
case backendLogChan <- &t:
default:
@@ -78,6 +94,35 @@ func RecordBackendTrace(t BackendTrace) {
}
}
// capDataStrings walks a trace Data map and replaces any string value (at any
// depth) that exceeds maxBytes with a fixed-size marker that names the
// original byte count. The replacement is intentionally short and not valid
// base64/JSON: the goal is to flag "this was dropped" cheaply, not to keep a
// partial value that the UI might try to render. Non-string scalars and
// non-map containers pass through untouched so structural fields like
// total_deltas or audio_sample_rate remain useful.
func capDataStrings(data map[string]any, maxBytes int) map[string]any {
out := make(map[string]any, len(data))
for k, v := range data {
out[k] = capValue(v, maxBytes)
}
return out
}
func capValue(v any, maxBytes int) any {
switch val := v.(type) {
case string:
if len(val) > maxBytes {
return fmt.Sprintf("<truncated: %d bytes>", len(val))
}
return val
case map[string]any:
return capDataStrings(val, maxBytes)
default:
return v
}
}
func GetBackendTraces() []BackendTrace {
backendMu.Lock()
if backendTraceBuffer == nil {
@@ -136,3 +181,24 @@ func TruncateString(s string, maxLen int) string {
}
return s[:maxLen] + "..."
}
// TruncateToBytes caps a string at exactly maxBytes, preserving the leading
// content and appending a marker so the UI knows the value was clipped.
// Unlike TruncateString it guarantees output <= maxBytes, which matters for
// fields that feed back into the trace pipeline: capDataStrings in
// RecordBackendTrace re-checks size and would otherwise replace a producer's
// head-preserving truncation with the bare marker, losing the prefix.
//
// maxBytes <= 0 disables the cap, matching backendMaxBodyBytes semantics.
func TruncateToBytes(s string, maxBytes int) string {
if maxBytes <= 0 || len(s) <= maxBytes {
return s
}
suffix := fmt.Sprintf("...[truncated, %d bytes]", len(s))
if len(suffix) >= maxBytes {
// Pathologically small caps can't fit the marker; fall back to a
// hard cut so the contract (output <= maxBytes) still holds.
return s[:maxBytes]
}
return s[:maxBytes-len(suffix)] + suffix
}

View File

@@ -0,0 +1,160 @@
package trace_test
import (
"strings"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/trace"
)
// The /api/backend-traces endpoint ships up to TracingMaxItems entries to the
// admin Traces UI on every 5s auto-refresh. Without a cap on the per-trace
// Data field, a chatty agent-pool workload (LLM traces carry the full
// `messages` array, TTS traces carry ~1.3 MiB of audio_wav_base64) makes the
// response tens of MiB. The UI then stays in "loading" forever because the
// download + parse runs longer than the refresh interval: the same symptom
// the API-trace fix (commit 61bf34ea) addressed on the other side.
//
// These specs pin the generic safety net (Option A) so any future producer
// that stuffs a large string into Data is automatically bounded.
const (
smallCap = 1024
smallCapStep = 16
)
var _ = Describe("RecordBackendTrace Data capping", func() {
BeforeEach(func() {
// Init is sync.Once so the first test wins; subsequent tests just
// clear the buffer. The cap value below has to match the first call.
trace.InitBackendTracingIfEnabled(64, smallCap)
trace.ClearBackendTraces()
})
It("replaces oversized top-level string values with a truncation marker", func() {
oversized := strings.Repeat("x", smallCap*4)
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: time.Now(),
Type: trace.BackendTraceLLM,
ModelName: "m",
Data: map[string]any{
"messages": oversized,
"small": "fits",
},
})
Eventually(trace.GetBackendTraces).Should(HaveLen(1))
got := trace.GetBackendTraces()[0]
Expect(got.Data["small"]).To(Equal("fits"), "fields under the cap must pass through untouched")
// The marker is the contract the UI reads to show truncation; the
// concrete shape can evolve but it must be a short fixed-size string
// that encodes the original byte count so users know what was dropped.
msg, ok := got.Data["messages"].(string)
Expect(ok).To(BeTrue(), "string fields stay strings after capping")
Expect(len(msg)).To(BeNumerically("<", smallCap), "capped value must fit under the configured cap")
Expect(msg).To(ContainSubstring("truncated"))
Expect(msg).To(ContainSubstring("4096"), "marker should reference the original byte count for diagnostics")
})
It("recurses into nested maps so deeply nested oversized strings are also bounded", func() {
oversized := strings.Repeat("y", smallCap*2)
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: time.Now(),
Type: trace.BackendTraceLLM,
ModelName: "m",
Data: map[string]any{
"chat_deltas": map[string]any{
"content": oversized,
"total_deltas": 5,
"tool_call_count": 0,
},
},
})
Eventually(trace.GetBackendTraces).Should(HaveLen(1))
got := trace.GetBackendTraces()[0]
deltas, ok := got.Data["chat_deltas"].(map[string]any)
Expect(ok).To(BeTrue(), "nested map structure must be preserved")
Expect(deltas["total_deltas"]).To(Equal(5), "non-string siblings must pass through untouched")
content, ok := deltas["content"].(string)
Expect(ok).To(BeTrue())
Expect(len(content)).To(BeNumerically("<", smallCap), "nested oversized string must still be capped")
Expect(content).To(ContainSubstring("truncated"))
})
It("leaves values within the cap untouched", func() {
smallVal := strings.Repeat("z", smallCap-smallCapStep)
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: time.Now(),
Type: trace.BackendTraceEmbedding,
ModelName: "m",
Data: map[string]any{
"input_text": smallVal,
},
})
Eventually(trace.GetBackendTraces).Should(HaveLen(1))
got := trace.GetBackendTraces()[0]
Expect(got.Data["input_text"]).To(Equal(smallVal))
})
It("does not re-truncate values that producers already capped with TruncateToBytes", func() {
// Producers (LLM messages/response, etc.) prefer head-preserving
// truncation so users can still read the start of the conversation.
// TruncateToBytes guarantees output <= cap, so the generic safety
// net below must leave it alone, otherwise the kept prefix gets
// thrown away and replaced with the marker.
preTruncated := trace.TruncateToBytes(strings.Repeat("a", smallCap*4), smallCap)
Expect(len(preTruncated)).To(BeNumerically("<=", smallCap))
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: time.Now(),
Type: trace.BackendTraceLLM,
ModelName: "m",
Data: map[string]any{
"messages": preTruncated,
},
})
Eventually(trace.GetBackendTraces).Should(HaveLen(1))
got := trace.GetBackendTraces()[0]
Expect(got.Data["messages"]).To(Equal(preTruncated))
})
})
var _ = Describe("TruncateToBytes", func() {
It("returns the input unchanged when it fits", func() {
Expect(trace.TruncateToBytes("hello", 1024)).To(Equal("hello"))
})
It("treats maxBytes <= 0 as unlimited", func() {
Expect(trace.TruncateToBytes("hello", 0)).To(Equal("hello"))
Expect(trace.TruncateToBytes("hello", -1)).To(Equal("hello"))
})
It("caps oversized input to at most maxBytes and preserves the head", func() {
in := strings.Repeat("a", 5000)
out := trace.TruncateToBytes(in, 100)
Expect(len(out)).To(BeNumerically("<=", 100), "output must never exceed the cap so the generic Record-time safety net doesn't fire")
Expect(out).To(HavePrefix("a"), "should keep the leading content readable")
Expect(out).To(ContainSubstring("truncated"), "should mark the value as truncated for the UI")
})
It("falls back to plain truncation when the cap is smaller than the suffix", func() {
in := strings.Repeat("a", 100)
out := trace.TruncateToBytes(in, 4)
Expect(len(out)).To(Equal(4))
Expect(out).To(Equal("aaaa"))
})
})

View File

@@ -0,0 +1,13 @@
package trace_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestTrace(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Trace test suite")
}

View File

@@ -86,6 +86,8 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These
| `--auto-approve-nodes` | `LOCALAI_AUTO_APPROVE_NODES` | `false` | Auto-approve new worker nodes (skip admin approval) |
| `--auth` | `LOCALAI_AUTH` | `false` | **Must be `true`** for distributed mode |
| `--auth-database-url` | `LOCALAI_AUTH_DATABASE_URL` | *(required)* | PostgreSQL connection URL |
| `--backend-install-timeout` | `LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT` | `15m` | How long the frontend waits for a worker to acknowledge a backend install before considering the request stalled. Raise it when workers pull large backend images over slow links. If a worker takes longer than this, the operation shows as "still installing in background" in the admin UI and clears once the worker finishes. |
| `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | Same as the install timeout, applied to backend upgrades (force-reinstall). |
### Optional: S3 Object Storage
@@ -103,6 +105,31 @@ When S3 is not configured, model files are transferred directly from the fronten
For high-throughput or very large model files, S3 can be more efficient since it avoids streaming through the frontend.
### Watching Backend Installs
While a worker downloads a backend, the admin **Operations Bar** at the top
of the UI shows real-time progress: current file, downloaded/total bytes,
and percentage. This works the same as single-node mode.
When an install targets more than one worker, an **N nodes** chevron
appears on the operation row. Click it to expand a per-node breakdown,
with one row per worker showing:
- A status pill: **Queued** (gray), **Downloading** (blue), **Worker busy**
(yellow), **Done** (green), or **Failed** (red).
- The file currently being downloaded with current/total bytes and percentage.
- A thin per-node progress bar.
- Any error returned by the worker.
The yellow **Worker busy** pill means the worker took longer than
`--backend-install-timeout` to acknowledge but is most likely still
working in the background. The admin UI clears it as soon as the worker
finishes; no action is required from the operator.
If a worker is running an older LocalAI release that does not report
progress, its row in the breakdown will still show terminal status
(queued / done / failed / worker busy) but no per-file progress.
## Worker Configuration
Workers are started with the `worker` subcommand. Each worker is generic — it doesn't need a backend type at startup:

View File

@@ -3236,6 +3236,38 @@ const docTemplate = `{
}
}
},
"galleryop.NodeProgress": {
"type": "object",
"properties": {
"current": {
"type": "string"
},
"error": {
"type": "string"
},
"file_name": {
"type": "string"
},
"node_id": {
"type": "string"
},
"node_name": {
"type": "string"
},
"percentage": {
"type": "number"
},
"phase": {
"type": "string"
},
"status": {
"type": "string"
},
"total": {
"type": "string"
}
}
},
"galleryop.OpStatus": {
"type": "object",
"properties": {
@@ -3267,6 +3299,13 @@ const docTemplate = `{
"message": {
"type": "string"
},
"nodes": {
"description": "Nodes is the per-node breakdown for a fanned-out backend install.\nPopulated by DistributedBackendManager (per-node terminal status)\nand by the Phase 2 progress bridge (per-byte ticks). The\n/api/operations handler surfaces this so the UI can render an\nexpandable per-node view of an in-flight install.",
"type": "array",
"items": {
"$ref": "#/definitions/galleryop.NodeProgress"
}
},
"processed": {
"type": "boolean"
},

View File

@@ -3233,6 +3233,38 @@
}
}
},
"galleryop.NodeProgress": {
"type": "object",
"properties": {
"current": {
"type": "string"
},
"error": {
"type": "string"
},
"file_name": {
"type": "string"
},
"node_id": {
"type": "string"
},
"node_name": {
"type": "string"
},
"percentage": {
"type": "number"
},
"phase": {
"type": "string"
},
"status": {
"type": "string"
},
"total": {
"type": "string"
}
}
},
"galleryop.OpStatus": {
"type": "object",
"properties": {
@@ -3264,6 +3296,13 @@
"message": {
"type": "string"
},
"nodes": {
"description": "Nodes is the per-node breakdown for a fanned-out backend install.\nPopulated by DistributedBackendManager (per-node terminal status)\nand by the Phase 2 progress bridge (per-byte ticks). The\n/api/operations handler surfaces this so the UI can render an\nexpandable per-node view of an in-flight install.",
"type": "array",
"items": {
"$ref": "#/definitions/galleryop.NodeProgress"
}
},
"processed": {
"type": "boolean"
},

View File

@@ -210,6 +210,27 @@ definitions:
$ref: '#/definitions/gallery.NodeDriftInfo'
type: array
type: object
galleryop.NodeProgress:
properties:
current:
type: string
error:
type: string
file_name:
type: string
node_id:
type: string
node_name:
type: string
percentage:
type: number
phase:
type: string
status:
type: string
total:
type: string
type: object
galleryop.OpStatus:
properties:
cancellable:
@@ -232,6 +253,16 @@ definitions:
type: string
message:
type: string
nodes:
description: |-
Nodes is the per-node breakdown for a fanned-out backend install.
Populated by DistributedBackendManager (per-node terminal status)
and by the Phase 2 progress bridge (per-byte ticks). The
/api/operations handler surfaces this so the UI can render an
expandable per-node view of an in-flight install.
items:
$ref: '#/definitions/galleryop.NodeProgress'
type: array
processed:
type: boolean
progress:

View File

@@ -225,7 +225,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
// newTestSmartRouter creates a SmartRouter with NATS wired up and a mock
// backend.install handler that always replies success for all registered nodes.
newTestSmartRouter := func(reg *nodes.NodeRegistry, extraOpts ...nodes.SmartRouterOptions) *nodes.SmartRouter {
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC, 3*time.Minute, 15*time.Minute)
opts := nodes.SmartRouterOptions{
Unloader: unloader,
@@ -395,7 +395,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
Expect(err).ToNot(HaveOccurred())
// Create RemoteUnloaderAdapter and unload model
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
err = unloader.UnloadRemoteModel("old-model")
Expect(err).ToNot(HaveOccurred())

View File

@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
@@ -175,7 +176,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedModelManager(appCfg, ml, adapter)
err = distMgr.DeleteModel("big-model")
@@ -251,8 +252,8 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
err = distMgr.DeleteBackend("my-backend")
Expect(err).ToNot(HaveOccurred())
@@ -298,8 +299,8 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
// Should NOT return an error even though the backend doesn't exist locally
err = distMgr.DeleteBackend("remote-only-backend")

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -56,8 +57,8 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeTrue())
})
@@ -77,8 +78,8 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeFalse())
Expect(installReply.Error).To(ContainSubstring("backend not found"))
@@ -103,7 +104,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
// Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed())
Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1)))
@@ -133,14 +134,14 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter.UnloadRemoteModel("shared-model")
Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2)))
})
It("should be no-op for models not on any node", func() {
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed())
})
})
@@ -161,7 +162,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.StopNode(node.ID)).To(Succeed())
Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1)))

View File

@@ -3,6 +3,7 @@ package distributed_test
import (
"context"
"encoding/json"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -78,7 +79,7 @@ var _ = Describe("SmartRouter trackingKey", Label("Distributed"), func() {
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
nodeID = node.ID
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
router = nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{
Unloader: unloader,
})