Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
8fbf18490e fix: remove deprecated cosign bundle flag from backend merge workflow
Agent-Logs-Url: https://github.com/mudler/LocalAI/sessions/4207dabc-14ec-4655-9594-487338977fcf

Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-22 22:16:44 +00:00
copilot-swe-agent[bot]
b334a77405 Initial plan 2026-05-22 22:13:44 +00:00
37 changed files with 178 additions and 1996 deletions

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=1acee6bf8939948f9bcbf4b14034e4b475f06069
LLAMA_VERSION?=bb28c1fe246b72276ee1d00ce89306be7b865766
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=0baf721215f45335a5df8caf0ecb34e870c956e7
STABLEDIFFUSION_GGML_VERSION?=3a8788cb7d74f185d6b18688e9563015524ecaf5
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=0ccd896f5b882628e1c077f9769735ef4ce52860
WHISPER_CPP_VERSION?=8443cf05e3fa8ce1b32348e1bcbcf8fc31f7f3ae
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -233,12 +233,7 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("File stager initialized (HTTP direct transfer)")
}
// Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go
remoteUnloader := nodes.NewRemoteUnloaderAdapter(
registry,
natsClient,
cfg.Distributed.BackendInstallTimeoutOrDefault(),
cfg.Distributed.BackendUpgradeTimeoutOrDefault(),
)
remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient)
// All dependencies ready — build SmartRouter with all options at once
var conflictResolver nodes.ConcurrencyConflictResolver

View File

@@ -17,9 +17,9 @@ import (
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/vram"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/sanitize"
@@ -200,7 +200,7 @@ func New(opts ...config.AppOption) (*Application, error) {
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
)
application.galleryService.SetBackendManager(
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService),
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry),
)
}
}

View File

@@ -39,19 +39,19 @@ type RunCMD struct {
LocalaiConfigDir string `env:"LOCALAI_CONFIG_DIR" type:"path" default:"${basepath}/configuration" help:"Directory for dynamic loading of certain configuration files (currently api_keys.json and external_backends.json)" group:"storage"`
LocalaiConfigDirPollInterval time.Duration `env:"LOCALAI_CONFIG_DIR_POLL_INTERVAL" help:"Typically the config path picks up changes automatically, but if your system has broken fsnotify events, set this to an interval to poll the LocalAI Config Dir (example: 1m)" group:"storage"`
// The alias on this option is there to preserve functionality with the old `--config-file` parameter
ModelsConfigFile string `env:"LOCALAI_MODELS_CONFIG_FILE,CONFIG_FILE" aliases:"config-file" help:"YAML file containing a list of model backend configs" group:"storage"`
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
ModelsConfigFile string `env:"LOCALAI_MODELS_CONFIG_FILE,CONFIG_FILE" aliases:"config-file" help:"YAML file containing a list of model backend configs" group:"storage"`
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
AutoUpgradeBackends bool `env:"LOCALAI_AUTO_UPGRADE_BACKENDS,AUTO_UPGRADE_BACKENDS" help:"Automatically upgrade backends when new versions are detected" group:"backends" default:"false"`
PreferDevelopmentBackends bool `env:"LOCALAI_PREFER_DEV_BACKENDS,PREFER_DEV_BACKENDS" help:"Prefer development backend versions (shows development backends by default in UI)" group:"backends" default:"false"`
PreloadModels string `env:"LOCALAI_PRELOAD_MODELS,PRELOAD_MODELS" help:"A List of models to apply in JSON at start" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
PreloadModelsConfig string `env:"LOCALAI_PRELOAD_MODELS_CONFIG,PRELOAD_MODELS_CONFIG" help:"A List of models to apply at startup. Path to a YAML config file" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
PreloadModelsConfig string `env:"LOCALAI_PRELOAD_MODELS_CONFIG,PRELOAD_MODELS_CONFIG" help:"A List of models to apply at startup. Path to a YAML config file" group:"models"`
F16 bool `name:"f16" env:"LOCALAI_F16,F16" help:"Enable GPU acceleration" group:"performance"`
Threads int `env:"LOCALAI_THREADS,THREADS" short:"t" help:"Number of threads used for parallel computation. Usage of the number of physical cores in the system is suggested" group:"performance"`
@@ -145,18 +145,16 @@ type RunCMD struct {
DefaultAPIKeyExpiry string `env:"LOCALAI_DEFAULT_API_KEY_EXPIRY" help:"Default expiry for API keys (e.g. 90d, 1y; empty = no expiry)" group:"auth"`
// Distributed / Horizontal Scaling
Distributed bool `env:"LOCALAI_DISTRIBUTED" default:"false" help:"Enable distributed mode (requires PostgreSQL + NATS)" group:"distributed"`
InstanceID string `env:"LOCALAI_INSTANCE_ID" help:"Unique instance ID for distributed mode (auto-generated UUID if empty)" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" help:"NATS server URL (e.g., nats://localhost:4222)" group:"distributed"`
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3-compatible storage endpoint URL (e.g., http://minio:9000)" group:"distributed"`
StorageBucket string `env:"LOCALAI_STORAGE_BUCKET" default:"localai" help:"S3 bucket name for object storage" group:"distributed"`
StorageRegion string `env:"LOCALAI_STORAGE_REGION" default:"us-east-1" help:"S3 region" group:"distributed"`
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
BackendInstallTimeout string `env:"LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT" help:"NATS round-trip timeout for backend.install requests sent to worker nodes (default 15m). Increase for slow links pulling multi-GB images." group:"distributed"`
BackendUpgradeTimeout string `env:"LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT" help:"NATS round-trip timeout for backend.upgrade requests (default 15m)." group:"distributed"`
Distributed bool `env:"LOCALAI_DISTRIBUTED" default:"false" help:"Enable distributed mode (requires PostgreSQL + NATS)" group:"distributed"`
InstanceID string `env:"LOCALAI_INSTANCE_ID" help:"Unique instance ID for distributed mode (auto-generated UUID if empty)" group:"distributed"`
NatsURL string `env:"LOCALAI_NATS_URL" help:"NATS server URL (e.g., nats://localhost:4222)" group:"distributed"`
StorageURL string `env:"LOCALAI_STORAGE_URL" help:"S3-compatible storage endpoint URL (e.g., http://minio:9000)" group:"distributed"`
StorageBucket string `env:"LOCALAI_STORAGE_BUCKET" default:"localai" help:"S3 bucket name for object storage" group:"distributed"`
StorageRegion string `env:"LOCALAI_STORAGE_REGION" default:"us-east-1" help:"S3 region" group:"distributed"`
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
Version bool
}
@@ -257,20 +255,6 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.StorageSecretKey != "" {
opts = append(opts, config.WithStorageSecretKey(r.StorageSecretKey))
}
if r.BackendInstallTimeout != "" {
d, err := time.ParseDuration(r.BackendInstallTimeout)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT %q: %w", r.BackendInstallTimeout, err)
}
opts = append(opts, config.WithBackendInstallTimeout(d))
}
if r.BackendUpgradeTimeout != "" {
d, err := time.ParseDuration(r.BackendUpgradeTimeout)
if err != nil {
return fmt.Errorf("invalid LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT %q: %w", r.BackendUpgradeTimeout, err)
}
opts = append(opts, config.WithBackendUpgradeTimeout(d))
}
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}

View File

@@ -40,10 +40,7 @@ type DistributedConfig struct {
// model-row cleanup on MarkUnhealthy / MarkDraining).
DisablePerModelHealthCheck bool
MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m)
BackendInstallTimeout time.Duration // NATS round-trip timeout for backend.install (default 15m)
BackendUpgradeTimeout time.Duration // NATS round-trip timeout for backend.upgrade (default 15m)
MCPCIJobTimeout time.Duration // MCP CI job execution timeout (default 10m)
MaxUploadSize int64 // Maximum upload body size in bytes (default 50 GB)
@@ -71,15 +68,13 @@ func (c DistributedConfig) Validate() error {
}
// Check for negative durations
for name, d := range map[string]time.Duration{
FlagMCPToolTimeout: c.MCPToolTimeout,
FlagMCPDiscoveryTimeout: c.MCPDiscoveryTimeout,
FlagWorkerWaitTimeout: c.WorkerWaitTimeout,
FlagDrainTimeout: c.DrainTimeout,
FlagHealthCheckInterval: c.HealthCheckInterval,
FlagStaleNodeThreshold: c.StaleNodeThreshold,
FlagMCPCIJobTimeout: c.MCPCIJobTimeout,
FlagBackendInstallTimeout: c.BackendInstallTimeout,
FlagBackendUpgradeTimeout: c.BackendUpgradeTimeout,
"mcp-tool-timeout": c.MCPToolTimeout,
"mcp-discovery-timeout": c.MCPDiscoveryTimeout,
"worker-wait-timeout": c.WorkerWaitTimeout,
"drain-timeout": c.DrainTimeout,
"health-check-interval": c.HealthCheckInterval,
"stale-node-threshold": c.StaleNodeThreshold,
"mcp-ci-job-timeout": c.MCPCIJobTimeout,
} {
if d < 0 {
return fmt.Errorf("%s must not be negative", name)
@@ -142,66 +137,24 @@ func WithStorageSecretKey(key string) AppOption {
}
}
func WithBackendInstallTimeout(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.BackendInstallTimeout = d
}
}
func WithBackendUpgradeTimeout(d time.Duration) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.BackendUpgradeTimeout = d
}
}
var EnableAutoApproveNodes = func(o *ApplicationConfig) {
o.Distributed.AutoApproveNodes = true
}
// Flag names for distributed timeout / interval configuration. These are
// the kebab-case identifiers kong derives from the matching RunCMD struct
// fields; they appear in Validate error messages and any other operator-
// facing surface that needs to reference a specific knob by name. Keeping
// them as constants prevents the string from drifting from the actual
// flag a future rename would produce.
const (
FlagMCPToolTimeout = "mcp-tool-timeout"
FlagMCPDiscoveryTimeout = "mcp-discovery-timeout"
FlagWorkerWaitTimeout = "worker-wait-timeout"
FlagDrainTimeout = "drain-timeout"
FlagHealthCheckInterval = "health-check-interval"
FlagStaleNodeThreshold = "stale-node-threshold"
FlagMCPCIJobTimeout = "mcp-ci-job-timeout"
FlagBackendInstallTimeout = "backend-install-timeout"
FlagBackendUpgradeTimeout = "backend-upgrade-timeout"
)
// Defaults for distributed timeouts.
const (
DefaultMCPToolTimeout = 360 * time.Second
DefaultMCPDiscoveryTimeout = 60 * time.Second
DefaultWorkerWaitTimeout = 5 * time.Minute
DefaultDrainTimeout = 30 * time.Second
DefaultHealthCheckInterval = 15 * time.Second
DefaultStaleNodeThreshold = 60 * time.Second
DefaultMCPCIJobTimeout = 10 * time.Minute
DefaultBackendInstallTimeout = 15 * time.Minute
DefaultBackendUpgradeTimeout = 15 * time.Minute
DefaultMCPToolTimeout = 360 * time.Second
DefaultMCPDiscoveryTimeout = 60 * time.Second
DefaultWorkerWaitTimeout = 5 * time.Minute
DefaultDrainTimeout = 30 * time.Second
DefaultHealthCheckInterval = 15 * time.Second
DefaultStaleNodeThreshold = 60 * time.Second
DefaultMCPCIJobTimeout = 10 * time.Minute
)
// DefaultMaxUploadSize is the default maximum upload body size (50 GB).
const DefaultMaxUploadSize int64 = 50 << 30
// BackendInstallTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendInstallTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendInstallTimeout, DefaultBackendInstallTimeout)
}
// BackendUpgradeTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) BackendUpgradeTimeoutOrDefault() time.Duration {
return cmp.Or(c.BackendUpgradeTimeout, DefaultBackendUpgradeTimeout)
}
// MCPToolTimeoutOrDefault returns the configured timeout or the default.
func (c DistributedConfig) MCPToolTimeoutOrDefault() time.Duration {
return cmp.Or(c.MCPToolTimeout, DefaultMCPToolTimeout)

View File

@@ -1,90 +0,0 @@
package config_test
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
)
var _ = Describe("DistributedConfig backend NATS timeouts", func() {
Context("BackendInstallTimeoutOrDefault", func() {
It("returns 15 minutes when unset", func() {
c := config.DistributedConfig{}
Expect(c.BackendInstallTimeoutOrDefault()).To(Equal(15 * time.Minute))
})
It("returns the configured value when set", func() {
c := config.DistributedConfig{BackendInstallTimeout: 42 * time.Minute}
Expect(c.BackendInstallTimeoutOrDefault()).To(Equal(42 * time.Minute))
})
})
Context("BackendUpgradeTimeoutOrDefault", func() {
It("returns 15 minutes when unset", func() {
c := config.DistributedConfig{}
Expect(c.BackendUpgradeTimeoutOrDefault()).To(Equal(15 * time.Minute))
})
It("returns the configured value when set", func() {
c := config.DistributedConfig{BackendUpgradeTimeout: 30 * time.Minute}
Expect(c.BackendUpgradeTimeoutOrDefault()).To(Equal(30 * time.Minute))
})
})
})
var _ = Describe("DistributedConfig flag-name constants", func() {
// Pin the kebab-case strings so a rename of the Go field name (or a
// CLI flag naming convention change) forces the constant to update,
// keeping the Validate error messages and any future operator-facing
// surface in sync with the actual CLI flag.
DescribeTable("flag name constants",
func(actual, expected string) {
Expect(actual).To(Equal(expected))
},
Entry("MCP tool timeout", config.FlagMCPToolTimeout, "mcp-tool-timeout"),
Entry("MCP discovery timeout", config.FlagMCPDiscoveryTimeout, "mcp-discovery-timeout"),
Entry("worker wait timeout", config.FlagWorkerWaitTimeout, "worker-wait-timeout"),
Entry("drain timeout", config.FlagDrainTimeout, "drain-timeout"),
Entry("health check interval", config.FlagHealthCheckInterval, "health-check-interval"),
Entry("stale node threshold", config.FlagStaleNodeThreshold, "stale-node-threshold"),
Entry("MCP CI job timeout", config.FlagMCPCIJobTimeout, "mcp-ci-job-timeout"),
Entry("backend install timeout", config.FlagBackendInstallTimeout, "backend-install-timeout"),
Entry("backend upgrade timeout", config.FlagBackendUpgradeTimeout, "backend-upgrade-timeout"),
)
})
var _ = Describe("DistributedConfig.Validate negative-duration errors", func() {
It("rejects a negative BackendInstallTimeout with the flag name in the error", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
BackendInstallTimeout: -1 * time.Second,
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring(config.FlagBackendInstallTimeout))
Expect(err.Error()).To(ContainSubstring("must not be negative"))
})
It("rejects a negative BackendUpgradeTimeout with the flag name in the error", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
BackendUpgradeTimeout: -1 * time.Second,
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring(config.FlagBackendUpgradeTimeout))
})
It("accepts all-zero durations as valid (defaults apply)", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
}
Expect(c.Validate()).To(Succeed())
})
})

View File

@@ -649,7 +649,6 @@
align-items: center;
gap: var(--spacing-md);
padding: var(--spacing-xs) 0;
flex-wrap: wrap;
}
.operation-info {
@@ -740,110 +739,6 @@
color: var(--color-error);
}
/* Operations bar: per-node breakdown (multi-worker installs) */
.operation-expand {
background: none;
border: none;
color: var(--color-text-muted);
cursor: pointer;
padding: 0 var(--spacing-xs);
font-size: var(--text-xs);
display: inline-flex;
align-items: center;
gap: 0.25rem;
}
.operation-expand:hover {
color: var(--color-text-primary);
}
.operation-expand-label {
font-size: var(--text-xs);
}
.operation-nodes-list {
list-style: none;
margin: var(--spacing-xs) 0 0;
padding: var(--spacing-xs) 0 0;
border-top: 1px solid var(--color-border-subtle);
flex-basis: 100%;
width: 100%;
}
.operation-node {
display: flex;
align-items: center;
gap: var(--spacing-sm);
padding: var(--spacing-xs) 0;
font-size: var(--text-xs);
color: var(--color-text-muted);
flex-wrap: wrap;
}
.operation-node-status {
padding: 2px 6px;
border-radius: var(--radius-md);
font-size: 0.65rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.025em;
white-space: nowrap;
}
.operation-node-status-success {
background: var(--color-success-light);
color: var(--color-success);
}
.operation-node-status-error {
background: var(--color-error-light);
color: var(--color-error);
}
.operation-node-status-queued {
background: var(--color-bg-tertiary);
color: var(--color-text-muted);
}
.operation-node-status-running_on_worker {
background: var(--color-warning-light);
color: var(--color-warning);
}
.operation-node-status-downloading {
background: var(--color-primary-light);
color: var(--color-primary);
}
.operation-node-name {
font-weight: 500;
color: var(--color-text-secondary);
}
.operation-node-file {
font-family: var(--font-mono);
color: var(--color-text-tertiary);
overflow: hidden;
text-overflow: ellipsis;
max-width: 30ch;
white-space: nowrap;
}
.operation-node-bytes {
font-variant-numeric: tabular-nums;
color: var(--color-text-tertiary);
}
.operation-node-pct {
font-variant-numeric: tabular-nums;
color: var(--color-primary);
font-weight: 500;
}
.operation-node-error {
color: var(--color-error);
}
.operation-node-bar-container {
flex-basis: 100%;
height: 3px;
background: var(--color-surface-sunken);
border-radius: var(--radius-full);
overflow: hidden;
margin-top: 0.25rem;
}
.operation-node-bar {
height: 100%;
background: var(--color-primary);
border-radius: var(--radius-full);
transition: width var(--duration-slow, 0.3s) var(--ease-spring, ease);
}
/* Toast */
.toast-container {
position: fixed;

View File

@@ -1,33 +1,14 @@
import { useState } from 'react'
import { useOperations } from '../hooks/useOperations'
const nodeStatusLabels = {
success: 'Done',
error: 'Failed',
queued: 'Queued',
running_on_worker: 'Worker busy',
downloading: 'Downloading',
}
const runningOnWorkerTooltip = 'NATS round-trip timed out, but the worker is still installing in the background. The reconciler will confirm completion.'
export default function OperationsBar() {
const { operations, cancelOperation, dismissFailedOp } = useOperations()
const [expanded, setExpanded] = useState({})
if (operations.length === 0) return null
const toggle = (key) => setExpanded((m) => ({ ...m, [key]: !m[key] }))
return (
<div className="operations-bar">
{operations.map(op => {
const key = op.jobID || op.id
const nodes = Array.isArray(op.nodes) ? op.nodes : []
const canExpand = nodes.length > 1
const isOpen = !!expanded[key]
return (
<div key={key} className="operation-item">
{operations.map(op => (
<div key={op.jobID || op.id} className="operation-item">
<div className="operation-info">
{op.error ? (
<i className="fas fa-circle-exclamation" style={{ color: 'var(--color-error)', marginRight: 'var(--spacing-xs)' }} />
@@ -99,55 +80,8 @@ export default function OperationsBar() {
<i className="fas fa-xmark" />
</button>
) : null}
{canExpand && (
<button
type="button"
className="operation-expand"
onClick={() => toggle(key)}
aria-expanded={isOpen}
title={isOpen ? 'Hide per-node detail' : `Show ${nodes.length} nodes`}
>
<i className={`fas fa-chevron-${isOpen ? 'up' : 'down'}`} />
<span className="operation-expand-label">{nodes.length} nodes</span>
</button>
)}
{canExpand && isOpen && (
<ul className="operation-nodes-list">
{nodes.map((n) => (
<li key={n.node_id} className={`operation-node operation-node-${n.status}`}>
<span
className={`operation-node-status operation-node-status-${n.status}`}
title={n.status === 'running_on_worker' ? runningOnWorkerTooltip : undefined}
>
{nodeStatusLabels[n.status] || n.status}
</span>
<span className="operation-node-name">{n.node_name || n.node_id}</span>
{n.file_name && <span className="operation-node-file">{n.file_name}</span>}
{(n.current || n.total) && (
<span className="operation-node-bytes">
{n.current || '?'} / {n.total || '?'}
</span>
)}
{n.percentage > 0 && (
<span className="operation-node-pct">{Math.round(n.percentage)}%</span>
)}
{n.error && (
<span className="operation-node-error" title={n.error}>
{n.error.length > 80 ? n.error.slice(0, 80) + '...' : n.error}
</span>
)}
{n.percentage > 0 && n.percentage < 100 && (
<div className="operation-node-bar-container">
<div className="operation-node-bar" style={{ width: `${n.percentage}%` }} />
</div>
)}
</li>
))}
</ul>
)}
</div>
)
})}
))}
</div>
)
}

View File

@@ -10,7 +10,6 @@ import (
"net/http"
"net/url"
"slices"
"sort"
"strconv"
"strings"
"time"
@@ -58,6 +57,7 @@ var usecaseFilters = map[string]config.ModelConfigUsecase{
config.UsecaseRealtimeAudio: config.FLAG_REALTIME_AUDIO,
}
// extractHFRepo tries to find a HuggingFace repo ID from model overrides or URLs.
func extractHFRepo(overrides map[string]any, urls []string) string {
if overrides != nil {
@@ -257,44 +257,6 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
if status != nil && status.Error != nil {
opData["error"] = status.Error.Error()
}
// Expose the per-node breakdown when the Phase 4 progress sink
// has populated OpStatus.Nodes (distributed backend installs).
// We sort by node_name for stable UI rendering across polls;
// the underlying slice is order-dependent on UpdateNodeProgress
// arrival order, which the UI must not depend on. Single-node
// ops and model installs leave Nodes empty so this block emits
// no key, preserving the legacy payload shape.
if status != nil && len(status.Nodes) > 0 {
nodes := make([]map[string]any, 0, len(status.Nodes))
for _, n := range status.Nodes {
entry := map[string]any{
"node_id": n.NodeID,
"node_name": n.NodeName,
"status": n.Status,
"percentage": n.Percentage,
}
if n.FileName != "" {
entry["file_name"] = n.FileName
}
if n.Current != "" {
entry["current"] = n.Current
}
if n.Total != "" {
entry["total"] = n.Total
}
if n.Phase != "" {
entry["phase"] = n.Phase
}
if n.Error != "" {
entry["error"] = n.Error
}
nodes = append(nodes, entry)
}
sort.SliceStable(nodes, func(i, j int) bool {
return fmt.Sprintf("%v", nodes[i]["node_name"]) < fmt.Sprintf("%v", nodes[j]["node_name"])
})
opData["nodes"] = nodes
}
operations = append(operations, opData)
}
@@ -595,11 +557,11 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
NodeStatus string `json:"node_status"`
}
type modelCapability struct {
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
ID string `json:"id"`
Capabilities []string `json:"capabilities"`
Backend string `json:"backend"`
Disabled bool `json:"disabled"`
Pinned bool `json:"pinned"`
// LoadedOn is populated only when the node registry is active
// (distributed mode). Lets the UI show "loaded on worker-1" without
// the operator having to expand every node manually. An empty slice
@@ -1197,17 +1159,17 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
}
return c.JSON(200, map[string]any{
"backends": backendsJSON,
"repositories": appConfig.BackendGalleries,
"allTags": tags,
"processingBackends": processingBackendsData,
"taskTypes": taskTypes,
"availableBackends": totalBackends,
"installedBackends": installedBackendsCount,
"currentPage": pageNum,
"totalPages": totalPages,
"prevPage": prevPage,
"nextPage": nextPage,
"backends": backendsJSON,
"repositories": appConfig.BackendGalleries,
"allTags": tags,
"processingBackends": processingBackendsData,
"taskTypes": taskTypes,
"availableBackends": totalBackends,
"installedBackends": installedBackendsCount,
"currentPage": pageNum,
"totalPages": totalPages,
"prevPage": prevPage,
"nextPage": nextPage,
"systemCapability": detectedCapability,
"preferDevelopmentBackends": appConfig.PreferDevelopmentBackends,
})
@@ -1637,3 +1599,4 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
app.DELETE("/api/branding/asset/:kind", localai.DeleteBrandingAssetEndpoint(appConfig), adminMiddleware)
}

View File

@@ -62,63 +62,6 @@ var _ = Describe("/api/operations with node-scoped backend ops", func() {
Expect(found["isBackend"]).To(Equal(true))
})
It("surfaces per-node OpStatus entries on /api/operations", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
jobID := "test-op-nodes-1"
// Register a backend op so the handler treats this as a backend
// install (no need to consult the gallery during the test).
opcache.SetBackend("vllm", jobID)
// Populate per-node entries via the P4.2 helper. The helper also
// allocates an OpStatus under jobID, which the handler will read.
galleryService.UpdateNodeProgress(jobID, "node-b", galleryop.NodeProgress{
NodeID: "node-b", NodeName: "worker-b", Status: galleryop.NodeStatusRunningOnWorker,
})
galleryService.UpdateNodeProgress(jobID, "node-a", galleryop.NodeProgress{
NodeID: "node-a", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 30, FileName: "vllm.tar",
})
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == jobID {
found = op
break
}
}
Expect(found).ToNot(BeNil(), "operation should appear in /api/operations")
nodes, ok := found["nodes"].([]any)
Expect(ok).To(BeTrue(), "operation should have a nodes array")
Expect(nodes).To(HaveLen(2))
// Stable sort by node_name: "worker-a" comes before "worker-b"
// even though UpdateNodeProgress was called in reverse order.
first := nodes[0].(map[string]any)
Expect(first["node_name"]).To(Equal("worker-a"))
Expect(first["status"]).To(Equal("downloading"))
Expect(first["file_name"]).To(Equal("vllm.tar"))
Expect(first["percentage"]).To(Equal(30.0))
second := nodes[1].(map[string]any)
Expect(second["node_name"]).To(Equal("worker-b"))
Expect(second["status"]).To(Equal("running_on_worker"))
})
It("does not emit nodeID for non-node-scoped backend ops", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)

View File

@@ -91,21 +91,6 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
})
return err
}
if errors.Is(err, ErrWorkerStillInstalling) {
// Soft failure: at least one worker timed out replying but is
// still running the install in the background. Mark the op as
// processed with a non-error message so the admin UI shows a
// yellow in-progress state rather than red. The reconciler's
// next pass will reconcile the actual outcome via backend.list.
xlog.Info("worker still installing in background", "backend", op.GalleryElementName, "error", err)
g.UpdateStatus(op.ID, &OpStatus{
Processed: true,
GalleryElementName: op.GalleryElementName,
Message: fmt.Sprintf("backend %s: worker still installing in background; reconciler will confirm completion (%v)", op.GalleryElementName, err),
Cancellable: false,
})
return nil
}
xlog.Error("error installing backend", "error", err, "backend", op.GalleryElementName)
if !op.Delete {
// If we didn't install the backend, we need to make sure we don't have a leftover directory

View File

@@ -1,13 +0,0 @@
package galleryop
import "errors"
// ErrWorkerStillInstalling indicates a distributed backend install
// timed out at the NATS round-trip layer but the worker is most likely
// still pulling the OCI image in the background. Producers
// (DistributedBackendManager) wrap this when the round-trip times out;
// consumers (backendHandler) use errors.Is(err, ErrWorkerStillInstalling)
// to surface a yellow "in progress" OpStatus instead of a red error,
// leaving the pending_backend_ops row in place for the reconciler to
// confirm via backend.list.
var ErrWorkerStillInstalling = errors.New("worker did not reply in time; install may still be running in the background")

View File

@@ -1,149 +0,0 @@
package galleryop_test
import (
"encoding/json"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
)
var _ = Describe("NodeStatus constants", func() {
// Pin the wire-format string values. A future refactor that renames
// a constant must NOT silently change the JSON value the UI receives
// (or the cross-package contract with the nodes package, which
// reuses these constants for NodeOpStatus.Status).
DescribeTable("status constant",
func(actual, expected string) {
Expect(actual).To(Equal(expected))
},
Entry("queued", galleryop.NodeStatusQueued, "queued"),
Entry("downloading", galleryop.NodeStatusDownloading, "downloading"),
Entry("running on worker", galleryop.NodeStatusRunningOnWorker, "running_on_worker"),
Entry("success", galleryop.NodeStatusSuccess, "success"),
Entry("error", galleryop.NodeStatusError, "error"),
)
})
var _ = Describe("OpStatus.Nodes", func() {
It("defaults to empty on a fresh OpStatus", func() {
os := &galleryop.OpStatus{}
Expect(os.Nodes).To(BeEmpty())
})
It("JSON round-trips with all NodeProgress fields", func() {
os := &galleryop.OpStatus{
Nodes: []galleryop.NodeProgress{
{
NodeID: "node-1",
NodeName: "worker-a",
Status: galleryop.NodeStatusRunningOnWorker,
FileName: "vllm.tar.zst",
Current: "412 MB",
Total: "2.1 GB",
Percentage: 19.6,
Phase: "downloading", // literal pins the wire-format value
Error: "",
},
},
}
raw, err := json.Marshal(os)
Expect(err).ToNot(HaveOccurred())
got := &galleryop.OpStatus{}
Expect(json.Unmarshal(raw, got)).To(Succeed())
Expect(got.Nodes).To(HaveLen(1))
Expect(got.Nodes[0]).To(Equal(os.Nodes[0]))
})
})
var _ = Describe("GalleryService.UpdateNodeProgress", func() {
var svc *galleryop.GalleryService
BeforeEach(func() {
// UpdateNodeProgress + GetStatus only touch the in-memory statuses
// map. A zero-value ApplicationConfig is enough to get past the
// LocalModelManager / LocalBackendManager constructors.
svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
})
It("creates a node entry on first call", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 12.0,
})
st := svc.GetStatus("op1")
Expect(st).ToNot(BeNil())
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].NodeID).To(Equal("n1"))
Expect(st.Nodes[0].Percentage).To(Equal(12.0))
})
It("merges subsequent updates into the same NodeID entry, not appending", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 48.0, FileName: "vllm.tar"})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].Percentage).To(Equal(48.0))
Expect(st.Nodes[0].FileName).To(Equal("vllm.tar"))
})
It("appends a new entry for a different NodeID", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: galleryop.NodeStatusQueued})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(2))
})
It("mirrors the latest tick into the aggregate OpStatus fields", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading,
Percentage: 33.0, FileName: "vllm.tar", Current: "330 MB", Total: "1 GB",
})
st := svc.GetStatus("op1")
Expect(st.Progress).To(Equal(33.0))
Expect(st.FileName).To(Equal("vllm.tar"))
Expect(st.DownloadedFileSize).To(Equal("330 MB"))
Expect(st.TotalFileSize).To(Equal("1 GB"))
})
It("preserves accumulated Nodes when a subsequent UpdateStatus comes through the legacy path", func() {
// Regression: the Phase 2 progress bridge also calls the legacy
// progressCb -> UpdateStatus(opID, &OpStatus{...}) on every tick.
// Without preservation that overwrite would wipe the Nodes slice
// and the UI would flicker between one node and another on a
// multi-worker install. UpdateStatus must carry forward existing
// Nodes when the incoming op has none.
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusSuccess})
svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: galleryop.NodeStatusDownloading, Percentage: 30.0})
// Now simulate the legacy progressCb path: a fresh OpStatus
// pointer with no Nodes set, carrying only aggregate fields.
svc.UpdateStatus("op1", &galleryop.OpStatus{
Progress: 30.0,
Message: "downloading",
})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(2), "Nodes accumulated before the legacy UpdateStatus must be preserved")
ids := []string{st.Nodes[0].NodeID, st.Nodes[1].NodeID}
Expect(ids).To(ConsistOf("n1", "n2"))
})
It("allows an explicit empty-then-populated Nodes transition to win when caller sets Nodes", func() {
// If a caller explicitly passes a non-empty Nodes slice on the
// incoming op, that should replace the existing slice (no merge).
// Only an EMPTY incoming slice triggers the carry-forward.
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusSuccess})
svc.UpdateStatus("op1", &galleryop.OpStatus{
Progress: 100.0,
Nodes: []galleryop.NodeProgress{
{NodeID: "n9", NodeName: "worker-final", Status: galleryop.NodeStatusSuccess},
},
})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].NodeID).To(Equal("n9"))
})
})

View File

@@ -53,45 +53,6 @@ type OpStatus struct {
GalleryElementName string `json:"gallery_element_name"`
Cancelled bool `json:"cancelled"` // Cancelled is true if the operation was cancelled
Cancellable bool `json:"cancellable"` // Cancellable is true if the operation can be cancelled
// Nodes is the per-node breakdown for a fanned-out backend install.
// Populated by DistributedBackendManager (per-node terminal status)
// and by the Phase 2 progress bridge (per-byte ticks). The
// /api/operations handler surfaces this so the UI can render an
// expandable per-node view of an in-flight install.
Nodes []NodeProgress `json:"nodes,omitempty"`
}
// NodeStatus values shared between NodeProgress (per-node tick) and the
// NodeOpStatus surfaced by DistributedBackendManager's fan-out. Defined
// as exported constants so producers (the manager, the progress bridge)
// and consumers (the /api/operations handler, the React OperationsBar
// through its JSON contract) stay in sync via a single source of truth.
const (
NodeStatusQueued = "queued" // node accepted the intent but install has not started
NodeStatusDownloading = "downloading" // worker is actively pulling the OCI image
NodeStatusRunningOnWorker = "running_on_worker" // NATS round-trip timed out but worker is still installing
NodeStatusSuccess = "success" // install completed on this node
NodeStatusError = "error" // install failed on this node
)
// NodeProgress is a single node's contribution to a backend install
// operation. Populated by DistributedBackendManager (per-node terminal
// status) and by the Phase 2 progress bridge (per-byte ticks). Read by
// the /api/operations handler so the UI can render an expandable
// per-node breakdown.
//
// Status holds one of the NodeStatus* constants above.
type NodeProgress struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"`
FileName string `json:"file_name,omitempty"`
Current string `json:"current,omitempty"`
Total string `json:"total,omitempty"`
Percentage float64 `json:"percentage"`
Phase string `json:"phase,omitempty"`
Error string `json:"error,omitempty"`
}
type OpCache struct {

View File

@@ -110,18 +110,6 @@ func (g *GalleryService) DeleteBackend(name string) error {
func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
g.Lock()
defer g.Unlock()
// Preserve any per-node entries already accumulated by UpdateNodeProgress:
// the legacy progressCb path (used by the Phase 2 install bridge) calls
// UpdateStatus with a fresh *OpStatus on every tick, which would otherwise
// wipe the Nodes slice and leave the UI flickering between one node and
// another. If the caller explicitly populates Nodes on the incoming op,
// that wins; an empty Nodes slice on the incoming op is treated as "no
// new per-node data" and the previous Nodes are carried forward.
if op != nil && len(op.Nodes) == 0 {
if prev := g.statuses[s]; prev != nil && len(prev.Nodes) > 0 {
op.Nodes = prev.Nodes
}
}
g.statuses[s] = op
// Persist to PostgreSQL in distributed mode
@@ -147,47 +135,6 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
}
}
// UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes,
// keyed by nodeID, and mirrors the latest values into the aggregate
// Progress / FileName / DownloadedFileSize / TotalFileSize / Message
// fields so the legacy single-bar OperationsBar view keeps working
// unchanged alongside the new per-node breakdown.
//
// We deliberately do NOT delegate the aggregate mirror to UpdateStatus
// here: UpdateStatus overwrites the entire OpStatus, which would clobber
// the Nodes slice we just merged into. Doing the merge + mirror under a
// single lock keeps both views consistent and concurrent-safe.
func (g *GalleryService) UpdateNodeProgress(opID, nodeID string, np NodeProgress) {
g.Lock()
defer g.Unlock()
status := g.statuses[opID]
if status == nil {
status = &OpStatus{}
g.statuses[opID] = status
}
merged := false
for i := range status.Nodes {
if status.Nodes[i].NodeID == nodeID {
status.Nodes[i] = np
merged = true
break
}
}
if !merged {
status.Nodes = append(status.Nodes, np)
}
// Mirror the latest tick into the legacy aggregate fields so the
// existing single-bar UI keeps rendering meaningful progress.
status.FileName = np.FileName
status.Progress = np.Percentage
status.DownloadedFileSize = np.Current
status.TotalFileSize = np.Total
if np.Phase != "" {
status.Message = np.Phase
}
}
func (g *GalleryService) GetStatus(s string) *OpStatus {
g.Lock()
defer g.Unlock()

View File

@@ -1,36 +0,0 @@
package messaging
// Phase values published on the BackendInstallProgressEvent.Phase field.
// Defined as exported constants so producer (worker install handler) and
// consumer (master bridge into OpStatus) share a single source of truth
// instead of two copies of the literal string.
const (
PhaseResolving = "resolving" // worker is locating the gallery / image manifest
PhaseDownloading = "downloading" // worker is actively pulling layers
PhaseExtracting = "extracting" // worker is unpacking the downloaded archive
PhaseStarting = "starting" // worker is spawning the gRPC backend process
)
// BackendInstallProgressEvent is the wire payload published by a worker to
// nodes.<nodeID>.backend.install.<opID>.progress while a long-running install
// is in flight. Transient: dropped events are acceptable, the master relies
// on BackendInstallReply for ground truth on success/failure.
//
// Phase holds one of the Phase* constants above.
type BackendInstallProgressEvent struct {
OpID string `json:"op_id"`
NodeID string `json:"node_id"`
Backend string `json:"backend"`
FileName string `json:"file_name,omitempty"`
Current string `json:"current,omitempty"` // human-readable size, e.g. "412 MB"
Total string `json:"total,omitempty"` // human-readable size, e.g. "2.1 GB"
Percentage float64 `json:"percentage"`
Phase string `json:"phase,omitempty"`
}
// SubjectNodeBackendInstallProgress returns the NATS subject for transient
// progress events emitted by a worker during a single backend.install run.
// Per-op so multiple concurrent installs on the same node never alias.
func SubjectNodeBackendInstallProgress(nodeID, opID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.install." + sanitizeSubjectToken(opID) + ".progress"
}

View File

@@ -1,66 +0,0 @@
package messaging_test
import (
"encoding/json"
"strings"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("Phase constants", func() {
// Pin the wire-format string values. A future refactor that renames
// a constant must NOT silently change the JSON value the master
// receives or break consumers that switch on Phase.
DescribeTable("phase constant",
func(actual, expected string) {
Expect(actual).To(Equal(expected))
},
Entry("resolving", messaging.PhaseResolving, "resolving"),
Entry("downloading", messaging.PhaseDownloading, "downloading"),
Entry("extracting", messaging.PhaseExtracting, "extracting"),
Entry("starting", messaging.PhaseStarting, "starting"),
)
})
var _ = Describe("BackendInstallProgress", func() {
Context("SubjectNodeBackendInstallProgress", func() {
It("composes the per-op progress subject", func() {
Expect(messaging.SubjectNodeBackendInstallProgress("node-abc", "op-123")).
To(Equal("nodes.node-abc.backend.install.op-123.progress"))
})
It("sanitizes NATS-reserved characters in node and op tokens", func() {
// '.' is the NATS hierarchy delimiter, '*' and '>' are wildcards,
// and whitespace must be stripped - sanitizeSubjectToken replaces
// all of them with '-'. The resulting subject must still parse as
// exactly six hierarchy segments: nodes/<node>/backend/install/<op>/progress.
subj := messaging.SubjectNodeBackendInstallProgress("a.b c", "x.y z")
Expect(subj).ToNot(ContainSubstring(" "))
Expect(strings.Count(subj, ".")).To(Equal(5))
})
})
Context("BackendInstallProgressEvent", func() {
It("JSON round-trips with all known fields", func() {
ev := messaging.BackendInstallProgressEvent{
OpID: "op-123",
NodeID: "node-abc",
Backend: "vllm",
FileName: "vllm-cpu.tar.zst",
Current: "412 MB",
Total: "2.1 GB",
Percentage: 19.6,
Phase: "downloading",
}
raw, err := json.Marshal(ev)
Expect(err).ToNot(HaveOccurred())
var got messaging.BackendInstallProgressEvent
Expect(json.Unmarshal(raw, &got)).To(Succeed())
Expect(got).To(Equal(ev))
})
})
})

View File

@@ -144,12 +144,6 @@ type BackendInstallRequest struct {
// worker still works (the master's install fallback path also uses this
// when backend.upgrade returns nats.ErrNoResponders).
Force bool `json:"force,omitempty"`
// OpID identifies the admin-side operation. When non-empty the worker
// publishes BackendInstallProgressEvent values to
// SubjectNodeBackendInstallProgress(nodeID, OpID) while the install is
// running, debounced to roughly 250ms. Empty means the caller is a
// reconciler-driven retry that does not need progress streamed.
OpID string `json:"op_id,omitempty"`
}
// BackendInstallReply is the response from a backend.install NATS request.

View File

@@ -1,120 +0,0 @@
package nodes
import (
"sync"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
)
// DebouncedInstallProgressPublisher buffers backend-install download ticks
// and publishes them to the per-op NATS progress subject at most once per
// `interval`. Always publishes the final event on Flush so the UI sees the
// terminal percentage.
//
// Behavior: leading-edge debounce. The first OnDownload after a quiet window
// publishes immediately; subsequent ticks within `interval` only buffer the
// latest event, which is then emitted via a single trailing timer. This
// keeps the wire chatter bounded (~4 events per second at 250ms) while
// still surfacing every meaningful percentage jump.
//
// Lock ordering: never hold p.mu across a Publish call. Publish hits the
// NATS client which may block on a slow link, and we don't want a stalled
// network to stall the underlying gallery download loop.
type DebouncedInstallProgressPublisher struct {
mu sync.Mutex
client messaging.MessagingClient
subject string
nodeID string
opID string
backend string
interval time.Duration
lastPublishedAt time.Time
pending *messaging.BackendInstallProgressEvent
timer *time.Timer
}
// NewDebouncedInstallProgressPublisher constructs a publisher for one
// install operation. interval is the leading-edge debounce window
// (~250ms in production).
func NewDebouncedInstallProgressPublisher(client messaging.MessagingClient, nodeID, opID, backend string, interval time.Duration) *DebouncedInstallProgressPublisher {
return &DebouncedInstallProgressPublisher{
client: client,
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
nodeID: nodeID,
opID: opID,
backend: backend,
interval: interval,
}
}
// OnDownload is the callback shape gallery.InstallBackendFromGallery and
// galleryop.InstallExternalBackend pass into the worker. Each invocation
// represents a single tick from the underlying io.Reader copy loop.
func (p *DebouncedInstallProgressPublisher) OnDownload(file, current, total string, percentage float64) {
ev := messaging.BackendInstallProgressEvent{
OpID: p.opID,
NodeID: p.nodeID,
Backend: p.backend,
FileName: file,
Current: current,
Total: total,
Percentage: percentage,
Phase: messaging.PhaseDownloading,
}
p.mu.Lock()
now := time.Now()
if p.lastPublishedAt.IsZero() || now.Sub(p.lastPublishedAt) >= p.interval {
// Leading edge: publish immediately.
p.lastPublishedAt = now
p.pending = nil
p.mu.Unlock()
_ = p.client.Publish(p.subject, ev)
return
}
// Within the window: buffer the latest event and arm a trailing
// publish. If a timer is already armed, we just overwrite p.pending so
// the trailing publish carries the freshest data.
p.pending = &ev
if p.timer == nil {
delay := p.interval - now.Sub(p.lastPublishedAt)
p.timer = time.AfterFunc(delay, p.flushPending)
}
p.mu.Unlock()
}
// flushPending is the trailing-edge publisher fired by the AfterFunc timer.
// It clears the pending slot under the lock, then publishes outside the
// lock so Publish never blocks an in-progress OnDownload call.
func (p *DebouncedInstallProgressPublisher) flushPending() {
p.mu.Lock()
p.timer = nil
pending := p.pending
p.pending = nil
if pending != nil {
p.lastPublishedAt = time.Now()
}
p.mu.Unlock()
if pending != nil {
_ = p.client.Publish(p.subject, *pending)
}
}
// Flush publishes any pending buffered event synchronously and stops the
// pending timer. Safe to call multiple times. Callers MUST defer Flush
// after constructing the publisher so the terminal percentage reaches the
// master even on error returns.
func (p *DebouncedInstallProgressPublisher) Flush() {
p.mu.Lock()
if p.timer != nil {
p.timer.Stop()
p.timer = nil
}
pending := p.pending
p.pending = nil
p.mu.Unlock()
if pending != nil {
_ = p.client.Publish(p.subject, *pending)
}
}

View File

@@ -1,48 +0,0 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("DebouncedInstallProgressPublisher", func() {
It("publishes the first event immediately and debounces subsequent ones within the window", func() {
mc := newScriptedMessagingClient()
pub := NewDebouncedInstallProgressPublisher(mc, "n1", "op1", "vllm", 50*time.Millisecond)
// Three rapid-fire ticks within the debounce window.
pub.OnDownload("vllm.tar.zst", "100 MB", "1 GB", 10.0)
pub.OnDownload("vllm.tar.zst", "200 MB", "1 GB", 20.0)
pub.OnDownload("vllm.tar.zst", "300 MB", "1 GB", 30.0)
pub.Flush()
// First event publishes immediately; the others coalesce; Flush guarantees a final.
// So we expect at least 2 publishes and at most 4 (lead + final + any window-bounded).
Eventually(func() int {
return len(mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1")))
}, "1s").Should(BeNumerically(">=", 2))
calls := mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1"))
Expect(len(calls)).To(BeNumerically("<=", 4),
"three ticks within the debounce window should produce at most ~4 publishes")
})
It("publishes the final event after Flush with the latest percentage", func() {
mc := newScriptedMessagingClient()
pub := NewDebouncedInstallProgressPublisher(mc, "n1", "op1", "vllm", 50*time.Millisecond)
pub.OnDownload("vllm.tar.zst", "1 GB", "1 GB", 100.0)
pub.Flush()
Eventually(func() float64 {
calls := mc.publishCalls(messaging.SubjectNodeBackendInstallProgress("n1", "op1"))
if len(calls) == 0 {
return -1
}
return calls[len(calls)-1].Percentage
}, "1s").Should(Equal(100.0))
})
})

View File

@@ -10,7 +10,6 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
@@ -49,13 +48,6 @@ func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryo
return d.local.InstallModel(ctx, op, progressCb)
}
// nodeProgressSink is the narrow interface DistributedBackendManager uses to
// publish per-node progress without dragging in the full *GalleryService.
// nil means "no sink, skip per-node writes" (used by single-node tests).
type nodeProgressSink interface {
UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress)
}
// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
@@ -64,31 +56,26 @@ type DistributedBackendManager struct {
registry *NodeRegistry
backendGalleries []config.Gallery
systemState *system.SystemState
progressSink nodeProgressSink
}
// NewDistributedBackendManager creates a DistributedBackendManager.
// progressSink may be nil to disable per-node OpStatus writes (single-node
// tests don't need it).
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry, progressSink nodeProgressSink) *DistributedBackendManager {
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
return &DistributedBackendManager{
local: galleryop.NewLocalBackendManager(appConfig, ml),
adapter: adapter,
registry: registry,
backendGalleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
progressSink: progressSink,
}
}
// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
// Status holds one of the galleryop.NodeStatus* constants.
type NodeOpStatus struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"`
Status string `json:"status"` // "success" | "queued" | "error"
Error string `json:"error,omitempty"`
}
@@ -106,7 +93,7 @@ type BackendOpResult struct {
func (r BackendOpResult) Err() error {
var failures []string
for _, n := range r.Nodes {
if n.Status == galleryop.NodeStatusError {
if n.Status == "error" {
failures = append(failures, fmt.Sprintf("%s: %s", n.NodeName, n.Error))
}
}
@@ -129,48 +116,25 @@ func (r BackendOpResult) Err() error {
// when the node returns.
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it - such requests fail at the
// never had the backend installed to "upgrade" it such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
//
// opID is the gallery operation identifier; when non-empty and progressSink is
// set, every per-node terminal status appended to BackendOpResult is also
// mirrored into the sink so the UI's per-node OpStatus.Nodes view stays in
// lockstep with the manager's view. opID may be empty for ops that aren't
// gallery-tracked (e.g. DeleteBackend's plain code path).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, opID, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
// emitNodeProgress is a small helper that funnels every NodeOpStatus we
// append to result.Nodes into the per-node OpStatus sink (when configured
// and opID is known). Keeping it inline avoids drift between the
// BackendOpResult view and the sink view - they're written from the same
// code path on the same terminal statuses.
emitNodeProgress := func(node BackendNode, status, errMsg string) {
if d.progressSink == nil || opID == "" {
return
}
d.progressSink.UpdateNodeProgress(opID, node.ID, galleryop.NodeProgress{
NodeID: node.ID,
NodeName: node.Name,
Status: status,
Error: errMsg,
})
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet - no intent to apply.
// Pending nodes haven't been approved yet no intent to apply.
if node.Status == StatusPending {
continue
}
// Backend lifecycle ops only make sense on backend-type workers.
// Agent workers don't subscribe to backend.install/delete/list, so
// enqueueing for them guarantees a forever-retrying row that the
// reconciler can never drain. Silently skip - they aren't consumers.
// reconciler can never drain. Silently skip they aren't consumers.
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
@@ -179,23 +143,19 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
errMsg := fmt.Sprintf("enqueue failed: %v", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusError,
Error: errMsg,
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
})
emitNodeProgress(node, galleryop.NodeStatusError, errMsg)
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
errMsg := fmt.Sprintf("node %s, will retry when healthy", node.Status)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusQueued,
Error: errMsg,
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
})
emitNodeProgress(node, galleryop.NodeStatusQueued, errMsg)
continue
}
@@ -207,33 +167,14 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
xlog.Debug("Failed to clear pending backend op after success", "error", err)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusSuccess,
NodeID: node.ID, NodeName: node.Name, Status: "success",
})
emitNodeProgress(node, galleryop.NodeStatusSuccess, "")
continue
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL - mark unhealthy so the router stops picking it too.
// gone AWOL mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
// Worker-still-installing is a "soft" failure: the worker is most
// likely still pulling the OCI image. Keep the row, push NextRetryAt
// out so the reconciler does not immediately re-fire another install
// while the worker is still busy, and report the in-progress state
// to the caller. The next reconciler pass / backend.list confirms
// the actual outcome.
if errors.Is(applyErr, galleryop.ErrWorkerStillInstalling) {
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpInFlight(ctx, id, errMsg, d.adapter.InstallTimeout())
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusRunningOnWorker, Error: errMsg,
})
emitNodeProgress(node, galleryop.NodeStatusRunningOnWorker, errMsg)
continue
}
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
@@ -242,9 +183,8 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: galleryop.NodeStatusError, Error: errMsg,
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
})
emitNodeProgress(node, galleryop.NodeStatusError, errMsg)
}
return result, nil
}
@@ -286,11 +226,7 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error {
}
ctx := context.Background()
// Empty opID: plain DeleteBackend isn't gallery-tracked the same way as
// Install/Upgrade (no progress dialog), so we skip the per-node sink
// writes here. DeleteBackendDetailed is the HTTP path that surfaces
// per-node results in its own response.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -313,7 +249,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -388,60 +324,9 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
result[b.Name] = entry
}
}
// Proactively clear pending_backend_ops install rows whose intent is now
// satisfied: the backend is reported installed on its target node. Without
// this, the row sits in the queue until next_retry_at expires (up to the
// install timeout, default 15m) and the operator UI shows the install as
// "still installing in background" for that whole window even though the
// worker has actually been ready for minutes. We only clear install rows;
// upgrade and delete rows have presence-based semantics that do NOT match
// backend.list confirmation.
d.clearSatisfiedInstallRows(context.Background(), result)
return result, nil
}
// clearSatisfiedInstallRows removes pending_backend_ops install rows whose
// (nodeID, backend) pair now appears in the cluster-wide backend listing.
// Called by ListBackends after fan-out so the proactive clear sees every
// node's report. Best-effort: a DB failure is logged and the row stays for
// the reconciler to drain via its slower path.
func (d *DistributedBackendManager) clearSatisfiedInstallRows(ctx context.Context, backends gallery.SystemBackends) {
rows, err := d.registry.ListPendingBackendOps(ctx)
if err != nil {
xlog.Debug("clearSatisfiedInstallRows: failed to list pending ops", "error", err)
return
}
if len(rows) == 0 {
return
}
// Build a (nodeID, backend) presence set from the listing.
present := make(map[string]map[string]bool, len(backends))
for name, b := range backends {
for _, ref := range b.Nodes {
if present[ref.NodeID] == nil {
present[ref.NodeID] = make(map[string]bool)
}
present[ref.NodeID][name] = true
}
}
for _, row := range rows {
if row.Op != OpBackendInstall {
continue
}
if !present[row.NodeID][row.Backend] {
continue
}
if err := d.registry.DeletePendingBackendOp(ctx, row.ID); err != nil {
xlog.Debug("clearSatisfiedInstallRows: delete failed",
"id", row.ID, "node", row.NodeID, "backend", row.Backend, "error", err)
continue
}
xlog.Info("Reconciler: pending install row satisfied by backend.list",
"node", row.NodeID, "backend", row.Backend)
}
}
// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
@@ -460,41 +345,11 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
targetNodeIDs = map[string]bool{op.TargetNodeID: true}
}
result, err := d.enqueueAndDrainBackendOp(ctx, op.ID, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// onProgress fans each BackendInstallProgressEvent into two
// observers: the legacy single-bar progressCb (kept so callers
// that only consume the aggregate view keep working) and the
// per-node sink (so OpStatus.Nodes gets a "downloading" tick
// per file/percentage with node attribution). Defined inside the
// loop so each node captures its own node.Name into the closure.
onProgress := func(ev messaging.BackendInstallProgressEvent) {
if progressCb != nil {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
if d.progressSink != nil && op.ID != "" {
d.progressSink.UpdateNodeProgress(op.ID, ev.NodeID, galleryop.NodeProgress{
NodeID: ev.NodeID,
NodeName: node.Name,
Status: galleryop.NodeStatusDownloading,
FileName: ev.FileName,
Current: ev.Current,
Total: ev.Total,
Percentage: ev.Percentage,
Phase: ev.Phase,
})
}
}
// nil-callback shortcut: when there is nothing to deliver to,
// hand the adapter a nil onProgress so it skips the per-op NATS
// subscription. Matches the pre-Phase-4 bridgeProgressCb semantics.
var onProgressArg func(messaging.BackendInstallProgressEvent)
if progressCb != nil || d.progressSink != nil {
onProgressArg = onProgress
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, onProgressArg)
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
if err != nil {
return err
}
@@ -506,19 +361,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
if err != nil {
return err
}
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// No hard failures, but if at least one node reported running_on_worker,
// surface a wrapped ErrWorkerStillInstalling so galleryop can render a
// yellow in-progress state instead of green success. The reconciler
// will confirm the actual outcome on its next pass via backend.list.
for _, n := range result.Nodes {
if n.Status == galleryop.NodeStatusRunningOnWorker {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
return result.Err()
}
// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
@@ -549,11 +392,7 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
targetNodeIDs[n.NodeID] = true
}
// Empty opID: the caller (galleryop) doesn't thread an op ID into
// UpgradeBackend today, so we can't tag per-node sink writes with the
// right OpStatus key. Until the upgrade path takes a ManagementOp the
// way InstallBackend does, the sink stays no-op here.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
if err != nil {
// Rolling-update fallback: an older worker doesn't know
@@ -578,18 +417,7 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
if err != nil {
return err
}
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// Same in-progress surfacing as InstallBackend: a long-running worker
// upgrade that timed out the NATS round-trip must not be reported as
// green success.
for _, n := range result.Nodes {
if n.Status == galleryop.NodeStatusRunningOnWorker {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
return result.Err()
}
// IsDistributed reports that installs from this manager fan out across the
@@ -615,16 +443,3 @@ func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[stri
// it used to come from the empty frontend filesystem.
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
// summarizeRunningOnWorker builds a short human-readable summary of which
// nodes are still installing in the background, for inclusion in the
// wrapped ErrWorkerStillInstalling error.
func summarizeRunningOnWorker(nodes []NodeOpStatus) string {
var names []string
for _, n := range nodes {
if n.Status == galleryop.NodeStatusRunningOnWorker {
names = append(names, n.NodeName)
}
}
return strings.Join(names, ", ")
}

View File

@@ -3,7 +3,6 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"runtime"
"sync"
"time"
@@ -13,7 +12,6 @@ import (
. "github.com/onsi/gomega"
"gorm.io/gorm"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
@@ -24,35 +22,11 @@ import (
// (or error). Used so each fan-out request can simulate a different worker
// outcome without spinning up real NATS.
type scriptedMessagingClient struct {
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
publishes []progressPublishCall
scheduledProgressPublishes []scheduledProgressPublish
subscribes []string
}
// progressPublishCall records a single Publish invocation. The progress
// publisher tests assert on the sequence of BackendInstallProgressEvent
// values written to a per-op subject, so we capture both subject and the
// decoded event. Named to avoid clashing with the simpler `publishCall`
// already defined in unloader_test.go (which stores raw JSON bytes for
// non-progress assertions).
type progressPublishCall struct {
Subject string
Event messaging.BackendInstallProgressEvent
}
// scheduledProgressPublish queues a batch of BackendInstallProgressEvent
// values to be delivered the next time Subscribe is called with the matching
// subject. This lets master-side tests assert that the adapter installs its
// handler BEFORE publishing the install request, by scripting events to be
// delivered as soon as the subscription appears.
type scheduledProgressPublish struct {
subject string
events []messaging.BackendInstallProgressEvent
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
}
// matchedReply lets a test script a canned reply that only fires when the
@@ -124,10 +98,10 @@ func (s *scriptedMessagingClient) scriptReplyMatching(subject string, pred func(
})
}
func (s *scriptedMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.calls = append(s.calls, requestCall{Subject: subject, Data: data, Timeout: timeout})
s.calls = append(s.calls, requestCall{Subject: subject, Data: data})
// Predicate-matched replies take precedence over flat scriptReply.
if matchers, ok := s.matchedReplies[subject]; ok {
@@ -161,88 +135,8 @@ func (s *scriptedMessagingClient) Request(subject string, data []byte, timeout t
return nil, &fakeNoRespondersErr{}
}
// Publish records each call so progress-publisher tests can assert on the
// stream of events written to a subject. The real messaging.Client JSON
// encodes the payload before sending, but our publisher hands a typed
// struct directly, so we handle both shapes.
func (s *scriptedMessagingClient) Publish(subject string, data any) error {
s.mu.Lock()
defer s.mu.Unlock()
switch ev := data.(type) {
case messaging.BackendInstallProgressEvent:
s.publishes = append(s.publishes, progressPublishCall{Subject: subject, Event: ev})
case []byte:
var e messaging.BackendInstallProgressEvent
_ = json.Unmarshal(ev, &e)
s.publishes = append(s.publishes, progressPublishCall{Subject: subject, Event: e})
}
return nil
}
// publishCalls returns every BackendInstallProgressEvent that was published
// to `subject`, in order. Lets tests assert on debounce behavior without
// depending on internal Publish timing.
func (s *scriptedMessagingClient) publishCalls(subject string) []messaging.BackendInstallProgressEvent {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]messaging.BackendInstallProgressEvent, 0)
for _, c := range s.publishes {
if c.Subject != subject {
continue
}
out = append(out, c.Event)
}
return out
}
// scheduleProgressPublish queues a set of BackendInstallProgressEvent values
// to be delivered on the next Subscribe call matching the per-op progress
// subject. A short delay before delivery gives the subscriber time to install
// its message handler before the events arrive.
func (s *scriptedMessagingClient) scheduleProgressPublish(nodeID, opID string, events []messaging.BackendInstallProgressEvent) {
s.mu.Lock()
defer s.mu.Unlock()
s.scheduledProgressPublishes = append(s.scheduledProgressPublishes, scheduledProgressPublish{
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
events: events,
})
}
// subscribeCalls returns the subjects on which Subscribe was invoked.
// Used to confirm the master skipped subscription when onProgress was nil.
func (s *scriptedMessagingClient) subscribeCalls() []string {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]string, len(s.subscribes))
copy(out, s.subscribes)
return out
}
func (s *scriptedMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
s.mu.Lock()
s.subscribes = append(s.subscribes, subject)
matched := []scheduledProgressPublish{}
remaining := s.scheduledProgressPublishes[:0]
for _, sp := range s.scheduledProgressPublishes {
if sp.subject == subject {
matched = append(matched, sp)
} else {
remaining = append(remaining, sp)
}
}
s.scheduledProgressPublishes = remaining
s.mu.Unlock()
go func() {
time.Sleep(20 * time.Millisecond)
for _, sp := range matched {
for _, ev := range sp.events {
raw, _ := json.Marshal(ev)
handler(raw)
}
}
}()
func (s *scriptedMessagingClient) Publish(_ string, _ any) error { return nil }
func (s *scriptedMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) {
return &fakeSubscription{}, nil
}
func (s *scriptedMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) {
@@ -257,43 +151,8 @@ func (s *scriptedMessagingClient) SubscribeReply(_ string, _ func([]byte, func([
func (s *scriptedMessagingClient) IsConnected() bool { return true }
func (s *scriptedMessagingClient) Close() {}
// recordingNodeCall captures a single UpdateNodeProgress invocation so
// per-node OpStatus tests can assert on the sequence of writes the
// DistributedBackendManager fans out into the sink.
type recordingNodeCall struct {
OpID string
NodeID string
Progress galleryop.NodeProgress
}
// recordingProgressSink is a test-only nodeProgressSink that just records
// every call. Used by the per-node OpStatus specs below to assert the
// manager wrote the expected terminal and downloading entries.
type recordingProgressSink struct {
mu sync.Mutex
calls []recordingNodeCall
}
func (r *recordingProgressSink) UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, recordingNodeCall{OpID: opID, NodeID: nodeID, Progress: np})
}
func (r *recordingProgressSink) callsFor(opID, nodeID string) []galleryop.NodeProgress {
r.mu.Lock()
defer r.mu.Unlock()
out := []galleryop.NodeProgress{}
for _, c := range r.calls {
if c.OpID == opID && c.NodeID == nodeID {
out = append(out, c.Progress)
}
}
return out
}
// fakeNoRespondersErr is the unscripted-subject default. It matches
// nats.ErrNoResponders by string only - used when a test forgets to script
// nats.ErrNoResponders by string only used when a test forgets to script
// a node so the failure is loud but doesn't tickle errors.Is(...) sentinel
// paths the test wasn't deliberately exercising. Tests that DO want the
// real sentinel (e.g. to drive the manager's NoResponders fallback) call
@@ -345,7 +204,7 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(err).ToNot(HaveOccurred())
mc = newScriptedMessagingClient()
adapter = NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
adapter = NewRemoteUnloaderAdapter(nil, mc)
mgr = &DistributedBackendManager{
local: stubLocalBackendManager{},
adapter: adapter,
@@ -493,263 +352,6 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(mc.calls).To(BeEmpty())
})
})
Context("when InstallBackend times out on a worker", func() {
It("returns galleryop.ErrWorkerStillInstalling and keeps the queue row with NextRetryAt pushed out", func() {
n := registerHealthyBackend("slow", "10.0.0.1:50051")
// Script a NATS timeout on the install subject. The adapter
// wraps this into galleryop.ErrWorkerStillInstalling, which
// the manager should treat as a soft failure.
mc.scriptErr(messaging.SubjectNodeBackendInstall(n.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, err := registry.ListPendingBackendOps(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
Expect(rows[0].Backend).To(Equal("vllm"))
// The adapter is configured with a 3m install timeout in this
// suite (NewRemoteUnloaderAdapter above). NextRetryAt should
// be ~now+3m; a > now+2m bound is safe-but-tight enough to
// catch the buggy short default (30s exponential backoff).
Expect(rows[0].NextRetryAt).To(BeTemporally(">", time.Now().Add(2*time.Minute)),
"NextRetryAt should be pushed to ~now+installTimeout, not the short default")
})
})
Context("end-to-end: timeout then successful reconcile via backend.list", func() {
It("surfaces the install in ListBackends after the worker finishes", func() {
// Use the same node-registration helper the Task 5 test uses
// so the test fixture is identical to the prior context.
node := registerHealthyBackend("jetson", "10.0.0.2:50051")
// First install attempt: NATS times out. The adapter wraps
// this as galleryop.ErrWorkerStillInstalling and the manager
// keeps the pending_backend_ops row alive with NextRetryAt
// pushed out (asserted in the previous context).
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, listErr := registry.ListPendingBackendOps(ctx)
Expect(listErr).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
// The worker finished installing in the background. Script
// backend.list on the same scriptedMessagingClient so the
// manager's ListBackends fan-out reports the backend.
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
backends, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
Expect(backends).To(HaveKey("vllm"))
Expect(backends["vllm"].Nodes).To(HaveLen(1))
Expect(backends["vllm"].Nodes[0].NodeID).To(Equal(node.ID))
// Phase 1b shipped: ListBackends proactively clears install rows
// whose intent is now satisfied by backend.list confirmation. The
// operator UI clears immediately instead of waiting for the next
// reconciler tick after NextRetryAt.
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(BeEmpty(),
"install row should clear once backend.list confirms presence on the target node")
})
})
Context("ListBackends clears confirmed install rows", func() {
It("deletes the pending_backend_ops install row when the backend is reported installed on its target node", func() {
node := registerHealthyBackend("worker-a", "10.0.0.5:50051")
// Pre-stage: simulate an admin install that timed out at the NATS
// round-trip, leaving an install row in the queue.
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue())
rows, _ := registry.ListPendingBackendOps(ctx)
Expect(rows).To(HaveLen(1))
// Worker finishes installing in the background. backend.list now
// confirms presence; ListBackends should proactively clear the row.
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
backends, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
Expect(backends).To(HaveKey("vllm"))
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(BeEmpty(),
"ListBackends should clear install rows whose intent is now satisfied by backend.list")
})
It("does NOT clear an upgrade row even if the backend is reported installed", func() {
node := registerHealthyBackend("worker-b", "10.0.0.6:50051")
Expect(registry.UpsertPendingBackendOp(ctx, node.ID, "vllm", OpBackendUpgrade, []byte("[]"))).To(Succeed())
mc.scriptReply(messaging.SubjectNodeBackendList(node.ID), messaging.BackendListReply{
Backends: []messaging.NodeBackendInfo{{Name: "vllm"}},
})
_, listErr := mgr.ListBackends()
Expect(listErr).ToNot(HaveOccurred())
rowsAfter, _ := registry.ListPendingBackendOps(ctx)
Expect(rowsAfter).To(HaveLen(1), "upgrade rows must not be cleared by backend.list presence")
})
})
Context("InstallBackend streams progress events to the caller's progressCb", func() {
It("invokes progressCb once per worker-published progress event", func() {
node := registerHealthyBackend("worker-prog", "10.0.0.7:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.7:50051"})
mc.scheduleProgressPublish(node.ID, "op-prog-1", []messaging.BackendInstallProgressEvent{
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100},
})
type tick struct {
FileName, Current, Total string
Percentage float64
}
var (
pcCalls []tick
mu sync.Mutex
)
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
pcCalls = append(pcCalls, tick{file, current, total, pct})
}
opVal := op("vllm")
opVal.ID = "op-prog-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(pcCalls)
}, "1s").Should(Equal(2))
mu.Lock()
defer mu.Unlock()
// The adapter dispatches each progress event to its own goroutine
// (see unloader.go: `go onProgress(ev)`) so two events emitted back
// to back can land at the bridge in either order. Assert the set of
// percentages observed contains both ticks, rather than depending
// on goroutine scheduling for ordering.
pcts := []float64{pcCalls[0].Percentage, pcCalls[1].Percentage}
Expect(pcts).To(ConsistOf(10.0, 100.0))
})
})
Context("InstallBackend tolerates silent (pre-Phase-2) workers", func() {
It("completes successfully even when no progress events are ever published", func() {
node := registerHealthyBackend("worker-silent", "10.0.0.8:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.8:50051"})
// NO scheduleProgressPublish call - silent worker.
var ticks int
var mu sync.Mutex
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
ticks++
}
opVal := op("vllm")
opVal.ID = "op-silent-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Consistently(func() int {
mu.Lock()
defer mu.Unlock()
return ticks
}, "200ms").Should(Equal(0))
})
})
Context("populates per-node OpStatus entries", func() {
var sink *recordingProgressSink
BeforeEach(func() {
// Reconstruct mgr with the recording sink so the new code
// path (per-node OpStatus writes) is exercised. The default
// mgr in the outer BeforeEach has progressSink=nil so the
// pre-existing specs keep verifying the no-sink behavior.
sink = &recordingProgressSink{}
appCfg := &config.ApplicationConfig{}
mgr = NewDistributedBackendManager(appCfg, nil, adapter, registry, sink)
// stubLocalBackendManager mirrors the production behaviour
// where the frontend node rarely has the backend installed
// locally - the NATS fan-out is what these specs verify.
mgr.local = stubLocalBackendManager{}
})
It("emits a success entry for each healthy node visited", func() {
node := registerHealthyBackend("worker-ok", "10.0.0.9:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true, Address: "10.0.0.9:50051"})
opVal := op("vllm")
opVal.ID = "op-node-success"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
calls := sink.callsFor("op-node-success", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal(galleryop.NodeStatusSuccess))
Expect(calls[len(calls)-1].NodeName).To(Equal("worker-ok"))
})
It("emits a running_on_worker entry when NATS times out", func() {
node := registerHealthyBackend("worker-slow", "10.0.0.10:50051")
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
opVal := op("vllm")
opVal.ID = "op-node-slow"
// Soft failure: returns wrapped ErrWorkerStillInstalling.
_ = mgr.InstallBackend(ctx, opVal, nil)
calls := sink.callsFor("op-node-slow", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal(galleryop.NodeStatusRunningOnWorker))
})
It("emits downloading entries from progress events", func() {
node := registerHealthyBackend("worker-dl", "10.0.0.11:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true})
mc.scheduleProgressPublish(node.ID, "op-node-dl", []messaging.BackendInstallProgressEvent{
{OpID: "op-node-dl", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100, Phase: messaging.PhaseDownloading},
})
opVal := op("vllm")
opVal.ID = "op-node-dl"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
Eventually(func() bool {
for _, np := range sink.callsFor("op-node-dl", node.ID) {
if np.Status == galleryop.NodeStatusDownloading && np.Percentage == 100.0 {
return true
}
}
return false
}, "1s").Should(BeTrue())
})
})
})
Describe("UpgradeBackend", func() {

View File

@@ -68,9 +68,9 @@ type ModelScheduler interface {
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
type ReplicaReconcilerOptions struct {
Registry *NodeRegistry
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
Unloader NodeCommandSender
// Adapter is the NATS sender used to retry pending backend ops. When nil,
// the state-reconciler pending-drain pass is a no-op (single-node mode).
Adapter *RemoteUnloaderAdapter
@@ -78,7 +78,7 @@ type ReplicaReconcilerOptions struct {
// addresses. Matches the worker's token so HealthCheck auth succeeds.
RegistrationToken string
// Prober overrides the default gRPC health probe (used by tests).
Prober ModelProber
Prober ModelProber
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
@@ -191,7 +191,7 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// Pending-op drain for admin install — not a per-replica load.
// Replica 0 is the conventional admin slot. Install is idempotent:
// the worker short-circuits if the backend is already running.
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, "", nil)
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
if err != nil {
applyErr = err
} else if !reply.Success {

View File

@@ -17,24 +17,24 @@ import (
// Workers are generic — they don't have a fixed backend type.
// The SmartRouter dynamically installs backends via NATS backend.install events.
type BackendNode struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
// ReservedVRAM is a soft, in-tick reservation deducted by the scheduler when
// it picks this node to load a model. Workers reset it back to 0 on each
// heartbeat (the worker is the source of truth for actual free VRAM); the
// reservation is only here to keep two scheduling decisions within the
// same heartbeat window from over-committing the same node.
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
// MaxReplicasPerModel caps how many replicas of any one model can run on
// this node concurrently. Default 1 preserves the historical "one
// (node, model)" assumption; set higher (via worker --max-replicas-per-model)
@@ -44,12 +44,12 @@ type BackendNode struct {
// admin override. When true, the worker's CLI value is ignored on
// re-registration so the override survives worker restarts. Cleared
// by an explicit "reset to worker default" action.
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
const (
@@ -79,17 +79,17 @@ const (
// gRPC Address (each replica is a separate worker process on its own port),
// and its own InFlight counter.
type NodeModel struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
@@ -1287,7 +1287,7 @@ func (r *NodeRegistry) UpdateMaxReplicasPerModel(ctx context.Context, nodeID str
res := r.db.WithContext(ctx).Model(&BackendNode{}).
Where("id = ?", nodeID).
Updates(map[string]any{
ColMaxReplicasPerModel: n,
ColMaxReplicasPerModel: n,
"max_replicas_per_model_manually_set": true,
})
if res.Error != nil {
@@ -1460,7 +1460,7 @@ func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backe
NextRetryAt: time.Now(),
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
}).Create(&row).Error
}
@@ -1515,27 +1515,6 @@ func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uin
})
}
// RecordPendingBackendOpInFlight is the "soft failure" cousin of
// RecordPendingBackendOpFailure. Used when a NATS install round-trip timed
// out but the worker is still installing in the background. Stores the
// message in LastError and pushes NextRetryAt out by `retryDelay` (typically
// the install timeout) so the reconciler does not immediately re-fire
// another install while the worker is still busy.
//
// Attempts is intentionally NOT incremented: an in-flight timeout is not a
// failed attempt, it is a still-in-progress one. Incrementing it would let a
// genuinely-progressing slow install (e.g. 30 GB CUDA image on Wi-Fi) trip
// the maxPendingBackendOpAttempts cap in the reconciler and dead-letter the
// row while the worker is still legitimately working.
func (r *NodeRegistry) RecordPendingBackendOpInFlight(ctx context.Context, id uint, lastError string, retryDelay time.Duration) error {
return r.db.WithContext(ctx).Model(&PendingBackendOp{}).
Where("id = ?", id).
Updates(map[string]any{
"last_error": lastError,
"next_retry_at": time.Now().Add(retryDelay),
}).Error
}
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
// reconciler tick is 30s so anything shorter would just re-fire immediately.
func backoffForAttempt(attempts int) time.Duration {

View File

@@ -688,7 +688,7 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
v, err, _ := r.installFlight.Do(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
if err != nil {
return "", err
}

View File

@@ -330,7 +330,7 @@ type upgradeCall struct {
replica int
}
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, _ string, _ func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) {
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
// installHook intentionally runs OUTSIDE the mutex: the hook may block
// on a channel and we don't want to serialize concurrent callers,
// which would defeat the singleflight-overlap test.

View File

@@ -2,15 +2,9 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/xlog"
)
@@ -34,7 +28,7 @@ type backendStopRequest struct {
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error)
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error)
@@ -49,33 +43,18 @@ type NodeCommandSender interface {
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
registry ModelLocator
nats messaging.MessagingClient
installTimeout time.Duration
upgradeTimeout time.Duration
registry ModelLocator
nats messaging.MessagingClient
}
// NewRemoteUnloaderAdapter creates a new adapter. installTimeout and
// upgradeTimeout govern the NATS request-reply deadlines for backend.install
// and backend.upgrade respectively. Use
// DistributedConfig.BackendInstallTimeoutOrDefault() /
// BackendUpgradeTimeoutOrDefault() at construction.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient, installTimeout, upgradeTimeout time.Duration) *RemoteUnloaderAdapter {
// NewRemoteUnloaderAdapter creates a new adapter.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
return &RemoteUnloaderAdapter{
registry: registry,
nats: nats,
installTimeout: installTimeout,
upgradeTimeout: upgradeTimeout,
registry: registry,
nats: nats,
}
}
// InstallTimeout returns the configured backend.install round-trip timeout.
// Used by DistributedBackendManager to push NextRetryAt out by this duration
// when a worker times out replying but is still installing in the background.
func (a *RemoteUnloaderAdapter) InstallTimeout() time.Duration {
return a.installTimeout
}
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.
@@ -108,59 +87,18 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
//
// Timeout: configured via DistributedConfig.BackendInstallTimeoutOrDefault
// (default 15m). Most calls return in under 2 seconds (process already
// running). The 15-minute ceiling covers the cold-binary spawn-after-download
// case on slow links (Jetson Wi-Fi, multi-GB CUDA images) while still
// failing fast enough to surface real worker hangs.
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
// case while still failing fast enough to surface real worker hangs.
//
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead -
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(
nodeID, backendType, modelID, galleriesJSON, uri, name, alias string,
replicaIndex int,
opID string,
onProgress func(messaging.BackendInstallProgressEvent),
) (*messaging.BackendInstallReply, error) {
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
// Subscribe to the per-op progress subject BEFORE publishing the install
// request so we don't miss early events. When onProgress is nil OR opID
// is empty (the reconciler-driven retry path), skip subscription entirely:
// silent installs cost nothing extra.
var sub messaging.Subscription
if onProgress != nil && opID != "" {
progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID)
s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) {
var ev messaging.BackendInstallProgressEvent
if err := json.Unmarshal(raw, &ev); err != nil {
xlog.Debug("malformed install progress event", "subject", progressSubject, "error", err)
return
}
// Goroutine guard: a slow onProgress callback must not stall
// the NATS reader thread.
//
// NOTE: events spawn one goroutine each, so ordering at the
// consumer is best-effort. In practice the worker debounces to
// ~250ms which is far larger than goroutine scheduling jitter,
// so reordering is rare. The worker's final Flush() event is
// intended to win as the terminal tick. A future hardening pass
// could add a Seq uint64 field to BackendInstallProgressEvent
// and drop stale-by-seq at the bridge if reordering becomes a
// real UX issue.
go onProgress(ev)
})
if subErr != nil {
xlog.Warn("Failed to subscribe to install progress subject; proceeding without progress streaming",
"subject", progressSubject, "error", subErr)
} else {
sub = s
}
}
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
ModelID: modelID,
BackendGalleries: galleriesJSON,
@@ -168,46 +106,29 @@ func (a *RemoteUnloaderAdapter) InstallBackend(
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
OpID: opID,
}, a.installTimeout)
if sub != nil {
_ = sub.Unsubscribe()
}
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}, 3*time.Minute)
}
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary - upgrade itself does not start a process.
// binary upgrade itself does not start a process.
//
// Timeout: configured via DistributedConfig.BackendUpgradeTimeoutOrDefault
// (default 15m). Real-world worst case observed: 8-10 minutes for large
// CUDA-l4t backend images on Jetson over WiFi.
// Timeout: 15 minutes. Real-world worst case observed: 810 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
reply, err := messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, a.upgradeTimeout)
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}, 15*time.Minute)
}
// installWithForceFallback is the rolling-update fallback used by
@@ -220,7 +141,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
BackendGalleries: galleriesJSON,
URI: uri,
@@ -228,12 +149,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: true,
}, a.upgradeTimeout)
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
}
return reply, err
}, 15*time.Minute)
}
// ListBackends queries a worker node for its installed backends via NATS request-reply.
@@ -312,14 +228,3 @@ func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
subject := messaging.SubjectNodeStop(nodeID)
return a.nats.Publish(subject, nil)
}
// isNATSTimeout returns true if err looks like a NATS request-reply timeout.
// nats.ErrTimeout is the canonical sentinel; context.DeadlineExceeded can
// also surface depending on the client's path; we accept both, plus a
// string-match fallback for clients that return a bare error.
func isNATSTimeout(err error) bool {
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
return true
}
return err != nil && strings.Contains(err.Error(), "nats: timeout")
}

View File

@@ -3,16 +3,13 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/nats-io/nats.go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
)
@@ -63,7 +60,6 @@ type publishCall struct {
type requestCall struct {
Subject string
Data []byte
Timeout time.Duration
}
func (f *fakeMessagingClient) Publish(subject string, data any) error {
@@ -97,10 +93,10 @@ func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
func (f *fakeMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data, Timeout: timeout})
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data})
return f.requestReply, f.requestErr
}
@@ -123,7 +119,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
BeforeEach(func() {
locator = &fakeModelLocator{}
mc = &fakeMessagingClient{}
adapter = NewRemoteUnloaderAdapter(locator, mc, 3*time.Minute, 15*time.Minute)
adapter = NewRemoteUnloaderAdapter(locator, mc)
})
Describe("UnloadRemoteModel", func() {
@@ -158,7 +154,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
}
// Use a messaging client that fails the first Publish call only.
failOnce := &failOnceMessagingClient{inner: mc, failOn: 0}
adapter = NewRemoteUnloaderAdapter(locator, failOnce, 3*time.Minute, 15*time.Minute)
adapter = NewRemoteUnloaderAdapter(locator, failOnce)
Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())
@@ -263,96 +259,3 @@ func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout t
func (f *failOnceMessagingClient) IsConnected() bool { return true }
func (f *failOnceMessagingClient) Close() {}
var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
It("passes the configured install timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(7 * time.Minute))
})
It("passes the configured upgrade timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute))
})
})
var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
It("wraps nats.ErrTimeout from InstallBackend in galleryop.ErrWorkerStillInstalling", func() {
mc := newScriptedMessagingClient()
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
})
It("does NOT wrap non-timeout errors", func() {
mc := newScriptedMessagingClient()
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse())
Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue())
})
})
var _ = Describe("RemoteUnloaderAdapter install progress streaming", func() {
It("forwards BackendInstallProgressEvent values into the onProgress callback when the worker publishes them", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
mc.scheduleProgressPublish("n1", "op-abc", []messaging.BackendInstallProgressEvent{
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "500 MB", Total: "1 GB", Percentage: 50},
})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
var (
received []messaging.BackendInstallProgressEvent
mu sync.Mutex
)
onProgress := func(ev messaging.BackendInstallProgressEvent) {
mu.Lock()
defer mu.Unlock()
received = append(received, ev)
}
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "op-abc", onProgress)
Expect(err).ToNot(HaveOccurred())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(received)
}, "1s").Should(Equal(2))
})
It("does NOT subscribe when onProgress is nil (reconciler retry path)", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.subscribeCalls()).To(BeEmpty(),
"reconciler-driven retries must not subscribe to the per-op progress subject")
})
})

View File

@@ -1,8 +1,6 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -17,7 +15,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
adapter := NewRemoteUnloaderAdapter(nil, mc)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
@@ -26,7 +24,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
It("returns the underlying error when the subject has no responders", func() {
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
adapter := NewRemoteUnloaderAdapter(nil, mc)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
Expect(err).To(HaveOccurred())
})

View File

@@ -7,22 +7,14 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/xlog"
)
// installProgressDebounce is the leading-edge window the worker uses when
// streaming download progress to the master. 250ms caps wire chatter at
// ~4 events/sec per in-flight install while still surfacing every
// meaningful percentage jump.
const installProgressDebounce = 250 * time.Millisecond
// buildProcessKey is the supervisor's stable identifier for a backend gRPC
// process. It includes the replica index so the same model can run multiple
// processes on a worker simultaneously without colliding on the same map slot
@@ -108,20 +100,6 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
}
}
// When the master tagged this install with an OpID, stream the
// gallery download progress back to it on the per-op NATS subject.
// Old masters that omit OpID stay on the silent path so they keep
// working without changes. The publisher releases its mutex before
// every Publish so a slow link never stalls the download loop, and
// the deferred Flush guarantees a terminal-percentage event reaches
// the master even when the install errors out.
var downloadCb func(file, current, total string, percentage float64)
if req.OpID != "" && s.nats != nil {
publisher := nodes.NewDebouncedInstallProgressPublisher(s.nats, s.nodeID, req.OpID, req.Backend, installProgressDebounce)
downloadCb = publisher.OnDownload
defer publisher.Flush()
}
// On upgrade, run the gallery install path even if the binary already
// exists on disk: findBackend would otherwise short-circuit and we'd
// restart the same stale binary. The force flag passed to
@@ -134,14 +112,14 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
if req.URI != "" {
xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}
} else {
xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", force)
if err := gallery.InstallBackendFromGallery(
context.Background(), galleries, s.systemState, s.ml, req.Backend, downloadCb, force, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, force, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}

View File

@@ -86,8 +86,6 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These
| `--auto-approve-nodes` | `LOCALAI_AUTO_APPROVE_NODES` | `false` | Auto-approve new worker nodes (skip admin approval) |
| `--auth` | `LOCALAI_AUTH` | `false` | **Must be `true`** for distributed mode |
| `--auth-database-url` | `LOCALAI_AUTH_DATABASE_URL` | *(required)* | PostgreSQL connection URL |
| `--backend-install-timeout` | `LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT` | `15m` | How long the frontend waits for a worker to acknowledge a backend install before considering the request stalled. Raise it when workers pull large backend images over slow links. If a worker takes longer than this, the operation shows as "still installing in background" in the admin UI and clears once the worker finishes. |
| `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | Same as the install timeout, applied to backend upgrades (force-reinstall). |
### Optional: S3 Object Storage
@@ -105,31 +103,6 @@ When S3 is not configured, model files are transferred directly from the fronten
For high-throughput or very large model files, S3 can be more efficient since it avoids streaming through the frontend.
### Watching Backend Installs
While a worker downloads a backend, the admin **Operations Bar** at the top
of the UI shows real-time progress: current file, downloaded/total bytes,
and percentage. This works the same as single-node mode.
When an install targets more than one worker, an **N nodes** chevron
appears on the operation row. Click it to expand a per-node breakdown,
with one row per worker showing:
- A status pill: **Queued** (gray), **Downloading** (blue), **Worker busy**
(yellow), **Done** (green), or **Failed** (red).
- The file currently being downloaded with current/total bytes and percentage.
- A thin per-node progress bar.
- Any error returned by the worker.
The yellow **Worker busy** pill means the worker took longer than
`--backend-install-timeout` to acknowledge but is most likely still
working in the background. The admin UI clears it as soon as the worker
finishes; no action is required from the operator.
If a worker is running an older LocalAI release that does not report
progress, its row in the breakdown will still show terminal status
(queued / done / failed / worker busy) but no per-file progress.
## Worker Configuration
Workers are started with the `worker` subcommand. Each worker is generic — it doesn't need a backend type at startup:

View File

@@ -225,7 +225,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
// newTestSmartRouter creates a SmartRouter with NATS wired up and a mock
// backend.install handler that always replies success for all registered nodes.
newTestSmartRouter := func(reg *nodes.NodeRegistry, extraOpts ...nodes.SmartRouterOptions) *nodes.SmartRouter {
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC, 3*time.Minute, 15*time.Minute)
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC)
opts := nodes.SmartRouterOptions{
Unloader: unloader,
@@ -395,7 +395,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
Expect(err).ToNot(HaveOccurred())
// Create RemoteUnloaderAdapter and unload model
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
err = unloader.UnloadRemoteModel("old-model")
Expect(err).ToNot(HaveOccurred())

View File

@@ -6,7 +6,6 @@ import (
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
@@ -176,7 +175,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedModelManager(appCfg, ml, adapter)
err = distMgr.DeleteModel("big-model")
@@ -252,8 +251,8 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
err = distMgr.DeleteBackend("my-backend")
Expect(err).ToNot(HaveOccurred())
@@ -299,8 +298,8 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
// Should NOT return an error even though the backend doesn't exist locally
err = distMgr.DeleteBackend("remote-only-backend")

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -57,8 +56,8 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, "", nil)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeTrue())
})
@@ -78,8 +77,8 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, "", nil)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeFalse())
Expect(installReply.Error).To(ContainSubstring("backend not found"))
@@ -104,7 +103,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
// Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed())
Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1)))
@@ -134,14 +133,14 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter.UnloadRemoteModel("shared-model")
Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2)))
})
It("should be no-op for models not on any node", func() {
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed())
})
})
@@ -162,7 +161,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
Expect(adapter.StopNode(node.ID)).To(Succeed())
Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1)))

View File

@@ -3,7 +3,6 @@ package distributed_test
import (
"context"
"encoding/json"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -79,7 +78,7 @@ var _ = Describe("SmartRouter trackingKey", Label("Distributed"), func() {
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
nodeID = node.ID
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
router = nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{
Unloader: unloader,
})