Mirror of https://github.com/mudler/LocalAI.git (synced 2026-02-18 23:21:58 -05:00)
fix: improve watchdog logic (#8591)
* fix: ensure proper watchdog shutdown and state passing between restarts
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix: add missing watchdog settings
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* fix: untrack model if we shut it down successfully
  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Committed by GitHub
Parent: 067a255435
Commit: ecba23d44e
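The core of the change is a stop/done handshake: RestartWatchdog asks the old watchdog to stop and then blocks until its Run() loop has actually exited before starting a replacement, carrying the tracked-model state over to the new instance. A rough, self-contained sketch of that handshake (type and function names here are illustrative, not LocalAI's):

package main

import (
	"fmt"
	"time"
)

// Minimal stand-in for the watchdog's Run()/Shutdown()/WaitDone() handshake.
type worker struct {
	stop chan bool
	done chan bool
}

func newWorker() *worker {
	return &worker{
		stop: make(chan bool, 1),
		done: make(chan bool, 1), // buffered so the Run loop never blocks on exit
	}
}

func (w *worker) Run() {
	for {
		select {
		case <-w.stop:
			w.done <- true // tell waiters the loop has fully exited
			return
		case <-time.After(50 * time.Millisecond):
			// periodic checks would go here
		}
	}
}

func (w *worker) Shutdown() { w.stop <- true }
func (w *worker) WaitDone() { <-w.done }

func main() {
	w := newWorker()
	go w.Run()

	// Restart-style teardown: request a stop, then wait for Run to finish
	// before creating a replacement, so two loops never overlap.
	w.Shutdown()
	w.WaitDone()
	fmt.Println("old worker fully stopped; safe to start a new one")
}

In the previous code a fixed 100 ms sleep stood in for this wait; the explicit done signal removes the guesswork.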
@@ -319,6 +319,29 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
 			options.MemoryReclaimerThreshold = *settings.MemoryReclaimerThreshold
 		}
 	}
+	if settings.ForceEvictionWhenBusy != nil {
+		// Only apply if current value is default (false), suggesting it wasn't set from env var
+		if !options.ForceEvictionWhenBusy {
+			options.ForceEvictionWhenBusy = *settings.ForceEvictionWhenBusy
+		}
+	}
+	if settings.LRUEvictionMaxRetries != nil {
+		// Only apply if current value is default (30), suggesting it wasn't set from env var
+		if options.LRUEvictionMaxRetries == 0 {
+			options.LRUEvictionMaxRetries = *settings.LRUEvictionMaxRetries
+		}
+	}
+	if settings.LRUEvictionRetryInterval != nil {
+		// Only apply if current value is default (1s), suggesting it wasn't set from env var
+		if options.LRUEvictionRetryInterval == 0 {
+			dur, err := time.ParseDuration(*settings.LRUEvictionRetryInterval)
+			if err == nil {
+				options.LRUEvictionRetryInterval = dur
+			} else {
+				xlog.Warn("invalid LRU eviction retry interval in runtime_settings.json", "error", err, "interval", *settings.LRUEvictionRetryInterval)
+			}
+		}
+	}
 	if settings.AgentJobRetentionDays != nil {
 		// Only apply if current value is default (0), suggesting it wasn't set from env var
 		if options.AgentJobRetentionDays == 0 {
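This hunk follows a consistent precedence rule: a value from runtime_settings.json is applied only while the option still holds its built-in default, so a value already set elsewhere (for example via an environment variable) is never clobbered. A minimal, self-contained sketch of that rule (the types here are hypothetical stand-ins for the real settings structs):

package main

import "fmt"

type fileSettings struct {
	LRUEvictionMaxRetries *int
}

type appOptions struct {
	LRUEvictionMaxRetries int // zero value means "not set elsewhere"
}

func applyFileSettings(opts *appOptions, s fileSettings) {
	if s.LRUEvictionMaxRetries != nil {
		// Only apply if the current value is still the default.
		if opts.LRUEvictionMaxRetries == 0 {
			opts.LRUEvictionMaxRetries = *s.LRUEvictionMaxRetries
		}
	}
}

func main() {
	fromFile := 10
	s := fileSettings{LRUEvictionMaxRetries: &fromFile}

	// Already set (e.g. from an env var): the file value is ignored.
	a := appOptions{LRUEvictionMaxRetries: 5}
	applyFileSettings(&a, s)
	fmt.Println(a.LRUEvictionMaxRetries) // 5

	// Still at the default: the file value is applied.
	b := appOptions{}
	applyFileSettings(&b, s)
	fmt.Println(b.LRUEvictionMaxRetries) // 10
}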
@@ -1,8 +1,6 @@
 package application
 
 import (
-	"time"
-
 	"github.com/mudler/LocalAI/pkg/model"
 	"github.com/mudler/xlog"
 )
@@ -37,11 +35,15 @@ func (a *Application) startWatchdog() error {
 		model.WithMemoryReclaimer(appConfig.MemoryReclaimerEnabled, appConfig.MemoryReclaimerThreshold),
 		model.WithForceEvictionWhenBusy(appConfig.ForceEvictionWhenBusy),
 	)
-	a.modelLoader.SetWatchDog(wd)
 
-	// Create new stop channel
+	// Create new stop channel BEFORE setting up any goroutines
+	// This prevents race conditions where the old shutdown handler might
+	// receive the closed channel and try to shut down the new watchdog
 	a.watchdogStop = make(chan bool, 1)
 
+	// Set the watchdog on the model loader
+	a.modelLoader.SetWatchDog(wd)
+
 	// Start watchdog goroutine if any periodic checks are enabled
 	// LRU eviction doesn't need the Run() loop - it's triggered on model load
 	// But memory reclaimer needs the Run() loop for periodic checking
@@ -49,15 +51,19 @@ func (a *Application) startWatchdog() error {
 		go wd.Run()
 	}
 
-	// Setup shutdown handler
+	// Setup shutdown handler - this goroutine will wait on a.watchdogStop
+	// which is now a fresh channel, so it won't receive any stale signals
+	// Note: We capture wd in a local variable to ensure this handler operates
+	// on the correct watchdog instance (not a later one that gets assigned to wd)
+	wdForShutdown := wd
 	go func() {
 		select {
 		case <-a.watchdogStop:
 			xlog.Debug("Watchdog stop signal received")
-			wd.Shutdown()
+			wdForShutdown.Shutdown()
 		case <-appConfig.Context.Done():
 			xlog.Debug("Context canceled, shutting down watchdog")
-			wd.Shutdown()
+			wdForShutdown.Shutdown()
 		}
 	}()
 
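The wdForShutdown variable exists purely to pin the shutdown handler to the watchdog instance that was current when the goroutine was created, as the comment explains. A small, self-contained illustration of that capture concern (the watchdog type here is illustrative):

package main

import (
	"fmt"
	"time"
)

type watchdog struct{ id int }

func (w *watchdog) Shutdown() { fmt.Println("shutting down watchdog", w.id) }

func main() {
	stop := make(chan struct{})
	wd := &watchdog{id: 1}

	// Pin the current instance, as the diff does with wdForShutdown: the handler
	// will always act on watchdog 1, even if wd is later reassigned.
	wdForShutdown := wd
	go func() {
		<-stop
		wdForShutdown.Shutdown() // prints "shutting down watchdog 1"
	}()

	// Had the goroutine closed over wd directly, this reassignment would make it
	// shut down watchdog 2 instead - the race the comment warns about.
	wd = &watchdog{id: 2}

	close(stop)
	time.Sleep(100 * time.Millisecond) // give the handler time to print
}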
@@ -82,20 +88,41 @@ func (a *Application) RestartWatchdog() error {
 	a.watchdogMutex.Lock()
 	defer a.watchdogMutex.Unlock()
 
-	// Shutdown existing watchdog if running
+	// Get the old watchdog before we shut it down
+	oldWD := a.modelLoader.GetWatchDog()
+
+	// Get the state from the old watchdog before shutting it down
+	// This preserves information about loaded models
+	var oldState model.WatchDogState
+	if oldWD != nil {
+		oldState = oldWD.GetState()
+	}
+
 	// Signal all handlers to stop by closing the stop channel
 	// This will cause any goroutine waiting on <-a.watchdogStop to unblock
 	if a.watchdogStop != nil {
 		close(a.watchdogStop)
 		a.watchdogStop = nil
 	}
 
-	// Shutdown existing watchdog if running
-	currentWD := a.modelLoader.GetWatchDog()
-	if currentWD != nil {
-		currentWD.Shutdown()
-		// Wait a bit for shutdown to complete
-		time.Sleep(100 * time.Millisecond)
+	// Shutdown existing watchdog - this triggers the stop signal
+	if oldWD != nil {
+		oldWD.Shutdown()
+		// Wait for the old watchdog's Run() goroutine to fully shut down
+		oldWD.WaitDone()
 	}
 
 	// Start watchdog with new settings
-	return a.startWatchdog()
+	if err := a.startWatchdog(); err != nil {
+		return err
+	}
+
+	// Restore the model state from the old watchdog to the new one
+	// This ensures the new watchdog knows about already-loaded models
+	newWD := a.modelLoader.GetWatchDog()
+	if newWD != nil && len(oldState.AddressModelMap) > 0 {
+		newWD.RestoreState(oldState)
+	}
+
+	return nil
 }
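RestartWatchdog closes a.watchdogStop rather than sending into it because closing a channel releases every goroutine blocked on a receive, while a single send would wake at most one. A self-contained sketch of that broadcast behaviour:

package main

import (
	"fmt"
	"sync"
)

func main() {
	stop := make(chan bool, 1)
	var wg sync.WaitGroup

	// Several handlers blocked on the same stop channel.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // a receive on a closed channel returns immediately
			fmt.Println("handler", id, "unblocked")
		}(i)
	}

	// close() wakes every waiter at once; a single send would reach only one
	// of them, which is why RestartWatchdog closes the channel instead.
	close(stop)
	wg.Wait()
}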
@@ -33,6 +33,7 @@ type WatchDog struct {
 	addressModelMap map[string]string
 	pm              ProcessManager
 	stop            chan bool
+	done            chan bool // Signals when Run() has completely shut down
 
 	busyCheck, idleCheck bool
 	lruLimit             int // Maximum number of active backends (0 = unlimited)
@@ -78,6 +79,7 @@ func NewWatchDog(opts ...WatchDogOption) *WatchDog {
 		lruLimit:                 o.lruLimit,
 		addressModelMap:          make(map[string]string),
 		stop:                     make(chan bool, 1),
+		done:                     make(chan bool, 1),
 		memoryReclaimerEnabled:   o.memoryReclaimerEnabled,
 		memoryReclaimerThreshold: o.memoryReclaimerThreshold,
 		watchdogInterval:         o.watchdogInterval,
@@ -128,6 +130,12 @@ func (wd *WatchDog) Shutdown() {
 	wd.stop <- true
 }
 
+// WaitDone blocks until the watchdog's Run() goroutine has completely shut down.
+// This should be called after Shutdown() to ensure the watchdog is fully stopped.
+func (wd *WatchDog) WaitDone() {
+	<-wd.done
+}
+
 func (wd *WatchDog) AddAddressModelMap(address string, model string) {
 	wd.Lock()
 	defer wd.Unlock()
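One plausible reason the done channel is buffered (capacity 1, like stop) is that Run() can record its exit even when nobody ends up calling WaitDone(), for instance when the watchdog stops via context cancellation rather than an explicit restart. A tiny sketch of that non-blocking hand-off:

package main

import "fmt"

func main() {
	// A buffer of 1 lets the sender record "I'm done" and return even if nobody
	// ever calls WaitDone, e.g. when the watchdog exits because no checks are
	// enabled or because the application context was cancelled.
	done := make(chan bool, 1)

	finish := func() {
		done <- true // never blocks thanks to the buffer
		fmt.Println("run loop exited")
	}
	finish()

	// A waiter arriving later still observes the signal.
	<-done
	fmt.Println("waiter observed shutdown")
}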
@@ -173,6 +181,71 @@ func (wd *WatchDog) GetLoadedModelCount() int {
 	return len(wd.addressModelMap)
 }
 
+// WatchDogState holds the current state of models tracked by the watchdog
+type WatchDogState struct {
+	AddressModelMap map[string]string
+	BusyTime        map[string]time.Time
+	IdleTime        map[string]time.Time
+	LastUsed        map[string]time.Time
+	AddressMap      map[string]*process.Process
+}
+
+// GetState returns the current state of models tracked by the watchdog
+// This can be used to restore state when creating a new watchdog
+func (wd *WatchDog) GetState() WatchDogState {
+	wd.Lock()
+	defer wd.Unlock()
+
+	// Create copies to avoid race conditions
+	addressModelMap := make(map[string]string, len(wd.addressModelMap))
+	for k, v := range wd.addressModelMap {
+		addressModelMap[k] = v
+	}
+
+	busyTime := make(map[string]time.Time, len(wd.busyTime))
+	for k, v := range wd.busyTime {
+		busyTime[k] = v
+	}
+
+	idleTime := make(map[string]time.Time, len(wd.idleTime))
+	for k, v := range wd.idleTime {
+		idleTime[k] = v
+	}
+
+	lastUsed := make(map[string]time.Time, len(wd.lastUsed))
+	for k, v := range wd.lastUsed {
+		lastUsed[k] = v
+	}
+
+	addressMap := make(map[string]*process.Process, len(wd.addressMap))
+	for k, v := range wd.addressMap {
+		addressMap[k] = v
+	}
+
+	return WatchDogState{
+		AddressModelMap: addressModelMap,
+		BusyTime:        busyTime,
+		IdleTime:        idleTime,
+		LastUsed:        lastUsed,
+		AddressMap:      addressMap,
+	}
+}
+
+// RestoreState restores the model state from a previous watchdog
+// This should be called after the new watchdog is created but before Run() is started
+func (wd *WatchDog) RestoreState(state WatchDogState) {
+	wd.Lock()
+	defer wd.Unlock()
+
+	wd.addressModelMap = state.AddressModelMap
+	wd.busyTime = state.BusyTime
+	wd.idleTime = state.IdleTime
+	wd.lastUsed = state.LastUsed
+	wd.addressMap = state.AddressMap
+
+	xlog.Info("[WatchDog] Restored model state", "modelCount", len(wd.addressModelMap))
+}
+
 // modelUsageInfo holds information about a model's usage for LRU sorting
 type modelUsageInfo struct {
 	address string
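As a side note, each of the copy loops in GetState could also be written with maps.Clone from the Go standard library (Go 1.21+); whether that reads better than the explicit loops is a style call. A short sketch:

package main

import (
	"fmt"
	"maps"
	"time"
)

func main() {
	busyTime := map[string]time.Time{"llama": time.Now()}

	// Shallow copy, equivalent to the for-range copy loops in GetState.
	snapshot := maps.Clone(busyTime)

	// Mutating the original no longer affects the snapshot.
	busyTime["qwen"] = time.Now()
	fmt.Println(len(busyTime), len(snapshot)) // 2 1
}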
@@ -279,6 +352,7 @@ func (wd *WatchDog) Run() {
 		select {
 		case <-wd.stop:
 			xlog.Info("[WatchDog] Stopping watchdog")
+			wd.done <- true
 			return
 		case <-time.After(wd.watchdogInterval):
 			// Check if any monitoring is enabled
@@ -290,6 +364,7 @@ func (wd *WatchDog) Run() {
 
 			if !busyCheck && !idleCheck && !memoryCheck {
 				xlog.Info("[WatchDog] No checks enabled, stopping watchdog")
+				wd.done <- true
 				return
 			}
 			if busyCheck {
@@ -462,14 +537,16 @@ func (wd *WatchDog) evictLRUModel() {
 
 	xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed)
 
-	// Untrack the model
-	wd.untrack(lruModel.address)
 	wd.Unlock()
 
 	// Shutdown the model
 	if err := wd.pm.ShutdownModel(lruModel.model); err != nil {
 		xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model)
 	} else {
+		// Untrack the model
+		wd.Lock()
+		wd.untrack(lruModel.address)
+		wd.Unlock()
 		xlog.Info("[WatchDog] Memory reclaimer eviction complete", "model", lruModel.model)
 	}
 }
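The reworked eviction keeps the watchdog lock released around the potentially slow ShutdownModel call and only re-acquires it to untrack the model once shutdown actually succeeded, so a failed shutdown no longer leaves the watchdog believing the model is gone. A generic, self-contained sketch of that pattern (the tracker type is illustrative):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type tracker struct {
	sync.Mutex
	loaded map[string]bool
}

func (t *tracker) evict(model string, shutdown func(string) error) {
	// Do the slow, potentially blocking work without holding the lock.
	if err := shutdown(model); err != nil {
		fmt.Println("shutdown failed, keeping model tracked:", err)
		return
	}

	// Only untrack after a successful shutdown.
	t.Lock()
	delete(t.loaded, model)
	t.Unlock()
	fmt.Println("evicted", model)
}

func main() {
	t := &tracker{loaded: map[string]bool{"llama": true}}

	t.evict("llama", func(string) error { return errors.New("backend busy") })
	fmt.Println("still tracked:", t.loaded["llama"]) // true

	t.evict("llama", func(string) error { return nil })
	fmt.Println("still tracked:", t.loaded["llama"]) // false
}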