mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-17 13:10:23 -04:00
* fix(distributed): cascade-clean stale node_models on drain and filter routing by healthy status Stale node_models rows (state="loaded") were surviving past the healthy state of their owning node, causing /embeddings (and other inference paths) to dispatch to a backend whose process was gone or drained. The downstream symptom in a live cluster was pgvector rejecting inserts with "vector cannot have more than 16000 dimensions (SQLSTATE 54000)" because the misbehaving backend silently returned a malformed (oversized) tensor; the Models page showed the model as "running" without an associated node, like a stale entry, even though the node was no longer visible in the Nodes view. Two changes here, plus a third in a follow-up commit: - MarkDraining now cascade-deletes node_models rows for the affected node, mirroring MarkOffline. Drains are explicit operator actions — the box has been intentionally taken out of rotation — so clearing the rows stops the Models UI from misreporting and prevents the routing layer from picking those rows if scheduling logic is ever relaxed. In-flight requests already hold their gRPC client through Route() and finish normally; the only observable effect is a non-fatal IncrementInFlight warning, acceptable for a drain. MarkUnhealthy is deliberately left status-only: it fires from managers_distributed / reconciler on a single nats.ErrNoResponders with no retry, so a transient NATS hiccup must not nuke every loaded model and force a full reload on recovery. - FindAndLockNodeWithModel's inner JOIN now filters on backend_nodes.status = healthy in addition to node_models.state = loaded. The previous version relied on the second node-fetch step to reject non-healthy nodes, but a concurrent reader could still pick the same stale row in the same window. Belt-and-braces. - DistributedConfig.PerModelHealthCheck renamed to DisablePerModelHealthCheck and inverted at the call site so per-model gRPC probing is on by default. The probe (now made consecutive-miss aware in a follow-up commit) independently health- checks each model's gRPC address and removes stale node_models rows when the backend has crashed even though the worker's node-level heartbeat is still arriving. Migration: the field had no CLI flag, env var binding, or YAML key in tree (only the bare struct field), so there is no user-facing migration. Anything constructing DistributedConfig in code needs to drop the assignment (default now does the right thing) or invert it. Assisted-by: Claude:claude-opus-4-7 go-vet go-test golangci-lint Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(distributed): require consecutive misses before per-model probe removes a row The per-model gRPC probe used to remove a node_models row on a single failed health check. With the per-model probe now on by default, that made any 5-second gRPC blip (network jitter, a long-running request hogging the worker's gRPC server thread, brief GC pause) trigger a full reload of the affected model — too eager for production. Require perModelMissThreshold (3) consecutive failed probes before removal. At the default 15s tick a model must be unreachable for ~45s before reap; a single successful probe in between resets the streak. Per-(node, model, replica) state tracked under a mutex on the monitor. If the removal call itself fails, the miss counter is left in place so the next tick retries rather than starting the streak over. Tests: - removes stale model via per-model health check after consecutive failures (replaces the single-shot expectation) - preserves model row when an intermittent failure is followed by a success (covers the reset-on-success path and verifies the counter reset by failing twice more without crossing threshold) - newTestHealthMonitor initializes the misses map so direct-construct test helpers don't nil-map-panic in the probe path Assisted-by: Claude:claude-opus-4-7 go-vet go-test golangci-lint Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
294 lines
9.8 KiB
Go
294 lines
9.8 KiB
Go
package application
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/mudler/LocalAI/core/config"
|
|
"github.com/mudler/LocalAI/core/services/agents"
|
|
"github.com/mudler/LocalAI/core/services/distributed"
|
|
"github.com/mudler/LocalAI/core/services/jobs"
|
|
"github.com/mudler/LocalAI/core/services/messaging"
|
|
"github.com/mudler/LocalAI/core/services/nodes"
|
|
"github.com/mudler/LocalAI/core/services/storage"
|
|
"github.com/mudler/LocalAI/pkg/sanitize"
|
|
"github.com/mudler/xlog"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// DistributedServices holds all services initialized for distributed mode.
|
|
type DistributedServices struct {
|
|
Nats *messaging.Client
|
|
Store storage.ObjectStore
|
|
Registry *nodes.NodeRegistry
|
|
Router *nodes.SmartRouter
|
|
Health *nodes.HealthMonitor
|
|
Reconciler *nodes.ReplicaReconciler
|
|
JobStore *jobs.JobStore
|
|
Dispatcher *jobs.Dispatcher
|
|
AgentStore *agents.AgentStore
|
|
AgentBridge *agents.EventBridge
|
|
DistStores *distributed.Stores
|
|
FileMgr *storage.FileManager
|
|
FileStager nodes.FileStager
|
|
ModelAdapter *nodes.ModelRouterAdapter
|
|
Unloader *nodes.RemoteUnloaderAdapter
|
|
|
|
shutdownOnce sync.Once
|
|
}
|
|
|
|
// Shutdown stops all distributed services in reverse initialization order.
|
|
// It is safe to call on a nil receiver and is idempotent (uses sync.Once).
|
|
func (ds *DistributedServices) Shutdown() {
|
|
if ds == nil {
|
|
return
|
|
}
|
|
ds.shutdownOnce.Do(func() {
|
|
if ds.Health != nil {
|
|
ds.Health.Stop()
|
|
}
|
|
if ds.Dispatcher != nil {
|
|
ds.Dispatcher.Stop()
|
|
}
|
|
if closer, ok := ds.Store.(io.Closer); ok {
|
|
closer.Close()
|
|
}
|
|
// AgentBridge has no Close method — its NATS subscriptions are cleaned up
|
|
// when the NATS client is closed below.
|
|
if ds.Nats != nil {
|
|
ds.Nats.Close()
|
|
}
|
|
xlog.Info("Distributed services shut down")
|
|
})
|
|
}
|
|
|
|
// initDistributed validates distributed mode prerequisites and initializes
|
|
// NATS, object storage, node registry, and instance identity.
|
|
// Returns nil if distributed mode is not enabled.
|
|
// configLoader is used by the SmartRouter to compute concurrency-group
|
|
// anti-affinity at placement time (#9659); it may be nil in tests.
|
|
func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoader *config.ModelConfigLoader) (*DistributedServices, error) {
|
|
if !cfg.Distributed.Enabled {
|
|
return nil, nil
|
|
}
|
|
|
|
xlog.Info("Distributed mode enabled — validating prerequisites")
|
|
|
|
// Validate distributed config (NATS URL, S3 credential pairing, durations, etc.)
|
|
if err := cfg.Distributed.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Validate PostgreSQL is configured (auth DB must be PostgreSQL for distributed mode)
|
|
if !cfg.Auth.Enabled {
|
|
return nil, fmt.Errorf("distributed mode requires authentication to be enabled (--auth / LOCALAI_AUTH=true)")
|
|
}
|
|
if !isPostgresURL(cfg.Auth.DatabaseURL) {
|
|
return nil, fmt.Errorf("distributed mode requires PostgreSQL for auth database (got %q)", sanitize.URL(cfg.Auth.DatabaseURL))
|
|
}
|
|
|
|
// Generate instance ID if not set
|
|
if cfg.Distributed.InstanceID == "" {
|
|
cfg.Distributed.InstanceID = uuid.New().String()
|
|
}
|
|
xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID)
|
|
|
|
// Connect to NATS
|
|
natsClient, err := messaging.New(cfg.Distributed.NatsURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to NATS: %w", err)
|
|
}
|
|
xlog.Info("Connected to NATS", "url", sanitize.URL(cfg.Distributed.NatsURL))
|
|
|
|
// Ensure NATS is closed if any subsequent initialization step fails.
|
|
success := false
|
|
defer func() {
|
|
if !success {
|
|
natsClient.Close()
|
|
}
|
|
}()
|
|
|
|
// Initialize object storage
|
|
var store storage.ObjectStore
|
|
if cfg.Distributed.StorageURL != "" {
|
|
if cfg.Distributed.StorageBucket == "" {
|
|
return nil, fmt.Errorf("distributed storage bucket must be set when storage URL is configured")
|
|
}
|
|
s3Store, err := storage.NewS3Store(context.Background(), storage.S3Config{
|
|
Endpoint: cfg.Distributed.StorageURL,
|
|
Region: cfg.Distributed.StorageRegion,
|
|
Bucket: cfg.Distributed.StorageBucket,
|
|
AccessKeyID: cfg.Distributed.StorageAccessKey,
|
|
SecretAccessKey: cfg.Distributed.StorageSecretKey,
|
|
ForcePathStyle: true, // required for MinIO
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing S3 storage: %w", err)
|
|
}
|
|
xlog.Info("Object storage initialized (S3)", "endpoint", cfg.Distributed.StorageURL, "bucket", cfg.Distributed.StorageBucket)
|
|
store = s3Store
|
|
} else {
|
|
// Fallback to filesystem storage in distributed mode (useful for single-node testing)
|
|
fsStore, err := storage.NewFilesystemStore(cfg.DataPath + "/objectstore")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing filesystem storage: %w", err)
|
|
}
|
|
xlog.Info("Object storage initialized (filesystem fallback)", "path", cfg.DataPath+"/objectstore")
|
|
store = fsStore
|
|
}
|
|
|
|
// Initialize node registry (requires the auth DB which is PostgreSQL)
|
|
if authDB == nil {
|
|
return nil, fmt.Errorf("distributed mode requires auth database to be initialized first")
|
|
}
|
|
|
|
registry, err := nodes.NewNodeRegistry(authDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing node registry: %w", err)
|
|
}
|
|
xlog.Info("Node registry initialized")
|
|
|
|
// Collect SmartRouter option values; the router itself is created after all
|
|
// dependencies (including FileStager and Unloader) are ready.
|
|
var routerAuthToken string
|
|
if cfg.Distributed.RegistrationToken != "" {
|
|
routerAuthToken = cfg.Distributed.RegistrationToken
|
|
}
|
|
var routerGalleriesJSON string
|
|
if galleriesJSON, err := json.Marshal(cfg.BackendGalleries); err == nil {
|
|
routerGalleriesJSON = string(galleriesJSON)
|
|
}
|
|
|
|
healthMon := nodes.NewHealthMonitor(registry, authDB,
|
|
cfg.Distributed.HealthCheckIntervalOrDefault(),
|
|
cfg.Distributed.StaleNodeThresholdOrDefault(),
|
|
routerAuthToken,
|
|
!cfg.Distributed.DisablePerModelHealthCheck,
|
|
)
|
|
|
|
// Initialize job store
|
|
jobStore, err := jobs.NewJobStore(authDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing job store: %w", err)
|
|
}
|
|
xlog.Info("Distributed job store initialized")
|
|
|
|
// Initialize job dispatcher
|
|
dispatcher := jobs.NewDispatcher(jobStore, natsClient, authDB, cfg.Distributed.InstanceID, cfg.Distributed.JobWorkerConcurrency)
|
|
|
|
// Initialize agent store
|
|
agentStore, err := agents.NewAgentStore(authDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing agent store: %w", err)
|
|
}
|
|
xlog.Info("Distributed agent store initialized")
|
|
|
|
// Initialize agent event bridge
|
|
agentBridge := agents.NewEventBridge(natsClient, agentStore, cfg.Distributed.InstanceID)
|
|
|
|
// Start observable persister — captures observable_update events from workers
|
|
// (which have no DB access) and persists them to PostgreSQL.
|
|
if err := agentBridge.StartObservablePersister(); err != nil {
|
|
xlog.Warn("Failed to start observable persister", "error", err)
|
|
} else {
|
|
xlog.Info("Observable persister started")
|
|
}
|
|
|
|
// Initialize Phase 4 stores (MCP, Gallery, FineTune, Skills)
|
|
distStores, err := distributed.InitStores(authDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing distributed stores: %w", err)
|
|
}
|
|
|
|
// Initialize file manager with local cache
|
|
cacheDir := cfg.DataPath + "/cache"
|
|
fileMgr, err := storage.NewFileManager(store, cacheDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing file manager: %w", err)
|
|
}
|
|
xlog.Info("File manager initialized", "cacheDir", cacheDir)
|
|
|
|
// Create FileStager for distributed file transfer
|
|
var fileStager nodes.FileStager
|
|
if cfg.Distributed.StorageURL != "" {
|
|
fileStager = nodes.NewS3NATSFileStager(fileMgr, natsClient)
|
|
xlog.Info("File stager initialized (S3+NATS)")
|
|
} else {
|
|
fileStager = nodes.NewHTTPFileStager(func(nodeID string) (string, error) {
|
|
node, err := registry.Get(context.Background(), nodeID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if node.HTTPAddress == "" {
|
|
return "", fmt.Errorf("node %s has no HTTP address for file transfer", nodeID)
|
|
}
|
|
return node.HTTPAddress, nil
|
|
}, cfg.Distributed.RegistrationToken)
|
|
xlog.Info("File stager initialized (HTTP direct transfer)")
|
|
}
|
|
// Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go
|
|
remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient)
|
|
|
|
// All dependencies ready — build SmartRouter with all options at once
|
|
var conflictResolver nodes.ConcurrencyConflictResolver
|
|
if configLoader != nil {
|
|
conflictResolver = configLoader
|
|
}
|
|
router := nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{
|
|
Unloader: remoteUnloader,
|
|
FileStager: fileStager,
|
|
GalleriesJSON: routerGalleriesJSON,
|
|
AuthToken: routerAuthToken,
|
|
DB: authDB,
|
|
ConflictResolver: conflictResolver,
|
|
})
|
|
|
|
// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
|
|
// RegistrationToken feed the state-reconciliation passes: pending op
|
|
// drain uses the adapter, and model health probes use the token to auth
|
|
// against workers' gRPC HealthCheck.
|
|
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
|
|
Registry: registry,
|
|
Scheduler: router,
|
|
Unloader: remoteUnloader,
|
|
Adapter: remoteUnloader,
|
|
RegistrationToken: cfg.Distributed.RegistrationToken,
|
|
DB: authDB,
|
|
Interval: 30 * time.Second,
|
|
ScaleDownDelay: 5 * time.Minute,
|
|
ProbeStaleAfter: 2 * time.Minute,
|
|
})
|
|
|
|
// Create ModelRouterAdapter to wire into ModelLoader
|
|
modelAdapter := nodes.NewModelRouterAdapter(router)
|
|
|
|
success = true
|
|
return &DistributedServices{
|
|
Nats: natsClient,
|
|
Store: store,
|
|
Registry: registry,
|
|
Router: router,
|
|
Health: healthMon,
|
|
Reconciler: reconciler,
|
|
JobStore: jobStore,
|
|
Dispatcher: dispatcher,
|
|
AgentStore: agentStore,
|
|
AgentBridge: agentBridge,
|
|
DistStores: distStores,
|
|
FileMgr: fileMgr,
|
|
FileStager: fileStager,
|
|
ModelAdapter: modelAdapter,
|
|
Unloader: remoteUnloader,
|
|
}, nil
|
|
}
|
|
|
|
func isPostgresURL(url string) bool {
|
|
return strings.HasPrefix(url, "postgres://") || strings.HasPrefix(url, "postgresql://")
|
|
}
|