diff --git a/core/application/startup.go b/core/application/startup.go
index 68e24f196..29090a4b7 100644
--- a/core/application/startup.go
+++ b/core/application/startup.go
@@ -319,6 +319,29 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
 			options.MemoryReclaimerThreshold = *settings.MemoryReclaimerThreshold
 		}
 	}
+	if settings.ForceEvictionWhenBusy != nil {
+		// Only apply if current value is default (false), suggesting it wasn't set from env var
+		if !options.ForceEvictionWhenBusy {
+			options.ForceEvictionWhenBusy = *settings.ForceEvictionWhenBusy
+		}
+	}
+	if settings.LRUEvictionMaxRetries != nil {
+		// Only apply if current value is default (30), suggesting it wasn't set from env var
+		if options.LRUEvictionMaxRetries == 0 {
+			options.LRUEvictionMaxRetries = *settings.LRUEvictionMaxRetries
+		}
+	}
+	if settings.LRUEvictionRetryInterval != nil {
+		// Only apply if current value is default (1s), suggesting it wasn't set from env var
+		if options.LRUEvictionRetryInterval == 0 {
+			dur, err := time.ParseDuration(*settings.LRUEvictionRetryInterval)
+			if err == nil {
+				options.LRUEvictionRetryInterval = dur
+			} else {
+				xlog.Warn("invalid LRU eviction retry interval in runtime_settings.json", "error", err, "interval", *settings.LRUEvictionRetryInterval)
+			}
+		}
+	}
 	if settings.AgentJobRetentionDays != nil {
 		// Only apply if current value is default (0), suggesting it wasn't set from env var
 		if options.AgentJobRetentionDays == 0 {
diff --git a/core/application/watchdog.go b/core/application/watchdog.go
index 054205fef..f11655512 100644
--- a/core/application/watchdog.go
+++ b/core/application/watchdog.go
@@ -1,8 +1,6 @@
 package application
 
 import (
-	"time"
-
 	"github.com/mudler/LocalAI/pkg/model"
 	"github.com/mudler/xlog"
 )
@@ -37,11 +35,15 @@ func (a *Application) startWatchdog() error {
 		model.WithMemoryReclaimer(appConfig.MemoryReclaimerEnabled, appConfig.MemoryReclaimerThreshold),
 		model.WithForceEvictionWhenBusy(appConfig.ForceEvictionWhenBusy),
 	)
-	a.modelLoader.SetWatchDog(wd)
 
-	// Create new stop channel
+	// Create new stop channel BEFORE setting up any goroutines
+	// This prevents race conditions where the old shutdown handler might
+	// receive the closed channel and try to shut down the new watchdog
 	a.watchdogStop = make(chan bool, 1)
 
+	// Set the watchdog on the model loader
+	a.modelLoader.SetWatchDog(wd)
+
 	// Start watchdog goroutine if any periodic checks are enabled
 	// LRU eviction doesn't need the Run() loop - it's triggered on model load
 	// But memory reclaimer needs the Run() loop for periodic checking
@@ -49,15 +51,19 @@ func (a *Application) startWatchdog() error {
 		go wd.Run()
 	}
 
-	// Setup shutdown handler
+	// Setup shutdown handler - this goroutine will wait on a.watchdogStop
+	// which is now a fresh channel, so it won't receive any stale signals
+	// Note: We capture wd in a local variable to ensure this handler operates
+	// on the correct watchdog instance (not a later one that gets assigned to wd)
+	wdForShutdown := wd
 	go func() {
 		select {
 		case <-a.watchdogStop:
 			xlog.Debug("Watchdog stop signal received")
-			wd.Shutdown()
+			wdForShutdown.Shutdown()
 		case <-appConfig.Context.Done():
 			xlog.Debug("Context canceled, shutting down watchdog")
-			wd.Shutdown()
+			wdForShutdown.Shutdown()
 		}
 	}()
 
@@ -82,20 +88,41 @@ func (a *Application) RestartWatchdog() error {
 	a.watchdogMutex.Lock()
 	defer a.watchdogMutex.Unlock()
 
-	// Shutdown existing watchdog if running
+	// Get the old watchdog before we shut it down
+	oldWD := a.modelLoader.GetWatchDog()
+
+	// Get the state from the old watchdog before shutting it down
+	// This preserves information about loaded models
+	var oldState model.WatchDogState
+	if oldWD != nil {
+		oldState = oldWD.GetState()
+	}
+
+	// Signal all handlers to stop by closing the stop channel
+	// This will cause any goroutine waiting on <-a.watchdogStop to unblock
 	if a.watchdogStop != nil {
 		close(a.watchdogStop)
 		a.watchdogStop = nil
 	}
 
-	// Shutdown existing watchdog if running
-	currentWD := a.modelLoader.GetWatchDog()
-	if currentWD != nil {
-		currentWD.Shutdown()
-		// Wait a bit for shutdown to complete
-		time.Sleep(100 * time.Millisecond)
+	// Shutdown existing watchdog - this triggers the stop signal
+	if oldWD != nil {
+		oldWD.Shutdown()
+		// Wait for the old watchdog's Run() goroutine to fully shut down
+		oldWD.WaitDone()
 	}
 
 	// Start watchdog with new settings
-	return a.startWatchdog()
+	if err := a.startWatchdog(); err != nil {
+		return err
+	}
+
+	// Restore the model state from the old watchdog to the new one
+	// This ensures the new watchdog knows about already-loaded models
+	newWD := a.modelLoader.GetWatchDog()
+	if newWD != nil && len(oldState.AddressModelMap) > 0 {
+		newWD.RestoreState(oldState)
+	}
+
+	return nil
 }
diff --git a/pkg/model/watchdog.go b/pkg/model/watchdog.go
index 090488a06..2d50ad819 100644
--- a/pkg/model/watchdog.go
+++ b/pkg/model/watchdog.go
@@ -33,6 +33,7 @@ type WatchDog struct {
 	addressModelMap map[string]string
 	pm              ProcessManager
 	stop            chan bool
+	done            chan bool // Signals when Run() has completely shut down
 	busyCheck, idleCheck bool
 	lruLimit        int // Maximum number of active backends (0 = unlimited)
@@ -78,6 +79,7 @@ func NewWatchDog(opts ...WatchDogOption) *WatchDog {
 		lruLimit:                 o.lruLimit,
 		addressModelMap:          make(map[string]string),
 		stop:                     make(chan bool, 1),
+		done:                     make(chan bool, 1),
 		memoryReclaimerEnabled:   o.memoryReclaimerEnabled,
 		memoryReclaimerThreshold: o.memoryReclaimerThreshold,
 		watchdogInterval:         o.watchdogInterval,
@@ -128,6 +130,12 @@ func (wd *WatchDog) Shutdown() {
 	wd.stop <- true
 }
 
+// WaitDone blocks until the watchdog's Run() goroutine has completely shut down.
+// This should be called after Shutdown() to ensure the watchdog is fully stopped.
+func (wd *WatchDog) WaitDone() {
+	<-wd.done
+}
+
 func (wd *WatchDog) AddAddressModelMap(address string, model string) {
 	wd.Lock()
 	defer wd.Unlock()
@@ -173,6 +181,71 @@ func (wd *WatchDog) GetLoadedModelCount() int {
 	return len(wd.addressModelMap)
 }
 
+// WatchDogState holds the current state of models tracked by the watchdog
+type WatchDogState struct {
+	AddressModelMap map[string]string
+	BusyTime        map[string]time.Time
+	IdleTime        map[string]time.Time
+	LastUsed        map[string]time.Time
+	AddressMap      map[string]*process.Process
+}
+
+// GetState returns the current state of models tracked by the watchdog
+// This can be used to restore state when creating a new watchdog
+func (wd *WatchDog) GetState() WatchDogState {
+	wd.Lock()
+	defer wd.Unlock()
+
+	// Create copies to avoid race conditions
+	addressModelMap := make(map[string]string, len(wd.addressModelMap))
+	for k, v := range wd.addressModelMap {
+		addressModelMap[k] = v
+	}
+
+	busyTime := make(map[string]time.Time, len(wd.busyTime))
+	for k, v := range wd.busyTime {
+		busyTime[k] = v
+	}
+
+	idleTime := make(map[string]time.Time, len(wd.idleTime))
+	for k, v := range wd.idleTime {
+		idleTime[k] = v
+	}
+
+	lastUsed := make(map[string]time.Time, len(wd.lastUsed))
+	for k, v := range wd.lastUsed {
+		lastUsed[k] = v
+	}
+
+	addressMap := make(map[string]*process.Process, len(wd.addressMap))
+	for k, v := range wd.addressMap {
+		addressMap[k] = v
+	}
+
+	return WatchDogState{
+		AddressModelMap: addressModelMap,
+		BusyTime:        busyTime,
+		IdleTime:        idleTime,
+		LastUsed:        lastUsed,
+		AddressMap:      addressMap,
+	}
+}
+
+// RestoreState restores the model state from a previous watchdog
+// This should be called after the new watchdog is created but before Run() is started
+func (wd *WatchDog) RestoreState(state WatchDogState) {
+	wd.Lock()
+	defer wd.Unlock()
+
+	wd.addressModelMap = state.AddressModelMap
+	wd.busyTime = state.BusyTime
+	wd.idleTime = state.IdleTime
+	wd.lastUsed = state.LastUsed
+	wd.addressMap = state.AddressMap
+
+	xlog.Info("[WatchDog] Restored model state", "modelCount", len(wd.addressModelMap))
+}
+
 // modelUsageInfo holds information about a model's usage for LRU sorting
 type modelUsageInfo struct {
 	address string
@@ -279,6 +352,7 @@ func (wd *WatchDog) Run() {
 		select {
 		case <-wd.stop:
 			xlog.Info("[WatchDog] Stopping watchdog")
+			wd.done <- true
 			return
 		case <-time.After(wd.watchdogInterval):
 			// Check if any monitoring is enabled
@@ -290,6 +364,7 @@ func (wd *WatchDog) Run() {
 
 			if !busyCheck && !idleCheck && !memoryCheck {
 				xlog.Info("[WatchDog] No checks enabled, stopping watchdog")
+				wd.done <- true
 				return
 			}
 			if busyCheck {
@@ -462,14 +537,16 @@ func (wd *WatchDog) evictLRUModel() {
 	xlog.Info("[WatchDog] Memory reclaimer evicting LRU model",
 		"model", lruModel.model, "lastUsed", lruModel.lastUsed)
 
-	// Untrack the model
-	wd.untrack(lruModel.address)
 	wd.Unlock()
 
 	// Shutdown the model
 	if err := wd.pm.ShutdownModel(lruModel.model); err != nil {
 		xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model)
 	} else {
+		// Untrack the model
+		wd.Lock()
+		wd.untrack(lruModel.address)
+		wd.Unlock()
 		xlog.Info("[WatchDog] Memory reclaimer eviction complete", "model", lruModel.model)
 	}
 }
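
For context on the synchronization change above, here is a minimal standalone sketch of the stop/done channel handshake that the new Shutdown()/WaitDone() pair implements. The worker and newWorker names are illustrative only and not part of the patch; the point is that Shutdown signals the loop to exit, WaitDone blocks until the loop has actually returned, and only then is it safe to create a replacement, which is what RestartWatchdog relies on instead of the old time.Sleep-based wait.

// sketch.go - illustrative only, assumed names (worker, newWorker)
package main

import (
	"fmt"
	"time"
)

type worker struct {
	stop chan bool // tells Run() to exit
	done chan bool // signaled once Run() has fully returned
}

func newWorker() *worker {
	return &worker{
		stop: make(chan bool, 1),
		done: make(chan bool, 1),
	}
}

func (w *worker) Run() {
	for {
		select {
		case <-w.stop:
			w.done <- true // buffered, so this never blocks
			return
		case <-time.After(100 * time.Millisecond):
			// periodic work would happen here
		}
	}
}

func (w *worker) Shutdown() { w.stop <- true }
func (w *worker) WaitDone() { <-w.done }

func main() {
	w := newWorker()
	go w.Run()

	w.Shutdown()
	w.WaitDone() // Run() has returned; safe to start a replacement worker
	fmt.Println("worker fully stopped")
}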