mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-17 04:56:52 -04:00
* feat(concurrency-groups): per-model exclusive groups for backend loading Adds `concurrency_groups: [...]` to model YAML configs. Two models that share a group cannot be loaded concurrently on the same node — loading one evicts the others, reusing the existing pinned/busy/retry policy from LRU eviction. Layered design: - Watchdog (pkg/model): per-node correctness floor — on every Load(), evict any loaded model that shares a group with the requested one. Pinned skips surface NeedMore so the loader retries (and ultimately logs a clear warning), instead of silently allowing the rule to be violated. - Distributed scheduler (core/services/nodes): soft anti-affinity hint — scheduleNewModel prefers nodes that don't already host a same-group model, falling back to eviction only if every candidate has a conflict. Composes with NodeSelector at the same point in the candidate pipeline. Per-node, not cluster-wide: VRAM is a node-local resource, and two heavy models running on different nodes is fine. The ConfigLoader is wired into SmartRouter via a small ConcurrencyConflictResolver interface so the nodes package keeps a narrow surface on core/config. Refactors the inner LRU eviction body into a shared collectEvictionsLocked helper and the loader retry loop into retryEnforce(fn, maxRetries, interval), so both LRU and group enforcement share busy/pinned/retry semantics. Closes #9659. Assisted-by: Claude:claude-opus-4-7 [Claude Code] Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(watchdog): sync pinned + concurrency_groups at startup The startup-time watchdog setup lives in initializeWatchdog (startup.go), not in startWatchdog (watchdog.go). The latter is only invoked from the runtime-settings RestartWatchdog path. As a result, neither SyncPinnedModelsToWatchdog nor SyncModelGroupsToWatchdog ran at boot, so `pinned: true` and `concurrency_groups: [...]` only became effective after a settings-driven watchdog restart. 
Fix by adding both sync calls to initializeWatchdog. Confirmed end-to-end: loading model A in group "heavy", then C with no group (coexists), then B in group "heavy" now correctly evicts A and leaves [B, C]. Assisted-by: Claude:claude-opus-4-7 [Claude Code] Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(test): satisfy errcheck on new os.Remove in concurrency_groups spec CI lint runs new-from-merge-base, so the existing pre-existing `defer os.Remove(tmp.Name())` lines are baseline-grandfathered but the one introduced by the concurrency_groups YAML round-trip test is held to errcheck. Wrap the remove in a closure that discards the error. Assisted-by: Claude:claude-opus-4-7 [Claude Code] Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
768 lines
30 KiB
Go
768 lines
30 KiB
Go
package application
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/mudler/LocalAI/core/backend"
|
|
"github.com/mudler/LocalAI/core/config"
|
|
"github.com/mudler/LocalAI/core/gallery"
|
|
"github.com/mudler/LocalAI/core/http/auth"
|
|
"github.com/mudler/LocalAI/core/services/galleryop"
|
|
"github.com/mudler/LocalAI/core/services/jobs"
|
|
"github.com/mudler/LocalAI/core/services/nodes"
|
|
"github.com/mudler/LocalAI/core/services/storage"
|
|
coreStartup "github.com/mudler/LocalAI/core/startup"
|
|
"github.com/mudler/LocalAI/internal"
|
|
|
|
"github.com/mudler/LocalAI/pkg/model"
|
|
"github.com/mudler/LocalAI/pkg/sanitize"
|
|
"github.com/mudler/LocalAI/pkg/xsysinfo"
|
|
"github.com/mudler/xlog"
|
|
)
|
|
|
|
func New(opts ...config.AppOption) (*Application, error) {
|
|
options := config.NewApplicationConfig(opts...)
|
|
|
|
// Store a copy of the startup config (from env vars, before file loading)
|
|
// This is used to determine if settings came from env vars vs file
|
|
startupConfigCopy := *options
|
|
application := newApplication(options)
|
|
application.startupConfig = &startupConfigCopy
|
|
|
|
xlog.Info("Starting LocalAI", "threads", options.Threads, "modelsPath", options.SystemState.Model.ModelsPath)
|
|
xlog.Info("LocalAI version", "version", internal.PrintableVersion())
|
|
|
|
if err := application.start(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
caps, err := xsysinfo.CPUCapabilities()
|
|
if err == nil {
|
|
xlog.Debug("CPU capabilities", "capabilities", caps)
|
|
|
|
}
|
|
gpus, err := xsysinfo.GPUs()
|
|
if err == nil {
|
|
xlog.Debug("GPU count", "count", len(gpus))
|
|
for _, gpu := range gpus {
|
|
xlog.Debug("GPU", "gpu", gpu.String())
|
|
}
|
|
}
|
|
|
|
// Make sure directories exists
|
|
if options.SystemState.Model.ModelsPath == "" {
|
|
return nil, fmt.Errorf("models path cannot be empty")
|
|
}
|
|
|
|
err = os.MkdirAll(options.SystemState.Model.ModelsPath, 0750)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create ModelPath: %q", err)
|
|
}
|
|
if options.GeneratedContentDir != "" {
|
|
err := os.MkdirAll(options.GeneratedContentDir, 0750)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create ImageDir: %q", err)
|
|
}
|
|
}
|
|
if options.UploadDir != "" {
|
|
err := os.MkdirAll(options.UploadDir, 0750)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create UploadDir: %q", err)
|
|
}
|
|
}
|
|
|
|
// Create and migrate data directory
|
|
if options.DataPath != "" {
|
|
if err := os.MkdirAll(options.DataPath, 0750); err != nil {
|
|
return nil, fmt.Errorf("unable to create DataPath: %q", err)
|
|
}
|
|
// Migrate data from DynamicConfigsDir to DataPath if needed
|
|
if options.DynamicConfigsDir != "" && options.DataPath != options.DynamicConfigsDir {
|
|
migrateDataFiles(options.DynamicConfigsDir, options.DataPath)
|
|
}
|
|
}
|
|
|
|
// Initialize auth database if auth is enabled
|
|
if options.Auth.Enabled {
|
|
// Auto-generate HMAC secret if not provided
|
|
if options.Auth.APIKeyHMACSecret == "" {
|
|
secretFile := filepath.Join(options.DataPath, ".hmac_secret")
|
|
secret, err := loadOrGenerateHMACSecret(secretFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize HMAC secret: %w", err)
|
|
}
|
|
options.Auth.APIKeyHMACSecret = secret
|
|
}
|
|
|
|
authDB, err := auth.InitDB(options.Auth.DatabaseURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize auth database: %w", err)
|
|
}
|
|
application.authDB = authDB
|
|
xlog.Info("Auth enabled", "database", sanitize.URL(options.Auth.DatabaseURL))
|
|
|
|
// Start session and expired API key cleanup goroutine
|
|
go func() {
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-options.Context.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if err := auth.CleanExpiredSessions(authDB); err != nil {
|
|
xlog.Error("failed to clean expired sessions", "error", err)
|
|
}
|
|
if err := auth.CleanExpiredAPIKeys(authDB); err != nil {
|
|
xlog.Error("failed to clean expired API keys", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Wire JobStore for DB-backed task/job persistence whenever auth DB is available.
|
|
// This ensures tasks and jobs survive restarts in both single-node and distributed modes.
|
|
if application.authDB != nil && application.agentJobService != nil {
|
|
dbJobStore, err := jobs.NewJobStore(application.authDB)
|
|
if err != nil {
|
|
xlog.Error("Failed to create job store for auth DB", "error", err)
|
|
} else {
|
|
application.agentJobService.SetDistributedJobStore(dbJobStore)
|
|
}
|
|
}
|
|
|
|
// Initialize distributed mode services (NATS, object storage, node registry)
|
|
distSvc, err := initDistributed(options, application.authDB, application.ModelConfigLoader())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("distributed mode initialization failed: %w", err)
|
|
}
|
|
if distSvc != nil {
|
|
application.distributed = distSvc
|
|
// Wire remote model unloader so ShutdownModel works for remote nodes
|
|
// Uses NATS to tell serve-backend nodes to Free + kill their backend process
|
|
application.modelLoader.SetRemoteUnloader(distSvc.Unloader)
|
|
// Wire ModelRouter so grpcModel() delegates to SmartRouter in distributed mode
|
|
application.modelLoader.SetModelRouter(distSvc.ModelAdapter.AsModelRouter())
|
|
// Wire DistributedModelStore so shutdown/list/watchdog can find remote models
|
|
distStore := nodes.NewDistributedModelStore(
|
|
model.NewInMemoryModelStore(),
|
|
distSvc.Registry,
|
|
)
|
|
application.modelLoader.SetModelStore(distStore)
|
|
// Start health monitor
|
|
distSvc.Health.Start(options.Context)
|
|
// Start replica reconciler for auto-scaling model replicas
|
|
if distSvc.Reconciler != nil {
|
|
go distSvc.Reconciler.Run(options.Context)
|
|
}
|
|
// In distributed mode, MCP CI jobs are executed by agent workers (not the frontend)
|
|
// because the frontend can't create MCP sessions (e.g., stdio servers using docker).
|
|
// The dispatcher still subscribes to jobs.new for persistence (result/progress subs)
|
|
// but does NOT set a workerFn — agent workers consume jobs from the same NATS queue.
|
|
|
|
// Wire model config loader so job events include model config for agent workers
|
|
distSvc.Dispatcher.SetModelConfigLoader(application.backendLoader)
|
|
|
|
// Start job dispatcher — abort startup if it fails, as jobs would be accepted but never dispatched
|
|
if err := distSvc.Dispatcher.Start(options.Context); err != nil {
|
|
return nil, fmt.Errorf("starting job dispatcher: %w", err)
|
|
}
|
|
// Start ephemeral file cleanup
|
|
storage.StartEphemeralCleanup(options.Context, distSvc.FileMgr, 0, 0)
|
|
// Wire distributed backends into AgentJobService (before Start)
|
|
if application.agentJobService != nil {
|
|
application.agentJobService.SetDistributedBackends(distSvc.Dispatcher)
|
|
application.agentJobService.SetDistributedJobStore(distSvc.JobStore)
|
|
}
|
|
// Wire skill store into AgentPoolService (wired at pool start time via closure)
|
|
// The actual wiring happens in StartAgentPool since the pool doesn't exist yet.
|
|
|
|
// Wire NATS and gallery store into GalleryService for cross-instance progress/cancel
|
|
if application.galleryService != nil {
|
|
application.galleryService.SetNATSClient(distSvc.Nats)
|
|
if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
|
|
// Clean up stale in-progress operations from previous crashed instances
|
|
if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
|
|
xlog.Warn("Failed to clean stale gallery operations", "error", err)
|
|
}
|
|
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
|
|
}
|
|
// Wire distributed model/backend managers so delete propagates to workers
|
|
application.galleryService.SetModelManager(
|
|
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
|
|
)
|
|
application.galleryService.SetBackendManager(
|
|
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry),
|
|
)
|
|
}
|
|
}
|
|
|
|
// Start AgentJobService (after distributed wiring so it knows whether to use local or NATS)
|
|
if application.agentJobService != nil {
|
|
if err := application.agentJobService.Start(options.Context); err != nil {
|
|
return nil, fmt.Errorf("starting agent job service: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := coreStartup.InstallModels(options.Context, application.GalleryService(), options.Galleries, options.BackendGalleries, options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, nil, options.ModelsURL...); err != nil {
|
|
xlog.Error("error installing models", "error", err)
|
|
}
|
|
|
|
for _, backend := range options.ExternalBackends {
|
|
if err := galleryop.InstallExternalBackend(options.Context, options.BackendGalleries, options.SystemState, application.ModelLoader(), nil, backend, "", ""); err != nil {
|
|
xlog.Error("error installing external backend", "error", err)
|
|
}
|
|
}
|
|
|
|
configLoaderOpts := options.ToConfigLoaderOptions()
|
|
|
|
if err := application.ModelConfigLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil {
|
|
xlog.Error("error loading config files", "error", err)
|
|
}
|
|
|
|
if err := gallery.RegisterBackends(options.SystemState, application.ModelLoader()); err != nil {
|
|
xlog.Error("error registering external backends", "error", err)
|
|
}
|
|
|
|
// Start background upgrade checker for backends.
|
|
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
|
|
// instance runs periodic checks (avoids duplicate upgrades across replicas).
|
|
if len(options.BackendGalleries) > 0 {
|
|
// Pass a lazy getter for the backend manager so the checker always
|
|
// uses the active one — DistributedBackendManager is swapped in above
|
|
// and asks workers for their installed backends, which is what
|
|
// upgrade detection needs in distributed mode.
|
|
bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
|
|
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
|
|
application.upgradeChecker = uc
|
|
// Refresh the upgrade cache the moment a backend op finishes — otherwise
|
|
// the UI keeps showing a just-upgraded backend as upgradeable until the
|
|
// next 6-hour tick. TriggerCheck is non-blocking.
|
|
if gs := application.GalleryService(); gs != nil {
|
|
gs.OnBackendOpCompleted = uc.TriggerCheck
|
|
}
|
|
go uc.Run(options.Context)
|
|
}
|
|
|
|
if options.ConfigFile != "" {
|
|
if err := application.ModelConfigLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil {
|
|
xlog.Error("error loading config file", "error", err)
|
|
}
|
|
}
|
|
|
|
if err := application.ModelConfigLoader().Preload(options.SystemState.Model.ModelsPath); err != nil {
|
|
xlog.Error("error downloading models", "error", err)
|
|
}
|
|
|
|
if options.PreloadJSONModels != "" {
|
|
if err := galleryop.ApplyGalleryFromString(options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.Galleries, options.BackendGalleries, options.PreloadJSONModels); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if options.PreloadModelsFromPath != "" {
|
|
if err := galleryop.ApplyGalleryFromFile(options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.Galleries, options.BackendGalleries, options.PreloadModelsFromPath); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if options.Debug {
|
|
for _, v := range application.ModelConfigLoader().GetAllModelsConfigs() {
|
|
xlog.Debug("Model", "name", v.Name, "config", v)
|
|
}
|
|
}
|
|
|
|
// Load runtime settings from file if DynamicConfigsDir is set
|
|
// This applies file settings with env var precedence (env vars take priority)
|
|
// Note: startupConfigCopy was already created above, so it has the original env var values
|
|
if options.DynamicConfigsDir != "" {
|
|
loadRuntimeSettingsFromFile(options)
|
|
}
|
|
|
|
application.ModelLoader().SetBackendLoggingEnabled(options.EnableBackendLogging)
|
|
|
|
// turn off any process that was started by GRPC if the context is canceled
|
|
go func() {
|
|
<-options.Context.Done()
|
|
xlog.Debug("Context canceled, shutting down")
|
|
application.distributed.Shutdown()
|
|
err := application.ModelLoader().StopAllGRPC()
|
|
if err != nil {
|
|
xlog.Error("error while stopping all grpc backends", "error", err)
|
|
}
|
|
}()
|
|
|
|
// Initialize watchdog with current settings (after loading from file)
|
|
initializeWatchdog(application, options)
|
|
|
|
if options.LoadToMemory != nil && !options.SingleBackend {
|
|
for _, m := range options.LoadToMemory {
|
|
cfg, err := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(m, options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
xlog.Debug("Auto loading model into memory from file", "model", m, "file", cfg.Model)
|
|
|
|
o := backend.ModelOptions(*cfg, options)
|
|
|
|
var backendErr error
|
|
_, backendErr = application.ModelLoader().Load(o...)
|
|
if backendErr != nil {
|
|
return nil, backendErr
|
|
}
|
|
}
|
|
}
|
|
|
|
// Watch the configuration directory
|
|
startWatcher(options)
|
|
|
|
xlog.Info("core/startup process completed!")
|
|
return application, nil
|
|
}
|
|
|
|
func startWatcher(options *config.ApplicationConfig) {
|
|
if options.DynamicConfigsDir == "" {
|
|
// No need to start the watcher if the directory is not set
|
|
return
|
|
}
|
|
|
|
if _, err := os.Stat(options.DynamicConfigsDir); err != nil {
|
|
if os.IsNotExist(err) {
|
|
// We try to create the directory if it does not exist and was specified
|
|
if err := os.MkdirAll(options.DynamicConfigsDir, 0700); err != nil {
|
|
xlog.Error("failed creating DynamicConfigsDir", "error", err)
|
|
}
|
|
} else {
|
|
// something else happened, we log the error and don't start the watcher
|
|
xlog.Error("failed to read DynamicConfigsDir, watcher will not be started", "error", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
configHandler := newConfigFileHandler(options)
|
|
if err := configHandler.Watch(); err != nil {
|
|
xlog.Error("failed creating watcher", "error", err)
|
|
}
|
|
}
|
|
|
|
// loadRuntimeSettingsFromFile overlays settings from
// <DynamicConfigsDir>/runtime_settings.json onto options.
//
// Precedence: AppOptions (env vars/flags) were already applied by
// NewApplicationConfig before this runs, so each file value is applied only
// when the corresponding option still holds its zero/default value — a
// non-default current value is taken to mean "set from env" and is preserved.
// Limitation: an env var explicitly set to the default (false/0/"") is
// indistinguishable from "unset", so the file value wins in that case.
// Runtime changes after boot are handled by the config file watcher, which
// compares against startupConfig instead.
//
// A missing file is fine (defaults are used); unreadable or unparseable
// content is logged and the function returns without applying anything.
func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
	settingsFile := filepath.Join(options.DynamicConfigsDir, "runtime_settings.json")
	fileContent, err := os.ReadFile(settingsFile)
	if err != nil {
		if os.IsNotExist(err) {
			xlog.Debug("runtime_settings.json not found, using defaults")
			return
		}
		xlog.Warn("failed to read runtime_settings.json", "error", err)
		return
	}

	var settings config.RuntimeSettings

	if err := json.Unmarshal(fileContent, &settings); err != nil {
		xlog.Warn("failed to parse runtime_settings.json", "error", err)
		return
	}

	// Each block below applies one file setting, gated on the current option
	// still being at its default so env-provided values are not overwritten.

	if settings.WatchdogIdleEnabled != nil {
		// Only apply if current value is default (false), suggesting it wasn't set from env var.
		if !options.WatchDogIdle {
			options.WatchDogIdle = *settings.WatchdogIdleEnabled
			if options.WatchDogIdle {
				// Enabling the idle check implies enabling the watchdog itself.
				options.WatchDog = true
			}
		}
	}
	if settings.WatchdogBusyEnabled != nil {
		if !options.WatchDogBusy {
			options.WatchDogBusy = *settings.WatchdogBusyEnabled
			if options.WatchDogBusy {
				// Enabling the busy check implies enabling the watchdog itself.
				options.WatchDog = true
			}
		}
	}
	if settings.WatchdogIdleTimeout != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var.
		if options.WatchDogIdleTimeout == 0 {
			dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout)
			if err == nil {
				options.WatchDogIdleTimeout = dur
			} else {
				xlog.Warn("invalid watchdog idle timeout in runtime_settings.json", "error", err, "timeout", *settings.WatchdogIdleTimeout)
			}
		}
	}
	if settings.WatchdogBusyTimeout != nil {
		if options.WatchDogBusyTimeout == 0 {
			dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout)
			if err == nil {
				options.WatchDogBusyTimeout = dur
			} else {
				xlog.Warn("invalid watchdog busy timeout in runtime_settings.json", "error", err, "timeout", *settings.WatchdogBusyTimeout)
			}
		}
	}
	if settings.WatchdogInterval != nil {
		if options.WatchDogInterval == 0 {
			dur, err := time.ParseDuration(*settings.WatchdogInterval)
			if err == nil {
				options.WatchDogInterval = dur
			} else {
				// Unlike the timeouts above, a bad interval falls back to the
				// package default rather than staying at 0.
				xlog.Warn("invalid watchdog interval in runtime_settings.json", "error", err, "interval", *settings.WatchdogInterval)
				options.WatchDogInterval = model.DefaultWatchdogInterval
			}
		}
	}
	// Handle MaxActiveBackends (new) and SingleBackend (deprecated).
	// MaxActiveBackends wins when both appear in the file.
	if settings.MaxActiveBackends != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var.
		if options.MaxActiveBackends == 0 {
			options.MaxActiveBackends = *settings.MaxActiveBackends
			// For backward compatibility, also set SingleBackend if MaxActiveBackends == 1.
			options.SingleBackend = (*settings.MaxActiveBackends == 1)
		}
	} else if settings.SingleBackend != nil {
		// Legacy: SingleBackend maps to MaxActiveBackends = 1.
		if !options.SingleBackend {
			options.SingleBackend = *settings.SingleBackend
			if *settings.SingleBackend {
				options.MaxActiveBackends = 1
			}
		}
	}
	if settings.MemoryReclaimerEnabled != nil {
		// Only apply if current value is default (false), suggesting it wasn't set from env var.
		if !options.MemoryReclaimerEnabled {
			options.MemoryReclaimerEnabled = *settings.MemoryReclaimerEnabled
			if options.MemoryReclaimerEnabled {
				options.WatchDog = true // Memory reclaimer requires watchdog
			}
		}
	}
	if settings.MemoryReclaimerThreshold != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var.
		if options.MemoryReclaimerThreshold == 0 {
			options.MemoryReclaimerThreshold = *settings.MemoryReclaimerThreshold
		}
	}
	if settings.ForceEvictionWhenBusy != nil {
		// Only apply if current value is default (false), suggesting it wasn't set from env var.
		if !options.ForceEvictionWhenBusy {
			options.ForceEvictionWhenBusy = *settings.ForceEvictionWhenBusy
		}
	}
	if settings.LRUEvictionMaxRetries != nil {
		// NOTE(review): the original comment claimed the default is 30, but
		// this gate only fires when the option is 0. If the real default were
		// 30, the file value would never apply — confirm which is intended.
		if options.LRUEvictionMaxRetries == 0 {
			options.LRUEvictionMaxRetries = *settings.LRUEvictionMaxRetries
		}
	}
	if settings.LRUEvictionRetryInterval != nil {
		// NOTE(review): the original comment claimed the default is 1s, yet the
		// gate checks for 0 — same mismatch as LRUEvictionMaxRetries; confirm.
		if options.LRUEvictionRetryInterval == 0 {
			dur, err := time.ParseDuration(*settings.LRUEvictionRetryInterval)
			if err == nil {
				options.LRUEvictionRetryInterval = dur
			} else {
				xlog.Warn("invalid LRU eviction retry interval in runtime_settings.json", "error", err, "interval", *settings.LRUEvictionRetryInterval)
			}
		}
	}
	if settings.AgentJobRetentionDays != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var.
		if options.AgentJobRetentionDays == 0 {
			options.AgentJobRetentionDays = *settings.AgentJobRetentionDays
		}
	}
	// A bare "watchdog enabled" flag only matters when neither specific check
	// already forced WatchDog on above.
	if !options.WatchDogIdle && !options.WatchDogBusy {
		if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled {
			options.WatchDog = true
		}
	}

	// P2P settings.
	if settings.P2PToken != nil {
		if options.P2PToken == "" {
			options.P2PToken = *settings.P2PToken
		}
	}
	if settings.P2PNetworkID != nil {
		if options.P2PNetworkID == "" {
			options.P2PNetworkID = *settings.P2PNetworkID
		}
	}
	if settings.Federated != nil {
		if !options.Federated {
			options.Federated = *settings.Federated
		}
	}

	if settings.EnableBackendLogging != nil {
		if !options.EnableBackendLogging {
			options.EnableBackendLogging = *settings.EnableBackendLogging
		}
	}

	// Tracing settings.
	if settings.EnableTracing != nil {
		if !options.EnableTracing {
			options.EnableTracing = *settings.EnableTracing
		}
	}
	if settings.TracingMaxItems != nil {
		if options.TracingMaxItems == 0 {
			options.TracingMaxItems = *settings.TracingMaxItems
		}
	}

	// Branding / whitelabeling. There are no env vars for these — the file is
	// the only source — so apply unconditionally. Without this block a server
	// restart silently drops the configured instance name, tagline, and asset
	// filenames.
	if settings.InstanceName != nil {
		options.Branding.InstanceName = *settings.InstanceName
	}
	if settings.InstanceTagline != nil {
		options.Branding.InstanceTagline = *settings.InstanceTagline
	}
	if settings.LogoFile != nil {
		options.Branding.LogoFile = *settings.LogoFile
	}
	if settings.LogoHorizontalFile != nil {
		options.Branding.LogoHorizontalFile = *settings.LogoHorizontalFile
	}
	if settings.FaviconFile != nil {
		options.Branding.FaviconFile = *settings.FaviconFile
	}

	// Backend upgrade flags.
	if settings.AutoUpgradeBackends != nil {
		if !options.AutoUpgradeBackends {
			options.AutoUpgradeBackends = *settings.AutoUpgradeBackends
		}
	}
	if settings.PreferDevelopmentBackends != nil {
		if !options.PreferDevelopmentBackends {
			options.PreferDevelopmentBackends = *settings.PreferDevelopmentBackends
		}
	}

	// LocalAI Assistant — file-stored as the negation (LocalAIAssistantEnabled).
	// Default is enabled (DisableLocalAIAssistant=false). Apply the file value
	// unless env explicitly disabled the assistant (DisableLocalAIAssistant=true).
	if settings.LocalAIAssistantEnabled != nil {
		if !options.DisableLocalAIAssistant {
			options.DisableLocalAIAssistant = !*settings.LocalAIAssistantEnabled
		}
	}

	// Open Responses TTL. Default is 0 (no expiration). Treat the on-disk
	// "0"/empty as "no expiration" — a no-op since options is already 0 —
	// and parse anything else as a duration.
	if settings.OpenResponsesStoreTTL != nil && options.OpenResponsesStoreTTL == 0 {
		v := *settings.OpenResponsesStoreTTL
		if v != "0" && v != "" {
			if dur, err := time.ParseDuration(v); err == nil {
				options.OpenResponsesStoreTTL = dur
			} else {
				xlog.Warn("invalid open_responses_store_ttl in runtime_settings.json", "error", err, "ttl", v)
			}
		}
	}

	// Agent Pool. NewApplicationConfig seeds non-zero defaults for some of
	// these fields (Enabled=true, EmbeddingModel="granite-embedding-107m-
	// multilingual", MaxChunkingSize=400). The "if at default, apply file"
	// gate uses each field's actual default literal so file values can
	// override the bootstrap default while still letting an env-set value
	// (e.g. WithAgentPoolEmbeddingModel from a flag) win.
	if settings.AgentPoolEnabled != nil && options.AgentPool.Enabled {
		options.AgentPool.Enabled = *settings.AgentPoolEnabled
	}
	if settings.AgentPoolDefaultModel != nil && options.AgentPool.DefaultModel == "" {
		options.AgentPool.DefaultModel = *settings.AgentPoolDefaultModel
	}
	if settings.AgentPoolEmbeddingModel != nil {
		if options.AgentPool.EmbeddingModel == "" || options.AgentPool.EmbeddingModel == "granite-embedding-107m-multilingual" {
			options.AgentPool.EmbeddingModel = *settings.AgentPoolEmbeddingModel
		}
	}
	if settings.AgentPoolMaxChunkingSize != nil {
		if options.AgentPool.MaxChunkingSize == 0 || options.AgentPool.MaxChunkingSize == 400 {
			options.AgentPool.MaxChunkingSize = *settings.AgentPoolMaxChunkingSize
		}
	}
	if settings.AgentPoolChunkOverlap != nil && options.AgentPool.ChunkOverlap == 0 {
		options.AgentPool.ChunkOverlap = *settings.AgentPoolChunkOverlap
	}
	if settings.AgentPoolEnableLogs != nil && !options.AgentPool.EnableLogs {
		options.AgentPool.EnableLogs = *settings.AgentPoolEnableLogs
	}
	if settings.AgentPoolCollectionDBPath != nil && options.AgentPool.CollectionDBPath == "" {
		options.AgentPool.CollectionDBPath = *settings.AgentPoolCollectionDBPath
	}
	if settings.AgentPoolVectorEngine != nil {
		// Default is "chromem"; treat both that and empty as "not env-set".
		if options.AgentPool.VectorEngine == "" || options.AgentPool.VectorEngine == "chromem" {
			options.AgentPool.VectorEngine = *settings.AgentPoolVectorEngine
		}
	}
	if settings.AgentPoolDatabaseURL != nil && options.AgentPool.DatabaseURL == "" {
		options.AgentPool.DatabaseURL = *settings.AgentPoolDatabaseURL
	}
	if settings.AgentPoolAgentHubURL != nil {
		// Default is "https://agenthub.localai.io"; treat both that and empty
		// as "not env-set".
		if options.AgentPool.AgentHubURL == "" || options.AgentPool.AgentHubURL == "https://agenthub.localai.io" {
			options.AgentPool.AgentHubURL = *settings.AgentPoolAgentHubURL
		}
	}

	xlog.Debug("Runtime settings loaded from runtime_settings.json")
}
|
|
|
|
// initializeWatchdog initializes the watchdog with current ApplicationConfig settings
|
|
func initializeWatchdog(application *Application, options *config.ApplicationConfig) {
|
|
// Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend)
|
|
lruLimit := options.GetEffectiveMaxActiveBackends()
|
|
|
|
// Create watchdog if enabled OR if LRU limit is set OR if memory reclaimer is enabled
|
|
if options.WatchDog || lruLimit > 0 || options.MemoryReclaimerEnabled {
|
|
wd := model.NewWatchDog(
|
|
model.WithProcessManager(application.ModelLoader()),
|
|
model.WithBusyTimeout(options.WatchDogBusyTimeout),
|
|
model.WithIdleTimeout(options.WatchDogIdleTimeout),
|
|
model.WithWatchdogInterval(options.WatchDogInterval),
|
|
model.WithBusyCheck(options.WatchDogBusy),
|
|
model.WithIdleCheck(options.WatchDogIdle),
|
|
model.WithLRULimit(lruLimit),
|
|
model.WithMemoryReclaimer(options.MemoryReclaimerEnabled, options.MemoryReclaimerThreshold),
|
|
model.WithForceEvictionWhenBusy(options.ForceEvictionWhenBusy),
|
|
)
|
|
application.ModelLoader().SetWatchDog(wd)
|
|
|
|
// Initialize ModelLoader LRU eviction retry settings
|
|
application.ModelLoader().SetLRUEvictionRetrySettings(
|
|
options.LRUEvictionMaxRetries,
|
|
options.LRUEvictionRetryInterval,
|
|
)
|
|
|
|
// Sync per-model state from configs to the watchdog. Without this,
|
|
// `pinned: true` and `concurrency_groups:` are only honored after a
|
|
// settings-driven RestartWatchdog and never at boot.
|
|
application.SyncPinnedModelsToWatchdog()
|
|
application.SyncModelGroupsToWatchdog()
|
|
|
|
// Start watchdog goroutine if any periodic checks are enabled
|
|
// LRU eviction doesn't need the Run() loop - it's triggered on model load
|
|
// But memory reclaimer needs the Run() loop for periodic checking
|
|
if options.WatchDogBusy || options.WatchDogIdle || options.MemoryReclaimerEnabled {
|
|
go wd.Run()
|
|
}
|
|
|
|
go func() {
|
|
<-options.Context.Done()
|
|
xlog.Debug("Context canceled, shutting down")
|
|
wd.Shutdown()
|
|
}()
|
|
}
|
|
}
|
|
|
|
// loadOrGenerateHMACSecret loads an HMAC secret from the given file path,
|
|
// or generates a random 32-byte secret and persists it if the file doesn't exist.
|
|
func loadOrGenerateHMACSecret(path string) (string, error) {
|
|
data, err := os.ReadFile(path)
|
|
if err == nil {
|
|
secret := string(data)
|
|
if len(secret) >= 32 {
|
|
return secret, nil
|
|
}
|
|
}
|
|
|
|
b := make([]byte, 32)
|
|
if _, err := rand.Read(b); err != nil {
|
|
return "", fmt.Errorf("failed to generate HMAC secret: %w", err)
|
|
}
|
|
secret := hex.EncodeToString(b)
|
|
|
|
if err := os.WriteFile(path, []byte(secret), 0600); err != nil {
|
|
return "", fmt.Errorf("failed to persist HMAC secret: %w", err)
|
|
}
|
|
|
|
xlog.Info("Generated new HMAC secret for API key hashing", "path", path)
|
|
return secret, nil
|
|
}
|
|
|
|
// migrateDataFiles moves persistent data files from the old config directory
|
|
// to the new data directory. Only moves files that exist in src but not in dst.
|
|
func migrateDataFiles(srcDir, dstDir string) {
|
|
// Files and directories to migrate
|
|
items := []string{
|
|
"agent_tasks.json",
|
|
"agent_jobs.json",
|
|
"collections",
|
|
"assets",
|
|
}
|
|
|
|
migrated := false
|
|
for _, item := range items {
|
|
srcPath := filepath.Join(srcDir, item)
|
|
dstPath := filepath.Join(dstDir, item)
|
|
|
|
// Only migrate if source exists and destination does not
|
|
if _, err := os.Stat(srcPath); os.IsNotExist(err) {
|
|
continue
|
|
}
|
|
if _, err := os.Stat(dstPath); err == nil {
|
|
continue // destination already exists, skip
|
|
}
|
|
|
|
if err := os.Rename(srcPath, dstPath); err != nil {
|
|
xlog.Warn("Failed to migrate data file, will copy instead", "src", srcPath, "dst", dstPath, "error", err)
|
|
// os.Rename fails across filesystems, fall back to leaving in place
|
|
// and log a warning for the user to manually move
|
|
xlog.Warn("Data file remains in old location, please move manually", "src", srcPath, "dst", dstPath)
|
|
continue
|
|
}
|
|
migrated = true
|
|
xlog.Info("Migrated data file to new data path", "src", srcPath, "dst", dstPath)
|
|
}
|
|
|
|
if migrated {
|
|
xlog.Info("Data migration complete", "from", srcDir, "to", dstDir)
|
|
}
|
|
}
|