mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-09 01:07:09 -04:00
* fix(galleryop): self-evict terminal ops from OpCache.GetStatus

  The processingBackends map (the UI 'reinstalling' spinner source) only cleared an op when a client polled /api/backends/job/:uid. The Manage-page Reinstall and Upgrade buttons never poll, so completed installs leaked into processingBackends forever and the backend card spun 'reinstalling' even though the install had finished.

  Evict terminal ops on the list read instead; DeleteUUID already broadcasts the eviction so peer replicas converge.

  Reproduced on a live 5-node distributed cluster: 5 backends sat in processingBackends with underlying jobs reporting completed:true,progress:100.

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(nodes): clear pending backend ops behind offline/draining nodes

  ListDuePendingBackendOps filters status=healthy, so a backend op queued against a node that went offline (stale heartbeat) or draining (admin action) was never retried, aged out, or deleted - it leaked forever and kept the UI operation spinning.

  Add DeleteStalePendingBackendOps and run it each reconcile pass: draining nodes are cleared immediately (model rows already purged), offline nodes once their heartbeat is older than a grace window (blip protection).

  Reproduced on a live cluster: orphaned llama-cpp install rows targeting an offline (nvidia-thor) and a draining (mac-mini-m4) node sat at attempts=0 indefinitely.

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(nodes): stream per-node progress during backend upgrade

  The install dispatch subscribed to a per-op progress subject and streamed per-node download ticks; the upgrade dispatch did a bare 15-minute blocking NATS round-trip with no subscription, so the UI showed progress:0 the whole time (the 'reinstalling but nothing happens' report on a slow node).

  Thread the op ID through BackendManager.UpgradeBackend -> the distributed manager -> the adapter, and have the adapter subscribe to the per-op progress subject before the request (extracted into a shared subscribeProgress helper reused by install/upgrade/force-fallback). The worker's upgradeBackend now creates the same DebouncedInstallProgressPublisher installBackend uses.

  An upgrade is a force-reinstall, so it reuses SubjectNodeBackendInstallProgress rather than minting a new subject - no new NATS permission, no new rolling-update compat surface. Reconciler-driven retries pass empty opID/onProgress and stay on the silent path.

  Reproduced on a live cluster: upgrade of llama-cpp-development on agx-orin-slow sat at progress:0 for 4+ minutes with no per-node feedback.

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(galleryop): persist cancellation + periodically reap orphaned ops

  Two distributed gaps surfaced when a replica was killed mid-upgrade on a live cluster, leaving the backend stuck 'processing' in the UI forever:

  1. CancelOperation flipped the in-memory status to cancelled and broadcast a NATS event but never persisted the terminal status. On the next replica restart the still-active row re-hydrated straight back into processingBackends and the UI spun again. It now calls store.Cancel(id) so the cancel survives a restart.
  2. CleanStale (which marks abandoned active ops failed) only ran once on startup, so an op orphaned AFTER startup - its owning replica's foreground handler goroutine gone - was never reaped until the next restart. Add GalleryService.ReapStaleOperations and run it on a 15m ticker (CleanStale now returns the reaped count for observability).

  Neither is covered by the OpCache self-evict fix: an orphaned op never reaches Processed, so it would never self-evict.

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(review): address self-review findings on the distributed install fixes

  Three findings from an adversarial review of this branch:

  1. CRITICAL - OpCache.GetStatus crashed under concurrent load. m.Map() returns the live internal map by reference, so deleting from it on the read path was an unsynchronized write to a map four HTTP handlers poll every ~1s -> a 'concurrent map writes' fatal. Rewritten to iterate a Keys() snapshot, build a fresh result map, and apply evictions via the locked DeleteUUID after the loop. Added a -race concurrency regression guard.
  2. HIGH - GetStatus evicted failed ops too, hiding them from /api/operations and breaking the dismiss-failed-op flow (the panel keeps Error != nil ops so the admin can read the error and click Dismiss). Eviction now fires only for terminal ops with Error == nil (success/cancelled); failures are retained.
  3. MEDIUM - DeleteStalePendingBackendOps missed StatusUnhealthy nodes. A node marked unhealthy on a NATS ErrNoResponders never transitions to offline (health.go skips re-marking it), so its pending ops leaked exactly like the offline case. Unhealthy is now reaped via the same stale-heartbeat grace path (a fresh-heartbeat node is recovering and keeps its op).

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(review-2): don't evict the still-installing soft-path; don't spin on failed ops

  Second review pass found two issues:

  1. MEDIUM (Go) - OpCache.GetStatus evicted the ErrWorkerStillInstalling soft-path op. That op is deliberately Processed=true with no error to show a yellow in-progress state when a worker timed out the NATS round-trip but is still installing in the background; the reconciler confirms the real outcome later. Evicting it (and broadcasting OpEnd + marking the DB completed) hid an install that may still fail. Eviction is now scoped to a clean success (progress 100 + 'completed', matching the job-poll's historical condition) or a cancellation - the soft-path (progress != 100) and failures are kept.
  2. MEDIUM (React) - the Backends gallery card rendered ANY operation as an 'Installing...' spinner, so a failed op (now intentionally kept in the list for the OperationsBar error + Dismiss) spun forever. Exclude errored ops from the card spinner, mirroring Models.jsx (isInstalling already excludes op.error). The error + Dismiss still surface in the global OperationsBar.

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(ui): refresh Manage backends table when an operation settles

  The Manage backends table fetched installed backends only on mount/after delete and checked upgrades only on tab activation. After a reinstall/upgrade completed neither re-ran, so the installed-version cell and the 'update available' badge stayed stale until the user switched tabs - the op looked like it 'did nothing'.

  Watch the operations list (via useOperations) and re-fetch installed backends + available upgrades whenever the count settles, mirroring the operations.length watch Backends.jsx already uses. Consolidates the prior tab-activation upgrades check into the same effect.

  Assisted-by: Claude:claude-opus-4-8 [Claude Code]
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
261 lines
11 KiB
Go
package worker

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/services/nodes"
	"github.com/mudler/xlog"
)

// installProgressDebounce is the leading-edge window the worker uses when
// streaming download progress to the master. 250ms caps wire chatter at
// ~4 events/sec per in-flight install while still surfacing every
// meaningful percentage jump.
const installProgressDebounce = 250 * time.Millisecond

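The leading-edge window described in the comment on installProgressDebounce can be sketched as follows. This is a minimal illustration under stated assumptions, not the code behind nodes.NewDebouncedInstallProgressPublisher: the type leadingEdgeDebouncer, its OnProgress and Flush methods, the injected clock, and runDemo are all hypothetical names chosen for the example. The first event in a window is emitted immediately, later events inside the window are held back, and Flush delivers the last held value.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// leadingEdgeDebouncer is a hypothetical helper for illustration only.
type leadingEdgeDebouncer struct {
	mu         sync.Mutex
	window     time.Duration
	now        func() time.Time  // injected clock so the demo is deterministic
	emit       func(pct float64) // stands in for a network publish
	sent       bool
	lastSent   time.Time
	pending    float64
	hasPending bool
}

// OnProgress emits immediately when no event was sent within the window,
// otherwise it only remembers the value for a later Flush.
func (d *leadingEdgeDebouncer) OnProgress(pct float64) {
	d.mu.Lock()
	now := d.now()
	if d.sent && now.Sub(d.lastSent) < d.window {
		d.pending, d.hasPending = pct, true
		d.mu.Unlock()
		return
	}
	d.sent, d.lastSent, d.hasPending = true, now, false
	d.mu.Unlock()
	d.emit(pct) // emit outside the lock so a slow sink never blocks callers
}

// Flush emits the last suppressed value, if any.
func (d *leadingEdgeDebouncer) Flush() {
	d.mu.Lock()
	pct, ok := d.pending, d.hasPending
	d.hasPending = false
	d.mu.Unlock()
	if ok {
		d.emit(pct)
	}
}

// runDemo drives the debouncer with a fake clock and returns what was emitted.
func runDemo() []float64 {
	clock := time.Unix(0, 0)
	var got []float64
	d := &leadingEdgeDebouncer{
		window: 250 * time.Millisecond,
		now:    func() time.Time { return clock },
		emit:   func(pct float64) { got = append(got, pct) },
	}
	d.OnProgress(10) // first event: emitted
	clock = clock.Add(100 * time.Millisecond)
	d.OnProgress(20) // 100ms after the last send: suppressed
	clock = clock.Add(200 * time.Millisecond)
	d.OnProgress(30) // 300ms after the last send: emitted
	clock = clock.Add(50 * time.Millisecond)
	d.OnProgress(100) // 50ms after the last send: suppressed
	d.Flush()         // delivers the held terminal value
	return got
}

func main() {
	fmt.Println(runDemo()) // prints [10 30 100]
}
```

The deferred Flush in the worker code plays the role of the final Flush call here: without it, a terminal percentage that arrives inside the window would never be sent.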
// buildProcessKey is the supervisor's stable identifier for a backend gRPC
// process. It includes the replica index so the same model can run multiple
// processes on a worker simultaneously without colliding on the same map slot
// or port. The "#N" suffix is purely internal; the controller never reads it.
func buildProcessKey(modelID, backend string, replicaIndex int) string {
	base := modelID
	if base == "" {
		base = backend
	}
	return fmt.Sprintf("%s#%d", base, replicaIndex)
}

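A short usage sketch of the key layout follows. The function body is copied from buildProcessKey; the model and backend names passed to it are made-up examples.

```go
package main

import "fmt"

// buildProcessKey mirrors the function above: the model ID is preferred,
// and the backend name is the fallback when no model ID is given.
func buildProcessKey(modelID, backend string, replicaIndex int) string {
	base := modelID
	if base == "" {
		base = backend
	}
	return fmt.Sprintf("%s#%d", base, replicaIndex)
}

func main() {
	// Two replicas of the same (hypothetical) model get distinct keys.
	fmt.Println(buildProcessKey("my-model", "llama-cpp", 0)) // my-model#0
	fmt.Println(buildProcessKey("my-model", "llama-cpp", 1)) // my-model#1
	// With no model ID the backend name becomes the key prefix.
	fmt.Println(buildProcessKey("", "whisper", 0)) // whisper#0
}
```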
// installBackend handles the backend.install flow. force=true is the
// upgrade path; force=false is the routine load path.
//
// The caller is responsible for holding s.lockBackend(req.Backend) for
// the duration of the call so the gallery directory isn't raced.
//
//  1. If already running for this (model, replica) slot AND force is false,
//     return existing address (the fast path used by routine load events that
//     just want to know which port a backend already serves on).
//  2. If force is true, stop any process(es) currently using this backend
//     so the gallery install can replace the on-disk artifact and the freshly
//     started process picks up the new binary. This is the upgrade path:
//     without it, every backend.install we receive after the first hits the
//     fast path and silently no-ops, leaving the cluster on a stale build.
//  3. Install backend from gallery (force passed through so existing artifacts
//     get overwritten on upgrade).
//  4. Find backend binary
//  5. Start gRPC process on a new port
//
// Returns the gRPC address of the backend process.
//
// ProcessKey includes the replica index so a worker with MaxReplicasPerModel>1
// can host multiple processes for the same model on distinct ports. Old
// controllers (no replica_index in the request) implicitly target replica 0,
// which preserves single-replica behavior.
func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest, force bool) (string, error) {
	processKey := buildProcessKey(req.ModelID, req.Backend, int(req.ReplicaIndex))

	if !force {
		// Fast path: already running for this model+replica → return existing
		// address. Verify liveness before trusting the cached entry: a process
		// that died without the supervisor noticing leaves a stale (key, addr)
		// pair, and getAddr would otherwise hand the controller an address
		// that immediately ECONNREFUSEDs. The reconciler then marks the
		// replica failed, retries the install, the supervisor says "already
		// running" again, and the cluster loops on a dead replica forever.
		if addr := s.getAddr(processKey); addr != "" {
			if s.isRunning(processKey) {
				xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
				return addr, nil
			}
			xlog.Warn("Stale process entry for backend (dead process); cleaning up before reinstall",
				"backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
			s.stopBackendExact(processKey)
		}
	} else {
		// Upgrade path: stop every live process that shares this backend so the
		// gallery install can overwrite the on-disk artifact and the restarted
		// process picks up the new binary. resolveProcessKeys catches peer
		// replicas of the same backend (whisper#0, whisper#1, ...) on workers
		// configured with MaxReplicasPerModel>1. We also stop the exact
		// processKey from the request tuple: keys created with an explicit
		// modelID don't share the bare-name prefix the resolver matches, but
		// they're still using the old binary and need to come down. Both calls
		// are no-ops on missing keys.
		toStop := s.resolveProcessKeys(req.Backend)
		toStop = append(toStop, processKey)
		for _, key := range toStop {
			xlog.Info("Force install: stopping running backend before reinstall",
				"backend", req.Backend, "processKey", key)
			s.stopBackendExact(key)
		}
	}

	// Parse galleries from request (override local config if provided).
	// A malformed override is ignored and the local gallery list is kept.
	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	// When the master tagged this install with an OpID, stream the
	// gallery download progress back to it on the per-op NATS subject.
	// Old masters that omit OpID stay on the silent path so they keep
	// working without changes. The publisher releases its mutex before
	// every Publish so a slow link never stalls the download loop, and
	// the deferred Flush guarantees a terminal-percentage event reaches
	// the master even when the install errors out.
	var downloadCb func(file, current, total string, percentage float64)
	if req.OpID != "" && s.nats != nil {
		publisher := nodes.NewDebouncedInstallProgressPublisher(s.nats, s.nodeID, req.OpID, req.Backend, installProgressDebounce)
		downloadCb = publisher.OnDownload
		defer publisher.Flush()
	}

	// On upgrade, run the gallery install path even if the binary already
	// exists on disk: findBackend would otherwise short-circuit and we'd
	// restart the same stale binary. The force flag passed to
	// InstallBackendFromGallery makes it overwrite the existing artifact.
	backendPath := ""
	if !force {
		backendPath = s.findBackend(req.Backend)
	}
	if backendPath == "" {
		if req.URI != "" {
			xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
			if err := galleryop.InstallExternalBackend(
				context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
			); err != nil {
				return "", fmt.Errorf("installing backend from external URI: %w", err)
			}
		} else {
			xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", force)
			if err := gallery.InstallBackendFromGallery(
				context.Background(), galleries, s.systemState, s.ml, req.Backend, downloadCb, force, s.cfg.RequireBackendIntegrity,
			); err != nil {
				return "", fmt.Errorf("installing backend from gallery: %w", err)
			}
		}
		// Re-register after install and retry the lookup.
		gallery.RegisterBackends(s.systemState, s.ml)
		backendPath = s.findBackend(req.Backend)
	}

	if backendPath == "" {
		return "", fmt.Errorf("backend %q not found after install attempt", req.Backend)
	}

	xlog.Info("Found backend binary", "path", backendPath, "processKey", processKey)

	// Start the gRPC process on a new port (keyed by model, not just backend)
	return s.startBackend(processKey, backendPath)
}

// upgradeBackend stops every running process for `backend`, force-reinstalls
// from gallery (overwriting the on-disk artifact), and re-registers backends.
// It does NOT start any new gRPC process: the next routine model load via
// backend.install will spawn a fresh process picking up the new binary.
//
// The caller is responsible for holding s.lockBackend(req.Backend).
func (s *backendSupervisor) upgradeBackend(req messaging.BackendUpgradeRequest) error {
	// Stop every live process for this backend (peer replicas + the bare
	// processKey). Same logic as the force branch in installBackend.
	toStop := s.resolveProcessKeys(req.Backend)
	toStop = append(toStop, buildProcessKey("", req.Backend, int(req.ReplicaIndex)))
	for _, key := range toStop {
		xlog.Info("Upgrade: stopping running backend before reinstall",
			"backend", req.Backend, "processKey", key)
		s.stopBackendExact(key)
	}

	// Galleries from the request override the local config when they parse;
	// a malformed override is ignored and the local gallery list is kept.
	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	// When the master tagged this upgrade with an OpID, stream gallery download
	// progress back on the per-op subject (reused from install, since an upgrade
	// is a force-reinstall). Old masters omit OpID and stay on the silent path.
	// The deferred Flush guarantees a terminal-percentage event even if the
	// upgrade errors out, so the master's per-node bar never hangs mid-download.
	var downloadCb func(file, current, total string, percentage float64)
	if req.OpID != "" && s.nats != nil {
		publisher := nodes.NewDebouncedInstallProgressPublisher(s.nats, s.nodeID, req.OpID, req.Backend, installProgressDebounce)
		downloadCb = publisher.OnDownload
		defer publisher.Flush()
	}

	if req.URI != "" {
		xlog.Info("Upgrading backend from external URI", "backend", req.Backend, "uri", req.URI)
		if err := galleryop.InstallExternalBackend(
			context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
		); err != nil {
			return fmt.Errorf("upgrading backend from external URI: %w", err)
		}
	} else {
		xlog.Info("Upgrading backend from gallery", "backend", req.Backend)
		if err := gallery.InstallBackendFromGallery(
			context.Background(), galleries, s.systemState, s.ml, req.Backend, downloadCb, true /* force */, s.cfg.RequireBackendIntegrity,
		); err != nil {
			return fmt.Errorf("upgrading backend from gallery: %w", err)
		}
	}

	gallery.RegisterBackends(s.systemState, s.ml)
	return nil
}

// findBackend returns the path of the backend binary, or "" when none is
// found. A path registered via s.ml.GetExternalBackend is checked first,
// then <backend> and <backend>/<backend> under the backends path, then the
// same two layouts under the system backends path. Directories never match.
func (s *backendSupervisor) findBackend(backend string) string {
	candidates := []string{
		filepath.Join(s.cfg.BackendsPath, backend),
		filepath.Join(s.cfg.BackendsPath, backend, backend),
		filepath.Join(s.cfg.BackendsSystemPath, backend),
		filepath.Join(s.cfg.BackendsSystemPath, backend, backend),
	}
	if uri := s.ml.GetExternalBackend(backend); uri != "" {
		if fi, err := os.Stat(uri); err == nil && !fi.IsDir() {
			return uri
		}
	}
	for _, path := range candidates {
		fi, err := os.Stat(path)
		if err == nil && !fi.IsDir() {
			return path
		}
	}
	return ""
}

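The candidate probing in findBackend can be exercised in isolation. The sketch below is an assumption-laden illustration rather than the supervisor's code: firstRegularFile and demo are hypothetical names, and the temporary directory stands in for a backends path. It shows why the directory `<root>/whisper` is skipped and the file `<root>/whisper/whisper` is selected.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// firstRegularFile returns the first candidate that exists and is not a
// directory, or "" when none qualifies. Hypothetical helper for illustration.
func firstRegularFile(candidates []string) string {
	for _, p := range candidates {
		if fi, err := os.Stat(p); err == nil && !fi.IsDir() {
			return p
		}
	}
	return ""
}

// demo builds <tmp>/whisper/whisper and probes both layouts, returning the
// match relative to the temporary root (slash-separated).
func demo() (string, error) {
	root, err := os.MkdirTemp("", "backends-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(root)

	dir := filepath.Join(root, "whisper")
	if err := os.Mkdir(dir, 0o755); err != nil {
		return "", err
	}
	if err := os.WriteFile(filepath.Join(dir, "whisper"), []byte("#!/bin/sh\n"), 0o755); err != nil {
		return "", err
	}

	got := firstRegularFile([]string{
		filepath.Join(root, "whisper"),            // exists, but is a directory
		filepath.Join(root, "whisper", "whisper"), // the regular file
	})
	rel, err := filepath.Rel(root, got)
	if err != nil {
		return "", err
	}
	return filepath.ToSlash(rel), nil
}

func main() {
	rel, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rel) // whisper/whisper
}
```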
// lockBackend returns a release function for a per-backend mutex. Different
// backend names lock independently. The first caller for a name allocates
// the mutex under s.mu; subsequent callers for the same name reuse it.
func (s *backendSupervisor) lockBackend(name string) func() {
	s.mu.Lock()
	if s.backendLocks == nil {
		s.backendLocks = make(map[string]*sync.Mutex)
	}
	m, ok := s.backendLocks[name]
	if !ok {
		m = &sync.Mutex{}
		s.backendLocks[name] = m
	}
	s.mu.Unlock()
	m.Lock()
	return m.Unlock
}
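The calling convention for lockBackend (take the lock, defer the returned release function) can be sketched with a standalone type. keyedLocker and countWithLock are hypothetical names for this example; the lock method follows the same steps as lockBackend, and the backend names are made up.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedLocker is a standalone stand-in for the supervisor's per-backend locks.
type keyedLocker struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// lock blocks until the mutex for name is held and returns its release func.
func (k *keyedLocker) lock(name string) func() {
	k.mu.Lock()
	if k.locks == nil {
		k.locks = make(map[string]*sync.Mutex)
	}
	m, ok := k.locks[name]
	if !ok {
		m = &sync.Mutex{}
		k.locks[name] = m
	}
	k.mu.Unlock() // release the map lock before waiting on the per-name lock
	m.Lock()
	return m.Unlock
}

// countWithLock increments a shared counter from many goroutines, each
// serialized by the same named lock.
func countWithLock(workers int) int {
	var k keyedLocker
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			release := k.lock("llama-cpp")
			defer release()
			counter++
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	var k keyedLocker
	releaseA := k.lock("llama-cpp")
	releaseB := k.lock("whisper") // a different name does not block
	releaseB()
	releaseA()

	fmt.Println(countWithLock(100)) // 100
}
```

Releasing the map mutex before calling m.Lock() matters: otherwise a caller waiting on one backend would block callers for every other backend. Entries are never removed from the map in this sketch, which is acceptable when the set of names is small and bounded.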