LocalAI/core/application/distributed.go
LocalAI [bot] b4fdb41dcc fix(distributed): cascade-clean stale node_models rows + filter routing by healthy status (#9754)
* fix(distributed): cascade-clean stale node_models on drain and filter routing by healthy status

Stale node_models rows (state="loaded") were surviving past the healthy
state of their owning node, causing /embeddings (and other inference
paths) to dispatch to a backend whose process was gone or drained. The
downstream symptom in a live cluster was pgvector rejecting inserts
with "vector cannot have more than 16000 dimensions (SQLSTATE 54000)"
because the misbehaving backend silently returned a malformed
(oversized) tensor; the Models page showed the model as "running"
without an associated node, like a stale entry, even though the node
was no longer visible in the Nodes view.

Three changes here, plus a follow-up commit that hardens the per-model probe:

- MarkDraining now cascade-deletes node_models rows for the affected
  node, mirroring MarkOffline. Drains are explicit operator actions —
  the box has been intentionally taken out of rotation — so clearing
  the rows stops the Models UI from misreporting and prevents the
  routing layer from picking those rows if scheduling logic is ever
  relaxed. In-flight requests already hold their gRPC client through
  Route() and finish normally; the only observable effect is a
  non-fatal IncrementInFlight warning, acceptable for a drain.

  MarkUnhealthy is deliberately left status-only: it fires from
  managers_distributed / reconciler on a single nats.ErrNoResponders
  with no retry, so a transient NATS hiccup must not nuke every loaded
  model and force a full reload on recovery.
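
  The asymmetry between the two paths can be sketched with an in-memory
  stand-in for the registry (type and field names here are illustrative;
  the real NodeRegistry is GORM-backed):

```go
package main

import "fmt"

// Status values mirroring backend_nodes.status (names assumed for illustration).
type Status string

const (
	Healthy   Status = "healthy"
	Draining  Status = "draining"
	Unhealthy Status = "unhealthy"
)

// registry is an in-memory stand-in for the PostgreSQL-backed node registry.
type registry struct {
	nodeStatus map[string]Status
	nodeModels map[string][]string // nodeID -> loaded models (node_models rows)
}

// MarkDraining flips the node's status and cascade-deletes its node_models
// rows, mirroring MarkOffline: a drain is an explicit operator action.
func (r *registry) MarkDraining(nodeID string) {
	r.nodeStatus[nodeID] = Draining
	delete(r.nodeModels, nodeID) // cascade-clean: stale rows must not survive
}

// MarkUnhealthy is deliberately status-only: it can fire on a single
// transient NATS error, so it must not drop every loaded model.
func (r *registry) MarkUnhealthy(nodeID string) {
	r.nodeStatus[nodeID] = Unhealthy
}

func main() {
	r := &registry{
		nodeStatus: map[string]Status{"node-a": Healthy, "node-b": Healthy},
		nodeModels: map[string][]string{"node-a": {"bert"}, "node-b": {"llama"}},
	}
	r.MarkDraining("node-a")  // rows for node-a are gone
	r.MarkUnhealthy("node-b") // rows for node-b survive
	fmt.Println(len(r.nodeModels["node-a"]), len(r.nodeModels["node-b"]))
}
```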

- FindAndLockNodeWithModel's inner JOIN now filters on
  backend_nodes.status = healthy in addition to node_models.state =
  loaded. The previous version relied on the second node-fetch step to
  reject non-healthy nodes, but a concurrent reader could still pick
  the same stale row in the same window. Belt-and-braces.
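
  The tightened predicate, sketched over an in-memory stand-in for the
  joined rows (struct and field names are assumptions, not the real query
  types):

```go
package main

import "fmt"

// nodeModelRow mirrors a node_models row joined against backend_nodes
// (field names assumed for illustration).
type nodeModelRow struct {
	NodeID     string
	State      string // node_models.state
	NodeStatus string // backend_nodes.status, via the JOIN
}

// pickCandidates applies the tightened predicate: the JOIN filters on
// backend_nodes.status = 'healthy' as well as node_models.state = 'loaded',
// so a concurrent reader cannot select a stale row in the window between
// the row query and the later node-fetch step.
func pickCandidates(rows []nodeModelRow) []nodeModelRow {
	var out []nodeModelRow
	for _, r := range rows {
		if r.State == "loaded" && r.NodeStatus == "healthy" {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	rows := []nodeModelRow{
		{NodeID: "node-a", State: "loaded", NodeStatus: "healthy"},
		{NodeID: "node-b", State: "loaded", NodeStatus: "draining"}, // stale row
	}
	fmt.Println(len(pickCandidates(rows)))
}
```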

- DistributedConfig.PerModelHealthCheck renamed to
  DisablePerModelHealthCheck and inverted at the call site so
  per-model gRPC probing is on by default. The probe (now made
  consecutive-miss aware in a follow-up commit) independently health-
  checks each model's gRPC address and removes stale node_models rows
  when the backend has crashed even though the worker's node-level
  heartbeat is still arriving.

  Migration: the field had no CLI flag, env var binding, or YAML key
  in tree (only the bare struct field), so there is no user-facing
  migration. Anything constructing DistributedConfig in code needs to
  drop the assignment (default now does the right thing) or invert it.
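
  The inversion, reduced to its essentials (struct excerpt only; the real
  DistributedConfig has many more fields):

```go
package main

import "fmt"

// DistributedConfig excerpt showing only the renamed field.
type DistributedConfig struct {
	DisablePerModelHealthCheck bool
}

// perModelProbeEnabled mirrors the inverted call site: the zero value of
// the struct now means probing is on, and it stays on unless explicitly
// disabled.
func perModelProbeEnabled(c DistributedConfig) bool {
	return !c.DisablePerModelHealthCheck
}

func main() {
	fmt.Println(perModelProbeEnabled(DistributedConfig{}))                                   // default: on
	fmt.Println(perModelProbeEnabled(DistributedConfig{DisablePerModelHealthCheck: true})) // opted out
}
```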

Assisted-by: Claude:claude-opus-4-7 go-vet go-test golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed): require consecutive misses before per-model probe removes a row

The per-model gRPC probe used to remove a node_models row on a single
failed health check. With the per-model probe now on by default, that
made any 5-second gRPC blip (network jitter, a long-running request
hogging the worker's gRPC server thread, brief GC pause) trigger a
full reload of the affected model — too eager for production.

Require perModelMissThreshold (3) consecutive failed probes before
removal. At the default 15s tick a model must be unreachable for ~45s
before reap; a single successful probe in between resets the streak.
Per-(node, model, replica) state tracked under a mutex on the monitor.

If the removal call itself fails, the miss counter is left in place
so the next tick retries rather than starting the streak over.
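
The streak bookkeeping can be sketched like this (type and method names are
illustrative; the real state lives on the health monitor):

```go
package main

import (
	"fmt"
	"sync"
)

// perModelMissThreshold matches the commit: 3 consecutive failed probes
// (~45s at the default 15s tick) before a row is reaped.
const perModelMissThreshold = 3

// missKey identifies one probed target; the real monitor keys on
// (node, model, replica) — field names assumed here.
type missKey struct {
	NodeID, Model string
	Replica       int
}

// missTracker holds the per-target streak under a mutex.
type missTracker struct {
	mu     sync.Mutex
	misses map[missKey]int
}

func newMissTracker() *missTracker {
	return &missTracker{misses: map[missKey]int{}}
}

// recordProbe updates the streak and reports whether the row should be
// removed: a single success resets the streak; removal is signalled only
// once the threshold is crossed.
func (t *missTracker) recordProbe(k missKey, ok bool) (remove bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ok {
		delete(t.misses, k) // success resets the streak
		return false
	}
	t.misses[k]++
	return t.misses[k] >= perModelMissThreshold
}

// removalDone clears the counter after a successful removal. If the removal
// call itself failed, the caller skips this so the next tick retries the
// removal instead of restarting the streak from zero.
func (t *missTracker) removalDone(k missKey) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.misses, k)
}

func main() {
	tr := newMissTracker()
	k := missKey{NodeID: "node-a", Model: "bert"}
	fmt.Println(tr.recordProbe(k, false)) // miss 1
	fmt.Println(tr.recordProbe(k, false)) // miss 2
	fmt.Println(tr.recordProbe(k, false)) // miss 3: threshold crossed
}
```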

Tests:
- removes stale model via per-model health check after consecutive
  failures (replaces the single-shot expectation)
- preserves model row when an intermittent failure is followed by a
  success (covers the reset-on-success path and verifies the counter
  reset by failing twice more without crossing threshold)
- newTestHealthMonitor initializes the misses map so direct-construct
  test helpers don't nil-map-panic in the probe path

Assisted-by: Claude:claude-opus-4-7 go-vet go-test golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-13 21:57:50 +02:00


package application

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/services/agents"
	"github.com/mudler/LocalAI/core/services/distributed"
	"github.com/mudler/LocalAI/core/services/jobs"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/services/nodes"
	"github.com/mudler/LocalAI/core/services/storage"
	"github.com/mudler/LocalAI/pkg/sanitize"
	"github.com/mudler/xlog"
	"gorm.io/gorm"
)
// DistributedServices holds all services initialized for distributed mode.
type DistributedServices struct {
	Nats         *messaging.Client
	Store        storage.ObjectStore
	Registry     *nodes.NodeRegistry
	Router       *nodes.SmartRouter
	Health       *nodes.HealthMonitor
	Reconciler   *nodes.ReplicaReconciler
	JobStore     *jobs.JobStore
	Dispatcher   *jobs.Dispatcher
	AgentStore   *agents.AgentStore
	AgentBridge  *agents.EventBridge
	DistStores   *distributed.Stores
	FileMgr      *storage.FileManager
	FileStager   nodes.FileStager
	ModelAdapter *nodes.ModelRouterAdapter
	Unloader     *nodes.RemoteUnloaderAdapter

	shutdownOnce sync.Once
}
// Shutdown stops all distributed services in reverse initialization order.
// It is safe to call on a nil receiver and is idempotent (uses sync.Once).
func (ds *DistributedServices) Shutdown() {
	if ds == nil {
		return
	}
	ds.shutdownOnce.Do(func() {
		if ds.Health != nil {
			ds.Health.Stop()
		}
		if ds.Dispatcher != nil {
			ds.Dispatcher.Stop()
		}
		if closer, ok := ds.Store.(io.Closer); ok {
			closer.Close()
		}
		// AgentBridge has no Close method — its NATS subscriptions are cleaned up
		// when the NATS client is closed below.
		if ds.Nats != nil {
			ds.Nats.Close()
		}
		xlog.Info("Distributed services shut down")
	})
}
// initDistributed validates distributed mode prerequisites and initializes
// NATS, object storage, node registry, and instance identity.
// Returns nil if distributed mode is not enabled.
// configLoader is used by the SmartRouter to compute concurrency-group
// anti-affinity at placement time (#9659); it may be nil in tests.
func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoader *config.ModelConfigLoader) (*DistributedServices, error) {
	if !cfg.Distributed.Enabled {
		return nil, nil
	}
	xlog.Info("Distributed mode enabled — validating prerequisites")

	// Validate distributed config (NATS URL, S3 credential pairing, durations, etc.)
	if err := cfg.Distributed.Validate(); err != nil {
		return nil, err
	}

	// Validate PostgreSQL is configured (auth DB must be PostgreSQL for distributed mode)
	if !cfg.Auth.Enabled {
		return nil, fmt.Errorf("distributed mode requires authentication to be enabled (--auth / LOCALAI_AUTH=true)")
	}
	if !isPostgresURL(cfg.Auth.DatabaseURL) {
		return nil, fmt.Errorf("distributed mode requires PostgreSQL for auth database (got %q)", sanitize.URL(cfg.Auth.DatabaseURL))
	}

	// Generate instance ID if not set
	if cfg.Distributed.InstanceID == "" {
		cfg.Distributed.InstanceID = uuid.New().String()
	}
	xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID)

	// Connect to NATS
	natsClient, err := messaging.New(cfg.Distributed.NatsURL)
	if err != nil {
		return nil, fmt.Errorf("connecting to NATS: %w", err)
	}
	xlog.Info("Connected to NATS", "url", sanitize.URL(cfg.Distributed.NatsURL))

	// Ensure NATS is closed if any subsequent initialization step fails.
	success := false
	defer func() {
		if !success {
			natsClient.Close()
		}
	}()

	// Initialize object storage
	var store storage.ObjectStore
	if cfg.Distributed.StorageURL != "" {
		if cfg.Distributed.StorageBucket == "" {
			return nil, fmt.Errorf("distributed storage bucket must be set when storage URL is configured")
		}
		s3Store, err := storage.NewS3Store(context.Background(), storage.S3Config{
			Endpoint:        cfg.Distributed.StorageURL,
			Region:          cfg.Distributed.StorageRegion,
			Bucket:          cfg.Distributed.StorageBucket,
			AccessKeyID:     cfg.Distributed.StorageAccessKey,
			SecretAccessKey: cfg.Distributed.StorageSecretKey,
			ForcePathStyle:  true, // required for MinIO
		})
		if err != nil {
			return nil, fmt.Errorf("initializing S3 storage: %w", err)
		}
		xlog.Info("Object storage initialized (S3)", "endpoint", cfg.Distributed.StorageURL, "bucket", cfg.Distributed.StorageBucket)
		store = s3Store
	} else {
		// Fallback to filesystem storage in distributed mode (useful for single-node testing)
		fsStore, err := storage.NewFilesystemStore(cfg.DataPath + "/objectstore")
		if err != nil {
			return nil, fmt.Errorf("initializing filesystem storage: %w", err)
		}
		xlog.Info("Object storage initialized (filesystem fallback)", "path", cfg.DataPath+"/objectstore")
		store = fsStore
	}

	// Initialize node registry (requires the auth DB which is PostgreSQL)
	if authDB == nil {
		return nil, fmt.Errorf("distributed mode requires auth database to be initialized first")
	}
	registry, err := nodes.NewNodeRegistry(authDB)
	if err != nil {
		return nil, fmt.Errorf("initializing node registry: %w", err)
	}
	xlog.Info("Node registry initialized")

	// Collect SmartRouter option values; the router itself is created after all
	// dependencies (including FileStager and Unloader) are ready.
	var routerAuthToken string
	if cfg.Distributed.RegistrationToken != "" {
		routerAuthToken = cfg.Distributed.RegistrationToken
	}
	var routerGalleriesJSON string
	if galleriesJSON, err := json.Marshal(cfg.BackendGalleries); err == nil {
		routerGalleriesJSON = string(galleriesJSON)
	}

	healthMon := nodes.NewHealthMonitor(registry, authDB,
		cfg.Distributed.HealthCheckIntervalOrDefault(),
		cfg.Distributed.StaleNodeThresholdOrDefault(),
		routerAuthToken,
		!cfg.Distributed.DisablePerModelHealthCheck,
	)

	// Initialize job store
	jobStore, err := jobs.NewJobStore(authDB)
	if err != nil {
		return nil, fmt.Errorf("initializing job store: %w", err)
	}
	xlog.Info("Distributed job store initialized")

	// Initialize job dispatcher
	dispatcher := jobs.NewDispatcher(jobStore, natsClient, authDB, cfg.Distributed.InstanceID, cfg.Distributed.JobWorkerConcurrency)

	// Initialize agent store
	agentStore, err := agents.NewAgentStore(authDB)
	if err != nil {
		return nil, fmt.Errorf("initializing agent store: %w", err)
	}
	xlog.Info("Distributed agent store initialized")

	// Initialize agent event bridge
	agentBridge := agents.NewEventBridge(natsClient, agentStore, cfg.Distributed.InstanceID)

	// Start observable persister — captures observable_update events from workers
	// (which have no DB access) and persists them to PostgreSQL.
	if err := agentBridge.StartObservablePersister(); err != nil {
		xlog.Warn("Failed to start observable persister", "error", err)
	} else {
		xlog.Info("Observable persister started")
	}

	// Initialize Phase 4 stores (MCP, Gallery, FineTune, Skills)
	distStores, err := distributed.InitStores(authDB)
	if err != nil {
		return nil, fmt.Errorf("initializing distributed stores: %w", err)
	}

	// Initialize file manager with local cache
	cacheDir := cfg.DataPath + "/cache"
	fileMgr, err := storage.NewFileManager(store, cacheDir)
	if err != nil {
		return nil, fmt.Errorf("initializing file manager: %w", err)
	}
	xlog.Info("File manager initialized", "cacheDir", cacheDir)

	// Create FileStager for distributed file transfer
	var fileStager nodes.FileStager
	if cfg.Distributed.StorageURL != "" {
		fileStager = nodes.NewS3NATSFileStager(fileMgr, natsClient)
		xlog.Info("File stager initialized (S3+NATS)")
	} else {
		fileStager = nodes.NewHTTPFileStager(func(nodeID string) (string, error) {
			node, err := registry.Get(context.Background(), nodeID)
			if err != nil {
				return "", err
			}
			if node.HTTPAddress == "" {
				return "", fmt.Errorf("node %s has no HTTP address for file transfer", nodeID)
			}
			return node.HTTPAddress, nil
		}, cfg.Distributed.RegistrationToken)
		xlog.Info("File stager initialized (HTTP direct transfer)")
	}

	// Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go
	remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient)

	// All dependencies ready — build SmartRouter with all options at once
	var conflictResolver nodes.ConcurrencyConflictResolver
	if configLoader != nil {
		conflictResolver = configLoader
	}
	router := nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{
		Unloader:         remoteUnloader,
		FileStager:       fileStager,
		GalleriesJSON:    routerGalleriesJSON,
		AuthToken:        routerAuthToken,
		DB:               authDB,
		ConflictResolver: conflictResolver,
	})

	// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
	// RegistrationToken feed the state-reconciliation passes: pending op
	// drain uses the adapter, and model health probes use the token to auth
	// against workers' gRPC HealthCheck.
	reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
		Registry:          registry,
		Scheduler:         router,
		Unloader:          remoteUnloader,
		Adapter:           remoteUnloader,
		RegistrationToken: cfg.Distributed.RegistrationToken,
		DB:                authDB,
		Interval:          30 * time.Second,
		ScaleDownDelay:    5 * time.Minute,
		ProbeStaleAfter:   2 * time.Minute,
	})

	// Create ModelRouterAdapter to wire into ModelLoader
	modelAdapter := nodes.NewModelRouterAdapter(router)

	success = true
	return &DistributedServices{
		Nats:         natsClient,
		Store:        store,
		Registry:     registry,
		Router:       router,
		Health:       healthMon,
		Reconciler:   reconciler,
		JobStore:     jobStore,
		Dispatcher:   dispatcher,
		AgentStore:   agentStore,
		AgentBridge:  agentBridge,
		DistStores:   distStores,
		FileMgr:      fileMgr,
		FileStager:   fileStager,
		ModelAdapter: modelAdapter,
		Unloader:     remoteUnloader,
	}, nil
}
func isPostgresURL(url string) bool {
	return strings.HasPrefix(url, "postgres://") || strings.HasPrefix(url, "postgresql://")
}