LocalAI/core/application/watchdog.go
Ettore Di Giacinto bbcaebc1ef feat(concurrency-groups): per-model exclusive groups for backend loading (#9662)
* feat(concurrency-groups): per-model exclusive groups for backend loading

Adds `concurrency_groups: [...]` to model YAML configs. Two models that share
a group cannot be loaded concurrently on the same node — loading one evicts
the others, reusing the existing pinned/busy/retry policy from LRU eviction.
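
For reference, a minimal sketch of the new key in a model YAML (the model
name and everything except `concurrency_groups` are illustrative):

```yaml
# a model declaring membership in the "heavy" exclusive group;
# only concurrency_groups is the new field added by this commit
name: llama-70b
concurrency_groups:
  - heavy
```

Any other model that also lists `heavy` will never be co-resident with this
one on the same node.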

Layered design:
- Watchdog (pkg/model): per-node correctness floor. On every Load(), evict
  any loaded model that shares a group with the requested one (a sketch of
  the conflict test follows this list). Pinned skips surface NeedMore so
  the loader retries (and ultimately logs a clear warning), instead of
  silently allowing the rule to be violated.
- Distributed scheduler (core/services/nodes): soft anti-affinity hint —
  scheduleNewModel prefers nodes that don't already host a same-group
  model, falling back to eviction only if every candidate has a conflict.
  Composes with NodeSelector at the same point in the candidate pipeline.
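
A minimal sketch of the conflict test implied above (the helper name is
hypothetical; the real enforcement is EnforceGroupExclusivity in pkg/model):

```go
// sharesGroup reports whether two models conflict under the group rule:
// they do exactly when their declared group lists intersect.
func sharesGroup(a, b []string) bool {
	seen := make(map[string]struct{}, len(a))
	for _, g := range a {
		seen[g] = struct{}{}
	}
	for _, g := range b {
		if _, ok := seen[g]; ok {
			return true
		}
	}
	return false
}
```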

Per-node, not cluster-wide: VRAM is a node-local resource, and two heavy
models running on different nodes is fine. The ConfigLoader is wired into
SmartRouter via a small ConcurrencyConflictResolver interface so the nodes
package keeps a narrow surface on core/config.
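
The interface name is the commit's; the method below is only an assumed
shape for that narrow surface, not the actual signature:

```go
// ConcurrencyConflictResolver is the seam through which the nodes package
// reads group declarations from core/config (method shape is hypothetical).
type ConcurrencyConflictResolver interface {
	// ConcurrencyGroupsFor returns the groups declared for modelName,
	// or nil when it declares none.
	ConcurrencyGroupsFor(modelName string) []string
}
```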

Refactors the inner LRU eviction body into a shared collectEvictionsLocked
helper and the loader retry loop into retryEnforce(fn, maxRetries, interval),
so both LRU and group enforcement share busy/pinned/retry semantics.
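
A sketch of what the extracted helper could look like (the name and
parameter order follow the line above; the body and the time.Duration
parameter type are assumptions):

```go
import "time"

// retryEnforce retries fn until it succeeds or maxRetries attempts have
// been made, sleeping interval between attempts; it returns the last error.
func retryEnforce(fn func() error, maxRetries int, interval time.Duration) error {
	var err error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < maxRetries-1 {
			time.Sleep(interval)
		}
	}
	return err
}
```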

Closes #9659.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(watchdog): sync pinned + concurrency_groups at startup

The startup-time watchdog setup lives in initializeWatchdog (startup.go),
not in startWatchdog (watchdog.go). The latter is only invoked from the
runtime-settings RestartWatchdog path. As a result, neither
SyncPinnedModelsToWatchdog nor SyncModelGroupsToWatchdog ran at boot,
so `pinned: true` and `concurrency_groups: [...]` only became effective
after a settings-driven watchdog restart.

Fix by adding both sync calls to initializeWatchdog. Confirmed end-to-end:
loading model A in group "heavy", then model C with no group (the two
coexist), then model B in group "heavy" now correctly evicts A and
leaves [B, C].
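
The added calls mirror the pair startWatchdog already makes (see the file
below); a sketch, assuming `a` is the Application value in scope inside
initializeWatchdog:

```go
// in initializeWatchdog (startup.go), after the watchdog is attached
// to the model loader:
a.SyncPinnedModelsToWatchdog()
a.SyncModelGroupsToWatchdog()
```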

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(test): satisfy errcheck on new os.Remove in concurrency_groups spec

CI lint runs new-from-merge-base, so the pre-existing
`defer os.Remove(tmp.Name())` lines are grandfathered into the baseline,
but the one introduced by the concurrency_groups YAML round-trip test is
held to errcheck. Wrap the remove in a closure that discards the error.
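
The resulting pattern, for reference:

```go
// wrap os.Remove in a closure and discard the error to satisfy errcheck
defer func() { _ = os.Remove(tmp.Name()) }()
```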

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-05 08:42:50 +02:00

package application

import (
	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/xlog"
)

// SyncPinnedModelsToWatchdog reads pinned status from all model configs and updates the watchdog
func (a *Application) SyncPinnedModelsToWatchdog() {
	cl := a.ModelConfigLoader()
	if cl == nil {
		return
	}
	wd := a.modelLoader.GetWatchDog()
	if wd == nil {
		return
	}
	configs := cl.GetAllModelsConfigs()
	var pinned []string
	for _, cfg := range configs {
		if cfg.IsPinned() {
			pinned = append(pinned, cfg.Name)
		}
	}
	wd.SetPinnedModels(pinned)
	xlog.Debug("Synced pinned models to watchdog", "count", len(pinned))
}

// SyncModelGroupsToWatchdog reads concurrency_groups from all model configs and
// updates the watchdog so EnforceGroupExclusivity has the current view.
func (a *Application) SyncModelGroupsToWatchdog() {
	cl := a.ModelConfigLoader()
	if cl == nil {
		return
	}
	wd := a.modelLoader.GetWatchDog()
	if wd == nil {
		return
	}
	groups := extractModelGroupsFromConfigs(cl.GetAllModelsConfigs())
	wd.ReplaceModelGroups(groups)
	xlog.Debug("Synced concurrency groups to watchdog", "count", len(groups))
}

// extractModelGroupsFromConfigs builds the model→groups map the watchdog
// expects. Disabled models are skipped — their declared groups should not
// block other models from loading.
func extractModelGroupsFromConfigs(configs []config.ModelConfig) map[string][]string {
	out := make(map[string][]string)
	for _, cfg := range configs {
		if cfg.IsDisabled() {
			continue
		}
		gs := cfg.GetConcurrencyGroups()
		if len(gs) == 0 {
			continue
		}
		out[cfg.Name] = gs
	}
	return out
}

// StopWatchdog signals any running watchdog shutdown handler to stop.
// It takes watchdogMutex so it cannot race Start/RestartWatchdog over
// the watchdogStop channel.
func (a *Application) StopWatchdog() error {
	a.watchdogMutex.Lock()
	defer a.watchdogMutex.Unlock()
	if a.watchdogStop != nil {
		close(a.watchdogStop)
		a.watchdogStop = nil
	}
	return nil
}

// startWatchdog starts the watchdog with current ApplicationConfig settings
// This is an internal method that assumes the caller holds the watchdogMutex
func (a *Application) startWatchdog() error {
	appConfig := a.ApplicationConfig()

	// Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend)
	lruLimit := appConfig.GetEffectiveMaxActiveBackends()

	// Create watchdog if enabled OR if LRU limit is set OR if memory reclaimer is enabled
	// LRU eviction requires watchdog infrastructure even without busy/idle checks
	if appConfig.WatchDog || lruLimit > 0 || appConfig.MemoryReclaimerEnabled {
		wd := model.NewWatchDog(
			model.WithProcessManager(a.modelLoader),
			model.WithBusyTimeout(appConfig.WatchDogBusyTimeout),
			model.WithIdleTimeout(appConfig.WatchDogIdleTimeout),
			model.WithWatchdogInterval(appConfig.WatchDogInterval),
			model.WithBusyCheck(appConfig.WatchDogBusy),
			model.WithIdleCheck(appConfig.WatchDogIdle),
			model.WithLRULimit(lruLimit),
			model.WithMemoryReclaimer(appConfig.MemoryReclaimerEnabled, appConfig.MemoryReclaimerThreshold),
			model.WithForceEvictionWhenBusy(appConfig.ForceEvictionWhenBusy),
		)

		// Create new stop channel BEFORE setting up any goroutines
		// This prevents race conditions where the old shutdown handler might
		// receive the closed channel and try to shut down the new watchdog
		a.watchdogStop = make(chan bool, 1)

		// Set the watchdog on the model loader
		a.modelLoader.SetWatchDog(wd)

		// Sync pinned models and concurrency groups from config to the watchdog
		a.SyncPinnedModelsToWatchdog()
		a.SyncModelGroupsToWatchdog()

		// Start watchdog goroutine if any periodic checks are enabled
		// LRU eviction doesn't need the Run() loop - it's triggered on model load
		// But memory reclaimer needs the Run() loop for periodic checking
		if appConfig.WatchDogBusy || appConfig.WatchDogIdle || appConfig.MemoryReclaimerEnabled {
			go wd.Run()
		}

		// Setup shutdown handler - this goroutine will wait on a.watchdogStop
		// which is now a fresh channel, so it won't receive any stale signals
		// Note: We capture wd in a local variable to ensure this handler operates
		// on the correct watchdog instance (not a later one that gets assigned to wd)
		wdForShutdown := wd
		go func() {
			select {
			case <-a.watchdogStop:
				xlog.Debug("Watchdog stop signal received")
				wdForShutdown.Shutdown()
			case <-appConfig.Context.Done():
				xlog.Debug("Context canceled, shutting down watchdog")
				wdForShutdown.Shutdown()
			}
		}()

		xlog.Info("Watchdog started with new settings", "lruLimit", lruLimit, "busyCheck", appConfig.WatchDogBusy, "idleCheck", appConfig.WatchDogIdle, "memoryReclaimer", appConfig.MemoryReclaimerEnabled, "memoryThreshold", appConfig.MemoryReclaimerThreshold, "interval", appConfig.WatchDogInterval)
	} else {
		xlog.Info("Watchdog disabled")
	}
	return nil
}

// StartWatchdog starts the watchdog with current ApplicationConfig settings
func (a *Application) StartWatchdog() error {
	a.watchdogMutex.Lock()
	defer a.watchdogMutex.Unlock()
	return a.startWatchdog()
}

// RestartWatchdog restarts the watchdog with current ApplicationConfig settings
func (a *Application) RestartWatchdog() error {
	a.watchdogMutex.Lock()
	defer a.watchdogMutex.Unlock()

	// Get the old watchdog before we shut it down
	oldWD := a.modelLoader.GetWatchDog()

	// Get the state from the old watchdog before shutting it down
	// This preserves information about loaded models
	var oldState model.WatchDogState
	if oldWD != nil {
		oldState = oldWD.GetState()
	}

	// Signal all handlers to stop by closing the stop channel
	// This will cause any goroutine waiting on <-a.watchdogStop to unblock
	if a.watchdogStop != nil {
		close(a.watchdogStop)
		a.watchdogStop = nil
	}

	// Shutdown existing watchdog - this triggers the stop signal
	if oldWD != nil {
		oldWD.Shutdown()
		// Wait for the old watchdog's Run() goroutine to fully shut down
		oldWD.WaitDone()
	}

	// Start watchdog with new settings
	if err := a.startWatchdog(); err != nil {
		return err
	}

	// Restore the model state from the old watchdog to the new one
	// This ensures the new watchdog knows about already-loaded models
	newWD := a.modelLoader.GetWatchDog()
	if newWD != nil && len(oldState.AddressModelMap) > 0 {
		newWD.RestoreState(oldState)
	}

	// Re-sync pinned models and concurrency groups after restart
	a.SyncPinnedModelsToWatchdog()
	a.SyncModelGroupsToWatchdog()
	return nil
}