mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-09 01:09:13 -04:00
feat(loader): enhance single active backend to support LRU eviction (#7535)
* feat(loader): refactor single active backend support to LRU This changeset introduces LRU management of loaded backends. Users can set now a maximum number of models to be loaded concurrently, and, when setting LocalAI in single active backend mode we set LRU to 1 for backward compatibility. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * chore: add tests Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Update docs Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
committed by
GitHub
parent
c141a40e00
commit
fc5b9ebfcc
@@ -29,7 +29,7 @@ type Application struct {
|
||||
func newApplication(appConfig *config.ApplicationConfig) *Application {
|
||||
return &Application{
|
||||
backendLoader: config.NewModelConfigLoader(appConfig.SystemState.Model.ModelsPath),
|
||||
modelLoader: model.NewModelLoader(appConfig.SystemState, appConfig.SingleBackend),
|
||||
modelLoader: model.NewModelLoader(appConfig.SystemState),
|
||||
applicationConfig: appConfig,
|
||||
templatesEvaluator: templates.NewEvaluator(appConfig.SystemState.Model.ModelsPath),
|
||||
}
|
||||
|
||||
@@ -191,7 +191,8 @@ type runtimeSettings struct {
|
||||
WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"`
|
||||
WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"`
|
||||
WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"`
|
||||
SingleBackend *bool `json:"single_backend,omitempty"`
|
||||
SingleBackend *bool `json:"single_backend,omitempty"` // Deprecated: use MaxActiveBackends = 1 instead
|
||||
MaxActiveBackends *int `json:"max_active_backends,omitempty"` // Maximum number of active backends (0 = unlimited, 1 = single backend mode)
|
||||
ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"`
|
||||
Threads *int `json:"threads,omitempty"`
|
||||
ContextSize *int `json:"context_size,omitempty"`
|
||||
@@ -224,6 +225,7 @@ func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHand
|
||||
envWatchdogIdleTimeout := appConfig.WatchDogIdleTimeout == startupAppConfig.WatchDogIdleTimeout
|
||||
envWatchdogBusyTimeout := appConfig.WatchDogBusyTimeout == startupAppConfig.WatchDogBusyTimeout
|
||||
envSingleBackend := appConfig.SingleBackend == startupAppConfig.SingleBackend
|
||||
envMaxActiveBackends := appConfig.MaxActiveBackends == startupAppConfig.MaxActiveBackends
|
||||
envParallelRequests := appConfig.ParallelBackendRequests == startupAppConfig.ParallelBackendRequests
|
||||
envThreads := appConfig.Threads == startupAppConfig.Threads
|
||||
envContextSize := appConfig.ContextSize == startupAppConfig.ContextSize
|
||||
@@ -275,8 +277,19 @@ func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHand
|
||||
log.Warn().Err(err).Str("timeout", *settings.WatchdogBusyTimeout).Msg("invalid watchdog busy timeout in runtime_settings.json")
|
||||
}
|
||||
}
|
||||
if settings.SingleBackend != nil && !envSingleBackend {
|
||||
// Handle MaxActiveBackends (new) and SingleBackend (deprecated)
|
||||
if settings.MaxActiveBackends != nil && !envMaxActiveBackends {
|
||||
appConfig.MaxActiveBackends = *settings.MaxActiveBackends
|
||||
// For backward compatibility, also set SingleBackend if MaxActiveBackends == 1
|
||||
appConfig.SingleBackend = (*settings.MaxActiveBackends == 1)
|
||||
} else if settings.SingleBackend != nil && !envSingleBackend {
|
||||
// Legacy: SingleBackend maps to MaxActiveBackends = 1
|
||||
appConfig.SingleBackend = *settings.SingleBackend
|
||||
if *settings.SingleBackend {
|
||||
appConfig.MaxActiveBackends = 1
|
||||
} else {
|
||||
appConfig.MaxActiveBackends = 0
|
||||
}
|
||||
}
|
||||
if settings.ParallelBackendRequests != nil && !envParallelRequests {
|
||||
appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests
|
||||
|
||||
@@ -224,7 +224,8 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
|
||||
WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"`
|
||||
WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"`
|
||||
WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"`
|
||||
SingleBackend *bool `json:"single_backend,omitempty"`
|
||||
SingleBackend *bool `json:"single_backend,omitempty"` // Deprecated: use MaxActiveBackends = 1 instead
|
||||
MaxActiveBackends *int `json:"max_active_backends,omitempty"` // Maximum number of active backends (0 = unlimited)
|
||||
ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"`
|
||||
AgentJobRetentionDays *int `json:"agent_job_retention_days,omitempty"`
|
||||
}
|
||||
@@ -280,9 +281,21 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if settings.SingleBackend != nil {
|
||||
// Handle MaxActiveBackends (new) and SingleBackend (deprecated)
|
||||
if settings.MaxActiveBackends != nil {
|
||||
// Only apply if current value is default (0), suggesting it wasn't set from env var
|
||||
if options.MaxActiveBackends == 0 {
|
||||
options.MaxActiveBackends = *settings.MaxActiveBackends
|
||||
// For backward compatibility, also set SingleBackend if MaxActiveBackends == 1
|
||||
options.SingleBackend = (*settings.MaxActiveBackends == 1)
|
||||
}
|
||||
} else if settings.SingleBackend != nil {
|
||||
// Legacy: SingleBackend maps to MaxActiveBackends = 1
|
||||
if !options.SingleBackend {
|
||||
options.SingleBackend = *settings.SingleBackend
|
||||
if *settings.SingleBackend {
|
||||
options.MaxActiveBackends = 1
|
||||
}
|
||||
}
|
||||
}
|
||||
if settings.ParallelBackendRequests != nil {
|
||||
@@ -307,15 +320,25 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
|
||||
|
||||
// initializeWatchdog initializes the watchdog with current ApplicationConfig settings
|
||||
func initializeWatchdog(application *Application, options *config.ApplicationConfig) {
|
||||
if options.WatchDog {
|
||||
// Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend)
|
||||
lruLimit := options.GetEffectiveMaxActiveBackends()
|
||||
|
||||
// Create watchdog if enabled OR if LRU limit is set
|
||||
if options.WatchDog || lruLimit > 0 {
|
||||
wd := model.NewWatchDog(
|
||||
application.ModelLoader(),
|
||||
options.WatchDogBusyTimeout,
|
||||
options.WatchDogIdleTimeout,
|
||||
options.WatchDogBusy,
|
||||
options.WatchDogIdle)
|
||||
options.WatchDogIdle,
|
||||
lruLimit)
|
||||
application.ModelLoader().SetWatchDog(wd)
|
||||
go wd.Run()
|
||||
|
||||
// Start watchdog goroutine only if busy/idle checks are enabled
|
||||
if options.WatchDogBusy || options.WatchDogIdle {
|
||||
go wd.Run()
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-options.Context.Done()
|
||||
log.Debug().Msgf("Context canceled, shutting down")
|
||||
|
||||
@@ -20,21 +20,29 @@ func (a *Application) StopWatchdog() error {
|
||||
func (a *Application) startWatchdog() error {
|
||||
appConfig := a.ApplicationConfig()
|
||||
|
||||
// Create new watchdog if enabled
|
||||
if appConfig.WatchDog {
|
||||
// Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend)
|
||||
lruLimit := appConfig.GetEffectiveMaxActiveBackends()
|
||||
|
||||
// Create watchdog if enabled OR if LRU limit is set
|
||||
// LRU eviction requires watchdog infrastructure even without busy/idle checks
|
||||
if appConfig.WatchDog || lruLimit > 0 {
|
||||
wd := model.NewWatchDog(
|
||||
a.modelLoader,
|
||||
appConfig.WatchDogBusyTimeout,
|
||||
appConfig.WatchDogIdleTimeout,
|
||||
appConfig.WatchDogBusy,
|
||||
appConfig.WatchDogIdle)
|
||||
appConfig.WatchDogIdle,
|
||||
lruLimit)
|
||||
a.modelLoader.SetWatchDog(wd)
|
||||
|
||||
// Create new stop channel
|
||||
a.watchdogStop = make(chan bool, 1)
|
||||
|
||||
// Start watchdog goroutine
|
||||
go wd.Run()
|
||||
// Start watchdog goroutine only if busy/idle checks are enabled
|
||||
// LRU eviction doesn't need the Run() loop - it's triggered on model load
|
||||
if appConfig.WatchDogBusy || appConfig.WatchDogIdle {
|
||||
go wd.Run()
|
||||
}
|
||||
|
||||
// Setup shutdown handler
|
||||
go func() {
|
||||
@@ -48,7 +56,7 @@ func (a *Application) startWatchdog() error {
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info().Msg("Watchdog started with new settings")
|
||||
log.Info().Int("lruLimit", lruLimit).Bool("busyCheck", appConfig.WatchDogBusy).Bool("idleCheck", appConfig.WatchDogIdle).Msg("Watchdog started with new settings")
|
||||
} else {
|
||||
log.Info().Msg("Watchdog disabled")
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ func Detection(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
if detectionModel == nil {
|
||||
return nil, fmt.Errorf("could not load detection model")
|
||||
|
||||
@@ -17,7 +17,6 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, modelConf
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
var fn func() ([]float32, error)
|
||||
switch model := inferenceModel.(type) {
|
||||
|
||||
@@ -16,7 +16,6 @@ func ImageGeneration(height, width, mode, step, seed int, positive_prompt, negat
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
fn := func() error {
|
||||
_, err := inferenceModel.GenerateImage(
|
||||
|
||||
@@ -60,7 +60,6 @@ func ModelInference(ctx context.Context, s string, messages schema.Messages, ima
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
var protoMessages []*proto.Message
|
||||
// if we are using the tokenizer template, we need to convert the messages to proto messages
|
||||
|
||||
@@ -15,7 +15,6 @@ func Rerank(request *proto.RerankRequest, loader *model.ModelLoader, appConfig *
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
if rerankModel == nil {
|
||||
return nil, fmt.Errorf("could not load rerank model")
|
||||
|
||||
@@ -29,7 +29,6 @@ func SoundGeneration(
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
if soundGenModel == nil {
|
||||
return "", nil, fmt.Errorf("could not load sound generation model")
|
||||
|
||||
@@ -20,7 +20,6 @@ func TokenMetrics(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
if model == nil {
|
||||
return nil, fmt.Errorf("could not loadmodel model")
|
||||
|
||||
@@ -17,7 +17,6 @@ func ModelTokenize(s string, loader *model.ModelLoader, modelConfig config.Model
|
||||
if err != nil {
|
||||
return schema.TokenizeResponse{}, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
predictOptions := gRPCPredictOpts(modelConfig, loader.ModelPath)
|
||||
predictOptions.Prompt = s
|
||||
|
||||
@@ -24,7 +24,6 @@ func ModelTranscription(audio, language string, translate bool, diarize bool, ml
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer ml.Close()
|
||||
|
||||
if transcriptionModel == nil {
|
||||
return nil, fmt.Errorf("could not load transcription model")
|
||||
|
||||
@@ -26,7 +26,6 @@ func ModelTTS(
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
if ttsModel == nil {
|
||||
return "", nil, fmt.Errorf("could not load tts model %q", modelConfig.Model)
|
||||
|
||||
@@ -19,7 +19,6 @@ func VAD(request *schema.VADRequest,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer ml.Close()
|
||||
|
||||
req := proto.VADRequest{
|
||||
Audio: request.Audio,
|
||||
|
||||
@@ -16,7 +16,6 @@ func VideoGeneration(height, width int32, prompt, negativePrompt, startImage, en
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer loader.Close()
|
||||
|
||||
fn := func() error {
|
||||
_, err := inferenceModel.GenerateVideo(
|
||||
|
||||
@@ -102,7 +102,7 @@ func (bi *BackendsInstall) Run(ctx *cliContext.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
modelLoader := model.NewModelLoader(systemState, true)
|
||||
modelLoader := model.NewModelLoader(systemState)
|
||||
err = startup.InstallExternalBackends(context.Background(), galleries, systemState, modelLoader, progressCallback, bi.BackendArgs, bi.Name, bi.Alias)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -80,7 +80,7 @@ func (mi *ModelsInstall) Run(ctx *cliContext.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
galleryService := services.NewGalleryService(&config.ApplicationConfig{}, model.NewModelLoader(systemState, true))
|
||||
galleryService := services.NewGalleryService(&config.ApplicationConfig{}, model.NewModelLoader(systemState))
|
||||
err = galleryService.Start(context.Background(), config.NewModelConfigLoader(mi.ModelsPath), systemState)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -134,7 +134,7 @@ func (mi *ModelsInstall) Run(ctx *cliContext.Context) error {
|
||||
log.Info().Str("model", modelName).Str("license", model.License).Msg("installing model")
|
||||
}
|
||||
|
||||
modelLoader := model.NewModelLoader(systemState, true)
|
||||
modelLoader := model.NewModelLoader(systemState)
|
||||
err = startup.InstallModels(context.Background(), galleryService, galleries, backendGalleries, systemState, modelLoader, !mi.DisablePredownloadScan, mi.AutoloadBackendGalleries, progressCallback, modelName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -64,7 +64,8 @@ type RunCMD struct {
|
||||
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
|
||||
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
|
||||
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
|
||||
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
|
||||
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time (deprecated: use --max-active-backends=1 instead)" group:"backends"`
|
||||
MaxActiveBackends int `env:"LOCALAI_MAX_ACTIVE_BACKENDS,MAX_ACTIVE_BACKENDS" default:"0" help:"Maximum number of backends to keep loaded at once (0 = unlimited, 1 = single backend mode). Least recently used backends are evicted when limit is reached" group:"backends"`
|
||||
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
|
||||
ExternalGRPCBackends []string `env:"LOCALAI_EXTERNAL_GRPC_BACKENDS,EXTERNAL_GRPC_BACKENDS" help:"A list of external grpc backends" group:"backends"`
|
||||
EnableWatchdogIdle bool `env:"LOCALAI_WATCHDOG_IDLE,WATCHDOG_IDLE" default:"false" help:"Enable watchdog for stopping backends that are idle longer than the watchdog-idle-timeout" group:"backends"`
|
||||
@@ -202,7 +203,13 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
|
||||
if r.ParallelRequests {
|
||||
opts = append(opts, config.EnableParallelBackendRequests)
|
||||
}
|
||||
if r.SingleActiveBackend {
|
||||
|
||||
// Handle max active backends (LRU eviction)
|
||||
// MaxActiveBackends takes precedence over SingleActiveBackend
|
||||
if r.MaxActiveBackends > 0 {
|
||||
opts = append(opts, config.SetMaxActiveBackends(r.MaxActiveBackends))
|
||||
} else if r.SingleActiveBackend {
|
||||
// Backward compatibility: --single-active-backend is equivalent to --max-active-backends=1
|
||||
opts = append(opts, config.EnableSingleBackend)
|
||||
}
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ func (t *SoundGenerationCMD) Run(ctx *cliContext.Context) error {
|
||||
GeneratedContentDir: outputDir,
|
||||
ExternalGRPCBackends: externalBackends,
|
||||
}
|
||||
ml := model.NewModelLoader(systemState, opts.SingleBackend)
|
||||
ml := model.NewModelLoader(systemState)
|
||||
|
||||
defer func() {
|
||||
err := ml.StopAllGRPC()
|
||||
|
||||
@@ -38,7 +38,7 @@ func (t *TranscriptCMD) Run(ctx *cliContext.Context) error {
|
||||
}
|
||||
|
||||
cl := config.NewModelConfigLoader(t.ModelsPath)
|
||||
ml := model.NewModelLoader(systemState, opts.SingleBackend)
|
||||
ml := model.NewModelLoader(systemState)
|
||||
if err := cl.LoadModelConfigsFromPath(t.ModelsPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func (t *TTSCMD) Run(ctx *cliContext.Context) error {
|
||||
GeneratedContentDir: outputDir,
|
||||
}
|
||||
|
||||
ml := model.NewModelLoader(systemState, opts.SingleBackend)
|
||||
ml := model.NewModelLoader(systemState)
|
||||
|
||||
defer func() {
|
||||
err := ml.StopAllGRPC()
|
||||
|
||||
@@ -37,7 +37,7 @@ func findLLamaCPPBackend(galleries string, systemState *system.SystemState) (str
|
||||
|
||||
backend, ok := backends.Get(llamaCPPGalleryName)
|
||||
if !ok {
|
||||
ml := model.NewModelLoader(systemState, true)
|
||||
ml := model.NewModelLoader(systemState)
|
||||
var gals []config.Gallery
|
||||
if err := json.Unmarshal([]byte(galleries), &gals); err != nil {
|
||||
log.Error().Err(err).Msg("failed loading galleries")
|
||||
|
||||
@@ -52,7 +52,8 @@ type ApplicationConfig struct {
|
||||
|
||||
AutoloadGalleries, AutoloadBackendGalleries bool
|
||||
|
||||
SingleBackend bool
|
||||
SingleBackend bool // Deprecated: use MaxActiveBackends = 1 instead
|
||||
MaxActiveBackends int // Maximum number of active backends (0 = unlimited, 1 = single backend mode)
|
||||
ParallelBackendRequests bool
|
||||
|
||||
WatchDogIdle bool
|
||||
@@ -186,8 +187,38 @@ func SetWatchDogIdleTimeout(t time.Duration) AppOption {
|
||||
}
|
||||
}
|
||||
|
||||
// EnableSingleBackend is deprecated: use SetMaxActiveBackends(1) instead.
|
||||
// This is kept for backward compatibility.
|
||||
var EnableSingleBackend = func(o *ApplicationConfig) {
|
||||
o.SingleBackend = true
|
||||
o.MaxActiveBackends = 1
|
||||
}
|
||||
|
||||
// SetMaxActiveBackends sets the maximum number of active backends.
|
||||
// 0 = unlimited, 1 = single backend mode (replaces EnableSingleBackend)
|
||||
func SetMaxActiveBackends(n int) AppOption {
|
||||
return func(o *ApplicationConfig) {
|
||||
o.MaxActiveBackends = n
|
||||
// For backward compatibility, also set SingleBackend if n == 1
|
||||
if n == 1 {
|
||||
o.SingleBackend = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetEffectiveMaxActiveBackends returns the effective max active backends limit.
|
||||
// It considers both MaxActiveBackends and the deprecated SingleBackend setting.
|
||||
// If MaxActiveBackends is set (> 0), it takes precedence.
|
||||
// If SingleBackend is true and MaxActiveBackends is 0, returns 1.
|
||||
// Otherwise returns 0 (unlimited).
|
||||
func (o *ApplicationConfig) GetEffectiveMaxActiveBackends() int {
|
||||
if o.MaxActiveBackends > 0 {
|
||||
return o.MaxActiveBackends
|
||||
}
|
||||
if o.SingleBackend {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
var EnableParallelBackendRequests = func(o *ApplicationConfig) {
|
||||
|
||||
@@ -108,7 +108,7 @@ var _ = Describe("Gallery Backends", func() {
|
||||
}
|
||||
systemState, err = system.GetSystemState(system.WithBackendPath(tempDir))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
ml = model.NewModelLoader(systemState, true)
|
||||
ml = model.NewModelLoader(systemState)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
|
||||
@@ -27,7 +27,8 @@ type RuntimeSettings struct {
|
||||
WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"`
|
||||
WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"`
|
||||
WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"`
|
||||
SingleBackend *bool `json:"single_backend,omitempty"`
|
||||
SingleBackend *bool `json:"single_backend,omitempty"` // Deprecated: use MaxActiveBackends = 1 instead
|
||||
MaxActiveBackends *int `json:"max_active_backends,omitempty"` // Maximum number of active backends (0 = unlimited, 1 = single backend mode)
|
||||
ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"`
|
||||
Threads *int `json:"threads,omitempty"`
|
||||
ContextSize *int `json:"context_size,omitempty"`
|
||||
@@ -65,6 +66,7 @@ func GetSettingsEndpoint(app *application.Application) echo.HandlerFunc {
|
||||
watchdogBusy := appConfig.WatchDogBusy
|
||||
watchdogEnabled := appConfig.WatchDog
|
||||
singleBackend := appConfig.SingleBackend
|
||||
maxActiveBackends := appConfig.MaxActiveBackends
|
||||
parallelBackendRequests := appConfig.ParallelBackendRequests
|
||||
threads := appConfig.Threads
|
||||
contextSize := appConfig.ContextSize
|
||||
@@ -87,6 +89,7 @@ func GetSettingsEndpoint(app *application.Application) echo.HandlerFunc {
|
||||
settings.WatchdogBusyEnabled = &watchdogBusy
|
||||
settings.WatchdogEnabled = &watchdogEnabled
|
||||
settings.SingleBackend = &singleBackend
|
||||
settings.MaxActiveBackends = &maxActiveBackends
|
||||
settings.ParallelBackendRequests = ¶llelBackendRequests
|
||||
settings.Threads = &threads
|
||||
settings.ContextSize = &contextSize
|
||||
@@ -223,8 +226,20 @@ func UpdateSettingsEndpoint(app *application.Application) echo.HandlerFunc {
|
||||
appConfig.WatchDogBusyTimeout = dur
|
||||
watchdogChanged = true
|
||||
}
|
||||
if settings.SingleBackend != nil {
|
||||
if settings.MaxActiveBackends != nil {
|
||||
appConfig.MaxActiveBackends = *settings.MaxActiveBackends
|
||||
// For backward compatibility, update SingleBackend too
|
||||
appConfig.SingleBackend = (*settings.MaxActiveBackends == 1)
|
||||
watchdogChanged = true // LRU limit is managed by watchdog
|
||||
} else if settings.SingleBackend != nil {
|
||||
// Legacy support: SingleBackend maps to MaxActiveBackends = 1
|
||||
appConfig.SingleBackend = *settings.SingleBackend
|
||||
if *settings.SingleBackend {
|
||||
appConfig.MaxActiveBackends = 1
|
||||
} else {
|
||||
appConfig.MaxActiveBackends = 0
|
||||
}
|
||||
watchdogChanged = true // LRU limit is managed by watchdog
|
||||
}
|
||||
if settings.ParallelBackendRequests != nil {
|
||||
appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests
|
||||
|
||||
@@ -21,7 +21,6 @@ func StoresSetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sl.Close()
|
||||
|
||||
vals := make([][]byte, len(input.Values))
|
||||
for i, v := range input.Values {
|
||||
@@ -49,7 +48,6 @@ func StoresDeleteEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationCo
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sl.Close()
|
||||
|
||||
if err := store.DeleteCols(c.Request().Context(), sb, input.Keys); err != nil {
|
||||
return err
|
||||
@@ -71,7 +69,6 @@ func StoresGetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sl.Close()
|
||||
|
||||
keys, vals, err := store.GetCols(c.Request().Context(), sb, input.Keys)
|
||||
if err != nil {
|
||||
@@ -103,7 +100,6 @@ func StoresFindEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConf
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sl.Close()
|
||||
|
||||
keys, vals, similarities, err := store.Find(c.Request().Context(), sb, input.Key, input.Topk)
|
||||
if err != nil {
|
||||
|
||||
@@ -138,17 +138,15 @@
|
||||
</p>
|
||||
|
||||
<div class="space-y-4">
|
||||
<!-- Single Backend Mode -->
|
||||
<div class="flex items-center justify-between">
|
||||
<div>
|
||||
<label class="text-sm font-medium text-[var(--color-text-primary)]">Single Backend Mode</label>
|
||||
<p class="text-xs text-[var(--color-text-secondary)] mt-1">Allow only one backend to be active at a time</p>
|
||||
</div>
|
||||
<label class="relative inline-flex items-center cursor-pointer">
|
||||
<input type="checkbox" x-model="settings.single_backend"
|
||||
class="sr-only peer">
|
||||
<div class="w-11 h-6 bg-[var(--color-bg-primary)] peer-focus:outline-none peer-focus:ring-4 peer-focus:ring-[var(--color-accent-light)] rounded-full peer peer-checked:after:translate-x-full peer-checked:after:border-white after:content-[''] after:absolute after:top-[2px] after:left-[2px] after:bg-white after:border-gray-300 after:border after:rounded-full after:h-5 after:w-5 after:transition-all peer-checked:bg-[var(--color-accent)]"></div>
|
||||
</label>
|
||||
<!-- Max Active Backends -->
|
||||
<div>
|
||||
<label class="block text-sm font-medium text-[var(--color-text-primary)] mb-2">Max Active Backends</label>
|
||||
<p class="text-xs text-[var(--color-text-secondary)] mb-2">Maximum number of models to keep loaded at once (0 = unlimited, 1 = single backend mode). Least recently used models are evicted when limit is reached.</p>
|
||||
<input type="number" x-model="settings.max_active_backends"
|
||||
min="0"
|
||||
placeholder="0"
|
||||
@change="updateMaxActiveBackends()"
|
||||
class="w-full px-3 py-2 bg-[var(--color-bg-primary)] border border-[var(--color-accent-light)] rounded text-sm text-[var(--color-text-primary)] focus:outline-none focus:ring-2 focus:ring-[var(--color-accent-light)]">
|
||||
</div>
|
||||
|
||||
<!-- Parallel Backend Requests -->
|
||||
@@ -462,7 +460,7 @@ function settingsDashboard() {
|
||||
watchdog_busy_enabled: false,
|
||||
watchdog_idle_timeout: '15m',
|
||||
watchdog_busy_timeout: '5m',
|
||||
single_backend: false,
|
||||
max_active_backends: 0,
|
||||
parallel_backend_requests: false,
|
||||
threads: 0,
|
||||
context_size: 0,
|
||||
@@ -500,7 +498,7 @@ function settingsDashboard() {
|
||||
watchdog_busy_enabled: data.watchdog_busy_enabled,
|
||||
watchdog_idle_timeout: data.watchdog_idle_timeout || '15m',
|
||||
watchdog_busy_timeout: data.watchdog_busy_timeout || '5m',
|
||||
single_backend: data.single_backend,
|
||||
max_active_backends: data.max_active_backends || 0,
|
||||
parallel_backend_requests: data.parallel_backend_requests,
|
||||
threads: data.threads || 0,
|
||||
context_size: data.context_size || 0,
|
||||
@@ -536,6 +534,12 @@ function settingsDashboard() {
|
||||
}
|
||||
},
|
||||
|
||||
updateMaxActiveBackends() {
|
||||
// Ensure max_active_backends is a non-negative integer
|
||||
const value = parseInt(this.settings.max_active_backends) || 0;
|
||||
this.settings.max_active_backends = Math.max(0, value);
|
||||
},
|
||||
|
||||
async saveSettings() {
|
||||
if (this.saving) return;
|
||||
|
||||
@@ -560,8 +564,8 @@ function settingsDashboard() {
|
||||
if (this.settings.watchdog_busy_timeout) {
|
||||
payload.watchdog_busy_timeout = this.settings.watchdog_busy_timeout;
|
||||
}
|
||||
if (this.settings.single_backend !== undefined) {
|
||||
payload.single_backend = this.settings.single_backend;
|
||||
if (this.settings.max_active_backends !== undefined) {
|
||||
payload.max_active_backends = parseInt(this.settings.max_active_backends) || 0;
|
||||
}
|
||||
if (this.settings.parallel_backend_requests !== undefined) {
|
||||
payload.parallel_backend_requests = this.settings.parallel_backend_requests;
|
||||
|
||||
@@ -42,7 +42,7 @@ var _ = Describe("AgentJobService", func() {
|
||||
appConfig.APIAddress = "127.0.0.1:8080"
|
||||
appConfig.AgentJobRetentionDays = 30
|
||||
|
||||
modelLoader = model.NewModelLoader(systemState, false)
|
||||
modelLoader = model.NewModelLoader(systemState)
|
||||
configLoader = config.NewModelConfigLoader(tempDir)
|
||||
evaluator = templates.NewEvaluator(tempDir)
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ var _ = Describe("Preload test", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
systemState, err = system.GetSystemState(system.WithModelPath(tmpdir))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
ml = model.NewModelLoader(systemState, true)
|
||||
ml = model.NewModelLoader(systemState)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
|
||||
@@ -5,7 +5,10 @@ weight = 22
|
||||
url = '/advanced/vram-management'
|
||||
+++
|
||||
|
||||
When running multiple models in LocalAI, especially on systems with limited GPU memory (VRAM), you may encounter situations where loading a new model fails because there isn't enough available VRAM. LocalAI provides two mechanisms to automatically manage model memory allocation and prevent VRAM exhaustion.
|
||||
When running multiple models in LocalAI, especially on systems with limited GPU memory (VRAM), you may encounter situations where loading a new model fails because there isn't enough available VRAM. LocalAI provides several mechanisms to automatically manage model memory allocation and prevent VRAM exhaustion:
|
||||
|
||||
1. **Max Active Backends (LRU Eviction)**: Limit the number of loaded models, evicting the least recently used when the limit is reached
|
||||
2. **Watchdog Mechanisms**: Automatically unload idle or stuck models based on configurable timeouts
|
||||
|
||||
## The Problem
|
||||
|
||||
@@ -16,34 +19,80 @@ By default, LocalAI keeps models loaded in memory once they're first used. This
|
||||
|
||||
This is a common issue when working with GPU-accelerated models, as VRAM is typically more limited than system RAM. For more context, see issues [#6068](https://github.com/mudler/LocalAI/issues/6068), [#7269](https://github.com/mudler/LocalAI/issues/7269), and [#5352](https://github.com/mudler/LocalAI/issues/5352).
|
||||
|
||||
## Solution 1: Single Active Backend
|
||||
## Solution 1: Max Active Backends (LRU Eviction)
|
||||
|
||||
The simplest approach is to ensure only one model is loaded at a time. When a new model is requested, LocalAI will automatically unload the currently active model before loading the new one.
|
||||
LocalAI supports limiting the maximum number of active backends (loaded models) using LRU (Least Recently Used) eviction. When the limit is reached and a new model needs to be loaded, the least recently used model is automatically unloaded to make room.
|
||||
|
||||
### Configuration
|
||||
|
||||
```bash
|
||||
./local-ai --single-active-backend
|
||||
Set the maximum number of active backends using CLI flags or environment variables:
|
||||
|
||||
LOCALAI_SINGLE_ACTIVE_BACKEND=true ./local-ai
|
||||
```bash
|
||||
# Allow up to 3 models loaded simultaneously
|
||||
./local-ai --max-active-backends=3
|
||||
|
||||
# Using environment variables
|
||||
LOCALAI_MAX_ACTIVE_BACKENDS=3 ./local-ai
|
||||
MAX_ACTIVE_BACKENDS=3 ./local-ai
|
||||
```
|
||||
|
||||
Setting the limit to `1` is equivalent to single active backend mode (see below). Setting to `0` disables the limit (unlimited backends).
|
||||
|
||||
### Use cases
|
||||
|
||||
- Single GPU systems with limited VRAM
|
||||
- When you only need one model active at a time
|
||||
- Simple deployments where model switching is acceptable
|
||||
- Systems with limited VRAM that can handle a few models simultaneously
|
||||
- Multi-model deployments where you want to keep frequently-used models loaded
|
||||
- Balancing between memory usage and model reload times
|
||||
- Production environments requiring predictable memory consumption
|
||||
|
||||
### How it works
|
||||
|
||||
1. When a model is requested, its "last used" timestamp is updated
|
||||
2. When a new model needs to be loaded and the limit is reached, LocalAI identifies the least recently used model(s)
|
||||
3. The LRU model(s) are automatically unloaded to make room for the new model
|
||||
4. Concurrent requests for loading different models are handled safely - the system accounts for models currently being loaded when calculating evictions
|
||||
|
||||
### Example
|
||||
|
||||
```bash
|
||||
LOCALAI_SINGLE_ACTIVE_BACKEND=true ./local-ai
|
||||
# Allow 2 active backends
|
||||
LOCALAI_MAX_ACTIVE_BACKENDS=2 ./local-ai
|
||||
|
||||
# First request - model-a is loaded (1 active)
|
||||
curl http://localhost:8080/v1/chat/completions -d '{"model": "model-a", ...}'
|
||||
|
||||
# Second request - model-b is loaded (2 active, at limit)
|
||||
curl http://localhost:8080/v1/chat/completions -d '{"model": "model-b", ...}'
|
||||
|
||||
# Third request - model-a is evicted (LRU), model-c is loaded
|
||||
curl http://localhost:8080/v1/chat/completions -d '{"model": "model-c", ...}'
|
||||
|
||||
# Request for model-b updates its "last used" time
|
||||
curl http://localhost:8080/v1/chat/completions -d '{"model": "model-b", ...}'
|
||||
```
|
||||
|
||||
### Single Active Backend Mode
|
||||
|
||||
The simplest approach is to ensure only one model is loaded at a time. This is now implemented as `--max-active-backends=1`. When a new model is requested, LocalAI will automatically unload the currently active model before loading the new one.
|
||||
|
||||
```bash
|
||||
# These are equivalent:
|
||||
./local-ai --max-active-backends=1
|
||||
./local-ai --single-active-backend
|
||||
|
||||
# Using environment variables
|
||||
LOCALAI_MAX_ACTIVE_BACKENDS=1 ./local-ai
|
||||
LOCALAI_SINGLE_ACTIVE_BACKEND=true ./local-ai
|
||||
```
|
||||
|
||||
> **Note:** The `--single-active-backend` flag is deprecated but still supported for backward compatibility. It is recommended to use `--max-active-backends=1` instead.
|
||||
|
||||
#### Single backend use cases
|
||||
|
||||
- Single GPU systems with very limited VRAM
|
||||
- When you only need one model active at a time
|
||||
- Simple deployments where model switching is acceptable
|
||||
|
||||
## Solution 2: Watchdog Mechanisms
|
||||
|
||||
For more flexible memory management, LocalAI provides watchdog mechanisms that automatically unload models based on their activity state. This allows multiple models to be loaded simultaneously, but automatically frees memory when models become inactive or stuck.
|
||||
@@ -133,6 +182,31 @@ Timeouts can be specified using Go's duration format:
|
||||
- `30s` - 30 seconds
|
||||
- `2h30m` - 2 hours and 30 minutes
|
||||
|
||||
## Combining LRU and Watchdog
|
||||
|
||||
You can combine Max Active Backends (LRU eviction) with the watchdog mechanisms for comprehensive memory management:
|
||||
|
||||
```bash
|
||||
# Allow up to 3 active backends with idle watchdog
|
||||
LOCALAI_MAX_ACTIVE_BACKENDS=3 \
|
||||
LOCALAI_WATCHDOG_IDLE=true \
|
||||
LOCALAI_WATCHDOG_IDLE_TIMEOUT=15m \
|
||||
./local-ai
|
||||
```
|
||||
|
||||
Or using command line flags:
|
||||
|
||||
```bash
|
||||
./local-ai \
|
||||
--max-active-backends=3 \
|
||||
--enable-watchdog-idle --watchdog-idle-timeout=15m
|
||||
```
|
||||
|
||||
This configuration:
|
||||
- Ensures no more than 3 models are loaded at once (LRU eviction kicks in when exceeded)
|
||||
- Automatically unloads any model that hasn't been used for 15 minutes
|
||||
- Provides both hard limits and time-based cleanup
|
||||
|
||||
## Limitations and Considerations
|
||||
|
||||
### VRAM Usage Estimation
|
||||
@@ -157,10 +231,11 @@ To stop all models, you'll need to call the endpoint for each loaded model indiv
|
||||
### Best Practices
|
||||
|
||||
1. **Monitor VRAM usage**: Use `nvidia-smi` (for NVIDIA GPUs) or similar tools to monitor actual VRAM usage
|
||||
2. **Start with single active backend**: For single-GPU systems, `--single-active-backend` is often the simplest solution
|
||||
3. **Tune watchdog timeouts**: Adjust timeouts based on your usage patterns - shorter timeouts free memory faster but may cause more frequent reloads
|
||||
4. **Consider model size**: Ensure your VRAM can accommodate at least one of your largest models
|
||||
5. **Use quantization**: Smaller quantized models use less VRAM and allow more flexibility
|
||||
2. **Set an appropriate backend limit**: For single-GPU systems, `--max-active-backends=1` is often the simplest solution. For systems with more VRAM, you can increase the limit to keep more models loaded
|
||||
3. **Combine LRU with watchdog**: Use `--max-active-backends` to limit the number of loaded models, and enable idle watchdog to unload models that haven't been used recently
|
||||
4. **Tune watchdog timeouts**: Adjust timeouts based on your usage patterns - shorter timeouts free memory faster but may cause more frequent reloads
|
||||
5. **Consider model size**: Ensure your VRAM can accommodate at least one of your largest models
|
||||
6. **Use quantization**: Smaller quantized models use less VRAM and allow more flexibility
|
||||
|
||||
## Related Documentation
|
||||
|
||||
|
||||
@@ -27,9 +27,11 @@ Changes to watchdog settings are applied immediately by restarting the watchdog
|
||||
|
||||
### Backend Configuration
|
||||
|
||||
- **Single Backend**: Allow only one backend to run at a time
|
||||
- **Max Active Backends**: Maximum number of active backends (loaded models). When exceeded, the least recently used model is automatically evicted. Set to `0` for unlimited, `1` for single-backend mode
|
||||
- **Parallel Backend Requests**: Enable backends to handle multiple requests in parallel if supported
|
||||
|
||||
> **Note:** The "Single Backend" setting is deprecated. Use "Max Active Backends" set to `1` for single-backend behavior.
|
||||
|
||||
### Performance Settings
|
||||
|
||||
- **Threads**: Number of threads used for parallel computation (recommended: number of physical cores)
|
||||
@@ -90,7 +92,7 @@ The `runtime_settings.json` file follows this structure:
|
||||
"watchdog_busy_enabled": false,
|
||||
"watchdog_idle_timeout": "15m",
|
||||
"watchdog_busy_timeout": "5m",
|
||||
"single_backend": false,
|
||||
"max_active_backends": 0,
|
||||
"parallel_backend_requests": true,
|
||||
"threads": 8,
|
||||
"context_size": 2048,
|
||||
|
||||
@@ -39,7 +39,8 @@ Complete reference for all LocalAI command-line interface (CLI) parameters and e
|
||||
| `--backend-galleries` | | JSON list of backend galleries | `$LOCALAI_BACKEND_GALLERIES`, `$BACKEND_GALLERIES` |
|
||||
| `--autoload-backend-galleries` | `true` | Automatically load backend galleries on startup | `$LOCALAI_AUTOLOAD_BACKEND_GALLERIES`, `$AUTOLOAD_BACKEND_GALLERIES` |
|
||||
| `--parallel-requests` | `false` | Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm) | `$LOCALAI_PARALLEL_REQUESTS`, `$PARALLEL_REQUESTS` |
|
||||
| `--single-active-backend` | `false` | Allow only one backend to be run at a time | `$LOCALAI_SINGLE_ACTIVE_BACKEND`, `$SINGLE_ACTIVE_BACKEND` |
|
||||
| `--max-active-backends` | `0` | Maximum number of active backends (loaded models). When exceeded, the least recently used model is evicted. Set to `0` for unlimited, `1` for single-backend mode | `$LOCALAI_MAX_ACTIVE_BACKENDS`, `$MAX_ACTIVE_BACKENDS` |
|
||||
| `--single-active-backend` | `false` | **DEPRECATED** - Use `--max-active-backends=1` instead. Allow only one backend to be run at a time | `$LOCALAI_SINGLE_ACTIVE_BACKEND`, `$SINGLE_ACTIVE_BACKEND` |
|
||||
| `--preload-backend-only` | `false` | Do not launch the API services, only the preloaded models/backends are started (useful for multi-node setups) | `$LOCALAI_PRELOAD_BACKEND_ONLY`, `$PRELOAD_BACKEND_ONLY` |
|
||||
| `--enable-watchdog-idle` | `false` | Enable watchdog for stopping backends that are idle longer than the watchdog-idle-timeout | `$LOCALAI_WATCHDOG_IDLE`, `$WATCHDOG_IDLE` |
|
||||
| `--watchdog-idle-timeout` | `15m` | Threshold beyond which an idle backend should be stopped | `$LOCALAI_WATCHDOG_IDLE_TIMEOUT`, `$WATCHDOG_IDLE_TIMEOUT` |
|
||||
|
||||
@@ -184,54 +184,45 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
|
||||
return model.GRPC(o.parallelRequests, ml.wd), nil
|
||||
}
|
||||
|
||||
func (ml *ModelLoader) stopActiveBackends(modelID string) {
|
||||
if !ml.singletonMode {
|
||||
// enforceLRULimit enforces the LRU limit before loading a new model.
|
||||
// This is called before loading a model to ensure we don't exceed the limit.
|
||||
// It accounts for models that are currently being loaded by other goroutines.
|
||||
func (ml *ModelLoader) enforceLRULimit() {
|
||||
if ml.wd == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If we can have only one backend active, kill all the others (except external backends)
|
||||
|
||||
// Stop all backends except the one we are going to load
|
||||
log.Debug().Msgf("Stopping all backends except '%s'", modelID)
|
||||
err := ml.StopGRPC(allExcept(modelID))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("keptModel", modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
|
||||
}
|
||||
// Get the count of models currently being loaded to account for concurrent requests
|
||||
pendingLoads := ml.GetLoadingCount()
|
||||
ml.wd.EnforceLRULimit(pendingLoads)
|
||||
}
|
||||
|
||||
func (ml *ModelLoader) Close() {
|
||||
if !ml.singletonMode {
|
||||
// updateModelLastUsed updates the last used time for a model (for LRU tracking)
|
||||
func (ml *ModelLoader) updateModelLastUsed(m *Model) {
|
||||
if ml.wd == nil || m == nil {
|
||||
return
|
||||
}
|
||||
ml.singletonLock.Unlock()
|
||||
}
|
||||
|
||||
func (ml *ModelLoader) lockBackend() {
|
||||
if !ml.singletonMode {
|
||||
return
|
||||
}
|
||||
ml.singletonLock.Lock()
|
||||
ml.wd.UpdateLastUsed(m.address)
|
||||
}
|
||||
|
||||
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
|
||||
ml.lockBackend() // grab the singleton lock if needed
|
||||
|
||||
o := NewOptions(opts...)
|
||||
|
||||
// Return earlier if we have a model already loaded
|
||||
// (avoid looping through all the backends)
|
||||
if m := ml.CheckIsLoaded(o.modelID); m != nil {
|
||||
log.Debug().Msgf("Model '%s' already loaded", o.modelID)
|
||||
// Update last used time for LRU tracking
|
||||
ml.updateModelLastUsed(m)
|
||||
return m.GRPC(o.parallelRequests, ml.wd), nil
|
||||
}
|
||||
|
||||
ml.stopActiveBackends(o.modelID)
|
||||
// Enforce LRU limit before loading a new model
|
||||
ml.enforceLRULimit()
|
||||
|
||||
// if a backend is defined, return the loader directly
|
||||
if o.backendString != "" {
|
||||
client, err := ml.backendLoader(opts...)
|
||||
if err != nil {
|
||||
ml.Close()
|
||||
return nil, err
|
||||
}
|
||||
return client, nil
|
||||
@@ -250,7 +241,6 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
|
||||
|
||||
if len(autoLoadBackends) == 0 {
|
||||
log.Error().Msg("No backends found")
|
||||
ml.Close()
|
||||
return nil, fmt.Errorf("no backends found")
|
||||
}
|
||||
|
||||
@@ -277,7 +267,5 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
|
||||
}
|
||||
}
|
||||
|
||||
ml.Close() // make sure to release the lock in case of failure
|
||||
|
||||
return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error())
|
||||
}
|
||||
|
||||
@@ -22,24 +22,32 @@ import (
|
||||
type ModelLoader struct {
|
||||
ModelPath string
|
||||
mu sync.Mutex
|
||||
singletonLock sync.Mutex
|
||||
singletonMode bool
|
||||
models map[string]*Model
|
||||
loading map[string]chan struct{} // tracks models currently being loaded
|
||||
wd *WatchDog
|
||||
externalBackends map[string]string
|
||||
}
|
||||
|
||||
func NewModelLoader(system *system.SystemState, singleActiveBackend bool) *ModelLoader {
|
||||
// NewModelLoader creates a new ModelLoader instance.
|
||||
// LRU eviction is now managed through the WatchDog component.
|
||||
func NewModelLoader(system *system.SystemState) *ModelLoader {
|
||||
nml := &ModelLoader{
|
||||
ModelPath: system.Model.ModelsPath,
|
||||
models: make(map[string]*Model),
|
||||
singletonMode: singleActiveBackend,
|
||||
loading: make(map[string]chan struct{}),
|
||||
externalBackends: make(map[string]string),
|
||||
}
|
||||
|
||||
return nml
|
||||
}
|
||||
|
||||
// GetLoadingCount returns the number of models currently being loaded
|
||||
func (ml *ModelLoader) GetLoadingCount() int {
|
||||
ml.mu.Lock()
|
||||
defer ml.mu.Unlock()
|
||||
return len(ml.loading)
|
||||
}
|
||||
|
||||
func (ml *ModelLoader) SetWatchDog(wd *WatchDog) {
|
||||
ml.wd = wd
|
||||
}
|
||||
@@ -154,14 +162,44 @@ func (ml *ModelLoader) ListLoadedModels() []*Model {
|
||||
|
||||
func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, string, string) (*Model, error)) (*Model, error) {
|
||||
ml.mu.Lock()
|
||||
defer ml.mu.Unlock()
|
||||
|
||||
// Check if we already have a loaded model
|
||||
if model := ml.checkIsLoaded(modelID); model != nil {
|
||||
ml.mu.Unlock()
|
||||
return model, nil
|
||||
}
|
||||
|
||||
// Load the model and keep it in memory for later use
|
||||
// Check if another goroutine is already loading this model
|
||||
if loadingChan, isLoading := ml.loading[modelID]; isLoading {
|
||||
ml.mu.Unlock()
|
||||
// Wait for the other goroutine to finish loading
|
||||
log.Debug().Str("modelID", modelID).Msg("Waiting for model to be loaded by another request")
|
||||
<-loadingChan
|
||||
// Now check if the model is loaded
|
||||
ml.mu.Lock()
|
||||
model := ml.checkIsLoaded(modelID)
|
||||
ml.mu.Unlock()
|
||||
if model != nil {
|
||||
return model, nil
|
||||
}
|
||||
// If still not loaded, the other goroutine failed - we'll try again
|
||||
return ml.LoadModel(modelID, modelName, loader)
|
||||
}
|
||||
|
||||
// Mark this model as loading (create a channel that will be closed when done)
|
||||
loadingChan := make(chan struct{})
|
||||
ml.loading[modelID] = loadingChan
|
||||
ml.mu.Unlock()
|
||||
|
||||
// Ensure we clean up the loading state when done
|
||||
defer func() {
|
||||
ml.mu.Lock()
|
||||
delete(ml.loading, modelID)
|
||||
close(loadingChan)
|
||||
ml.mu.Unlock()
|
||||
}()
|
||||
|
||||
// Load the model (this can take a long time, no lock held)
|
||||
modelFile := filepath.Join(ml.ModelPath, modelName)
|
||||
log.Debug().Msgf("Loading model in memory from file: %s", modelFile)
|
||||
|
||||
@@ -174,7 +212,10 @@ func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string,
|
||||
return nil, fmt.Errorf("loader didn't return a model")
|
||||
}
|
||||
|
||||
// Add to models map
|
||||
ml.mu.Lock()
|
||||
ml.models[modelID] = model
|
||||
ml.mu.Unlock()
|
||||
|
||||
return model, nil
|
||||
}
|
||||
|
||||
@@ -4,6 +4,9 @@ import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
@@ -27,7 +30,7 @@ var _ = Describe("ModelLoader", func() {
|
||||
system.WithModelPath(modelPath),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
modelLoader = model.NewModelLoader(systemState, false)
|
||||
modelLoader = model.NewModelLoader(systemState)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
@@ -106,4 +109,157 @@ var _ = Describe("ModelLoader", func() {
|
||||
Expect(modelLoader.CheckIsLoaded("foo")).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("Concurrent Loading", func() {
|
||||
It("should handle concurrent requests for the same model", func() {
|
||||
var loadCount int32
|
||||
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
|
||||
atomic.AddInt32(&loadCount, 1)
|
||||
time.Sleep(100 * time.Millisecond) // Simulate loading time
|
||||
return model.NewModel(modelID, modelName, nil), nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
results := make([]*model.Model, 5)
|
||||
errs := make([]error, 5)
|
||||
|
||||
// Start 5 concurrent requests for the same model
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
results[idx], errs[idx] = modelLoader.LoadModel("concurrent-model", "test.model", mockLoader)
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// All requests should succeed
|
||||
for i := 0; i < 5; i++ {
|
||||
Expect(errs[i]).To(BeNil())
|
||||
Expect(results[i]).ToNot(BeNil())
|
||||
}
|
||||
|
||||
// The loader should only have been called once
|
||||
Expect(atomic.LoadInt32(&loadCount)).To(Equal(int32(1)))
|
||||
|
||||
// All results should be the same model instance
|
||||
for i := 1; i < 5; i++ {
|
||||
Expect(results[i]).To(Equal(results[0]))
|
||||
}
|
||||
})
|
||||
|
||||
It("should handle concurrent requests for different models", func() {
|
||||
var loadCount int32
|
||||
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
|
||||
atomic.AddInt32(&loadCount, 1)
|
||||
time.Sleep(50 * time.Millisecond) // Simulate loading time
|
||||
return model.NewModel(modelID, modelName, nil), nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
modelCount := 3
|
||||
|
||||
// Start concurrent requests for different models
|
||||
for i := 0; i < modelCount; i++ {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
modelID := "model-" + string(rune('A'+idx))
|
||||
_, err := modelLoader.LoadModel(modelID, "test.model", mockLoader)
|
||||
Expect(err).To(BeNil())
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Each model should be loaded exactly once
|
||||
Expect(atomic.LoadInt32(&loadCount)).To(Equal(int32(modelCount)))
|
||||
|
||||
// All models should be loaded
|
||||
Expect(modelLoader.CheckIsLoaded("model-A")).ToNot(BeNil())
|
||||
Expect(modelLoader.CheckIsLoaded("model-B")).ToNot(BeNil())
|
||||
Expect(modelLoader.CheckIsLoaded("model-C")).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("should track loading count correctly", func() {
|
||||
loadStarted := make(chan struct{})
|
||||
loadComplete := make(chan struct{})
|
||||
|
||||
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
|
||||
close(loadStarted)
|
||||
<-loadComplete // Wait until we're told to complete
|
||||
return model.NewModel(modelID, modelName, nil), nil
|
||||
}
|
||||
|
||||
// Start loading in background
|
||||
go func() {
|
||||
modelLoader.LoadModel("slow-model", "test.model", mockLoader)
|
||||
}()
|
||||
|
||||
// Wait for loading to start
|
||||
<-loadStarted
|
||||
|
||||
// Loading count should be 1
|
||||
Expect(modelLoader.GetLoadingCount()).To(Equal(1))
|
||||
|
||||
// Complete the loading
|
||||
close(loadComplete)
|
||||
|
||||
// Wait a bit for cleanup
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Loading count should be back to 0
|
||||
Expect(modelLoader.GetLoadingCount()).To(Equal(0))
|
||||
})
|
||||
|
||||
It("should retry loading if first attempt fails", func() {
|
||||
var attemptCount int32
|
||||
mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) {
|
||||
count := atomic.AddInt32(&attemptCount, 1)
|
||||
if count == 1 {
|
||||
return nil, errors.New("first attempt fails")
|
||||
}
|
||||
return model.NewModel(modelID, modelName, nil), nil
|
||||
}
|
||||
|
||||
// First goroutine will fail
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
var err1, err2 error
|
||||
var m1, m2 *model.Model
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
m1, err1 = modelLoader.LoadModel("retry-model", "test.model", mockLoader)
|
||||
}()
|
||||
|
||||
// Give first goroutine a head start
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
m2, err2 = modelLoader.LoadModel("retry-model", "test.model", mockLoader)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// At least one should succeed (the second attempt after retry)
|
||||
successCount := 0
|
||||
if err1 == nil && m1 != nil {
|
||||
successCount++
|
||||
}
|
||||
if err2 == nil && m2 != nil {
|
||||
successCount++
|
||||
}
|
||||
Expect(successCount).To(BeNumerically(">=", 1))
|
||||
})
|
||||
})
|
||||
|
||||
Context("GetLoadingCount", func() {
|
||||
It("should return 0 when nothing is loading", func() {
|
||||
Expect(modelLoader.GetLoadingCount()).To(Equal(0))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -13,13 +14,16 @@ import (
|
||||
// watchdog that will keep track of the state of each backend (busy or not)
|
||||
// and for how much time it has been busy.
|
||||
// If a backend is busy for too long, the watchdog will kill the process and
|
||||
// force a reload of the model
|
||||
// force a reload of the model.
|
||||
// The watchdog also supports LRU (Least Recently Used) eviction when a maximum
|
||||
// number of active backends is configured.
|
||||
// The watchdog runs as a separate go routine,
|
||||
// and the GRPC client talks to it via a channel to send status updates
|
||||
type WatchDog struct {
|
||||
sync.Mutex
|
||||
timetable map[string]time.Time
|
||||
busyTime map[string]time.Time
|
||||
idleTime map[string]time.Time
|
||||
lastUsed map[string]time.Time // LRU tracking: when each model was last used
|
||||
timeout, idletimeout time.Duration
|
||||
addressMap map[string]*process.Process
|
||||
addressModelMap map[string]string
|
||||
@@ -27,27 +31,44 @@ type WatchDog struct {
|
||||
stop chan bool
|
||||
|
||||
busyCheck, idleCheck bool
|
||||
lruLimit int // Maximum number of active backends (0 = unlimited)
|
||||
}
|
||||
|
||||
type ProcessManager interface {
|
||||
ShutdownModel(modelName string) error
|
||||
}
|
||||
|
||||
func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool) *WatchDog {
|
||||
func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool, lruLimit int) *WatchDog {
|
||||
return &WatchDog{
|
||||
timeout: timeoutBusy,
|
||||
idletimeout: timeoutIdle,
|
||||
pm: pm,
|
||||
timetable: make(map[string]time.Time),
|
||||
busyTime: make(map[string]time.Time),
|
||||
idleTime: make(map[string]time.Time),
|
||||
lastUsed: make(map[string]time.Time),
|
||||
addressMap: make(map[string]*process.Process),
|
||||
busyCheck: busy,
|
||||
idleCheck: idle,
|
||||
lruLimit: lruLimit,
|
||||
addressModelMap: make(map[string]string),
|
||||
stop: make(chan bool, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// SetLRULimit updates the LRU limit dynamically
|
||||
func (wd *WatchDog) SetLRULimit(limit int) {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
wd.lruLimit = limit
|
||||
}
|
||||
|
||||
// GetLRULimit returns the current LRU limit
|
||||
func (wd *WatchDog) GetLRULimit() int {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
return wd.lruLimit
|
||||
}
|
||||
|
||||
func (wd *WatchDog) Shutdown() {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
@@ -70,15 +91,107 @@ func (wd *WatchDog) Add(address string, p *process.Process) {
|
||||
func (wd *WatchDog) Mark(address string) {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
wd.timetable[address] = time.Now()
|
||||
now := time.Now()
|
||||
wd.busyTime[address] = now
|
||||
wd.lastUsed[address] = now // Update LRU tracking
|
||||
delete(wd.idleTime, address)
|
||||
}
|
||||
|
||||
func (wd *WatchDog) UnMark(ModelAddress string) {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
delete(wd.timetable, ModelAddress)
|
||||
wd.idleTime[ModelAddress] = time.Now()
|
||||
now := time.Now()
|
||||
delete(wd.busyTime, ModelAddress)
|
||||
wd.idleTime[ModelAddress] = now
|
||||
wd.lastUsed[ModelAddress] = now // Update LRU tracking
|
||||
}
|
||||
|
||||
// UpdateLastUsed updates the last used time for a model address (for LRU tracking)
|
||||
// This should be called when a model is accessed (e.g., when checking if loaded)
|
||||
func (wd *WatchDog) UpdateLastUsed(address string) {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
wd.lastUsed[address] = time.Now()
|
||||
}
|
||||
|
||||
// GetLoadedModelCount returns the number of currently loaded models tracked by the watchdog
|
||||
func (wd *WatchDog) GetLoadedModelCount() int {
|
||||
wd.Lock()
|
||||
defer wd.Unlock()
|
||||
return len(wd.addressModelMap)
|
||||
}
|
||||
|
||||
// modelUsageInfo holds information about a model's usage for LRU sorting
|
||||
type modelUsageInfo struct {
|
||||
address string
|
||||
model string
|
||||
lastUsed time.Time
|
||||
}
|
||||
|
||||
// EnforceLRULimit ensures we're under the LRU limit by evicting least recently used models.
|
||||
// This should be called before loading a new model.
|
||||
// pendingLoads is the number of models currently being loaded (to account for concurrent loads).
|
||||
// Returns the number of models evicted.
|
||||
func (wd *WatchDog) EnforceLRULimit(pendingLoads int) int {
|
||||
if wd.lruLimit <= 0 {
|
||||
return 0 // LRU disabled
|
||||
}
|
||||
|
||||
wd.Lock()
|
||||
|
||||
currentCount := len(wd.addressModelMap)
|
||||
// We need to evict enough to make room for the new model AND any pending loads
|
||||
// Total after loading = currentCount + pendingLoads + 1 (the new one we're about to load)
|
||||
// We need: currentCount + pendingLoads + 1 <= lruLimit
|
||||
// So evict: currentCount + pendingLoads + 1 - lruLimit = currentCount - lruLimit + pendingLoads + 1
|
||||
modelsToEvict := currentCount - wd.lruLimit + pendingLoads + 1
|
||||
if modelsToEvict <= 0 {
|
||||
wd.Unlock()
|
||||
return 0
|
||||
}
|
||||
|
||||
log.Debug().Int("current", currentCount).Int("pendingLoads", pendingLoads).Int("limit", wd.lruLimit).Int("toEvict", modelsToEvict).Msg("[WatchDog] LRU enforcement triggered")
|
||||
|
||||
// Build a list of models sorted by last used time (oldest first)
|
||||
var models []modelUsageInfo
|
||||
for address, model := range wd.addressModelMap {
|
||||
lastUsed := wd.lastUsed[address]
|
||||
if lastUsed.IsZero() {
|
||||
// If no lastUsed recorded, use a very old time
|
||||
lastUsed = time.Time{}
|
||||
}
|
||||
models = append(models, modelUsageInfo{
|
||||
address: address,
|
||||
model: model,
|
||||
lastUsed: lastUsed,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort by lastUsed time (oldest first)
|
||||
sort.Slice(models, func(i, j int) bool {
|
||||
return models[i].lastUsed.Before(models[j].lastUsed)
|
||||
})
|
||||
|
||||
// Collect models to evict (the oldest ones)
|
||||
var modelsToShutdown []string
|
||||
for i := 0; i < modelsToEvict && i < len(models); i++ {
|
||||
m := models[i]
|
||||
log.Info().Str("model", m.model).Time("lastUsed", m.lastUsed).Msg("[WatchDog] LRU evicting model")
|
||||
modelsToShutdown = append(modelsToShutdown, m.model)
|
||||
// Clean up the maps while we have the lock
|
||||
wd.untrack(m.address)
|
||||
}
|
||||
wd.Unlock()
|
||||
|
||||
// Now shutdown models without holding the watchdog lock to prevent deadlock
|
||||
for _, model := range modelsToShutdown {
|
||||
if err := wd.pm.ShutdownModel(model); err != nil {
|
||||
log.Error().Err(err).Str("model", model).Msg("[WatchDog] error shutting down model during LRU eviction")
|
||||
}
|
||||
log.Debug().Str("model", model).Msg("[WatchDog] LRU eviction complete")
|
||||
}
|
||||
|
||||
return len(modelsToShutdown)
|
||||
}
|
||||
|
||||
func (wd *WatchDog) Run() {
|
||||
@@ -117,14 +230,10 @@ func (wd *WatchDog) checkIdle() {
|
||||
model, ok := wd.addressModelMap[address]
|
||||
if ok {
|
||||
modelsToShutdown = append(modelsToShutdown, model)
|
||||
// Clean up the maps while we have the lock
|
||||
delete(wd.idleTime, address)
|
||||
delete(wd.addressModelMap, address)
|
||||
delete(wd.addressMap, address)
|
||||
} else {
|
||||
log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
|
||||
delete(wd.idleTime, address)
|
||||
}
|
||||
wd.untrack(address)
|
||||
}
|
||||
}
|
||||
wd.Unlock()
|
||||
@@ -144,7 +253,7 @@ func (wd *WatchDog) checkBusy() {
|
||||
|
||||
// Collect models to shutdown while holding the lock
|
||||
var modelsToShutdown []string
|
||||
for address, t := range wd.timetable {
|
||||
for address, t := range wd.busyTime {
|
||||
log.Debug().Msgf("[WatchDog] %s: active connection", address)
|
||||
|
||||
if time.Since(t) > wd.timeout {
|
||||
@@ -152,14 +261,10 @@ func (wd *WatchDog) checkBusy() {
|
||||
if ok {
|
||||
log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
|
||||
modelsToShutdown = append(modelsToShutdown, model)
|
||||
// Clean up the maps while we have the lock
|
||||
delete(wd.timetable, address)
|
||||
delete(wd.addressModelMap, address)
|
||||
delete(wd.addressMap, address)
|
||||
} else {
|
||||
log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
|
||||
delete(wd.timetable, address)
|
||||
}
|
||||
wd.untrack(address)
|
||||
}
|
||||
}
|
||||
wd.Unlock()
|
||||
@@ -172,3 +277,11 @@ func (wd *WatchDog) checkBusy() {
|
||||
log.Debug().Msgf("[WatchDog] model shut down: %s", model)
|
||||
}
|
||||
}
|
||||
|
||||
func (wd *WatchDog) untrack(address string) {
|
||||
delete(wd.busyTime, address)
|
||||
delete(wd.idleTime, address)
|
||||
delete(wd.lastUsed, address)
|
||||
delete(wd.addressModelMap, address)
|
||||
delete(wd.addressMap, address)
|
||||
}
|
||||
|
||||
244
pkg/model/watchdog_test.go
Normal file
244
pkg/model/watchdog_test.go
Normal file
@@ -0,0 +1,244 @@
|
||||
package model_test
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// mockProcessManager implements ProcessManager for testing
|
||||
type mockProcessManager struct {
|
||||
mu sync.Mutex
|
||||
shutdownCalls []string
|
||||
shutdownErrors map[string]error
|
||||
}
|
||||
|
||||
func newMockProcessManager() *mockProcessManager {
|
||||
return &mockProcessManager{
|
||||
shutdownCalls: []string{},
|
||||
shutdownErrors: make(map[string]error),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockProcessManager) ShutdownModel(modelName string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.shutdownCalls = append(m.shutdownCalls, modelName)
|
||||
if err, ok := m.shutdownErrors[modelName]; ok {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockProcessManager) getShutdownCalls() []string {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
result := make([]string, len(m.shutdownCalls))
|
||||
copy(result, m.shutdownCalls)
|
||||
return result
|
||||
}
|
||||
|
||||
var _ = Describe("WatchDog", func() {
|
||||
var (
|
||||
wd *model.WatchDog
|
||||
pm *mockProcessManager
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
pm = newMockProcessManager()
|
||||
})
|
||||
|
||||
Context("LRU Limit", func() {
|
||||
It("should create watchdog with LRU limit", func() {
|
||||
wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 2)
|
||||
Expect(wd.GetLRULimit()).To(Equal(2))
|
||||
})
|
||||
|
||||
It("should allow updating LRU limit dynamically", func() {
|
||||
wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 2)
|
||||
wd.SetLRULimit(5)
|
||||
Expect(wd.GetLRULimit()).To(Equal(5))
|
||||
})
|
||||
|
||||
It("should return 0 for disabled LRU", func() {
|
||||
wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 0)
|
||||
Expect(wd.GetLRULimit()).To(Equal(0))
|
||||
})
|
||||
})
|
||||
|
||||
Context("Model Tracking", func() {
|
||||
BeforeEach(func() {
|
||||
wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 3)
|
||||
})
|
||||
|
||||
It("should track loaded models count", func() {
|
||||
Expect(wd.GetLoadedModelCount()).To(Equal(0))
|
||||
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
Expect(wd.GetLoadedModelCount()).To(Equal(1))
|
||||
|
||||
wd.AddAddressModelMap("addr2", "model2")
|
||||
Expect(wd.GetLoadedModelCount()).To(Equal(2))
|
||||
})
|
||||
|
||||
It("should update lastUsed time on Mark", func() {
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
// The model should now have a lastUsed time set
|
||||
// We can verify this indirectly through LRU eviction behavior
|
||||
})
|
||||
|
||||
It("should update lastUsed time on UnMark", func() {
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
wd.UnMark("addr1")
|
||||
// The model should now have an updated lastUsed time
|
||||
})
|
||||
|
||||
It("should update lastUsed time via UpdateLastUsed", func() {
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.UpdateLastUsed("addr1")
|
||||
// Verify the time was updated
|
||||
})
|
||||
})
|
||||
|
||||
Context("EnforceLRULimit", func() {
|
||||
BeforeEach(func() {
|
||||
wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 2)
|
||||
})
|
||||
|
||||
It("should not evict when under limit", func() {
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
|
||||
evicted := wd.EnforceLRULimit(0)
|
||||
Expect(evicted).To(Equal(0))
|
||||
Expect(pm.getShutdownCalls()).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("should evict oldest model when at limit", func() {
|
||||
// Add two models
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
wd.AddAddressModelMap("addr2", "model2")
|
||||
wd.Mark("addr2")
|
||||
|
||||
// Enforce LRU with limit of 2 (need to make room for 1 new model)
|
||||
evicted := wd.EnforceLRULimit(0)
|
||||
Expect(evicted).To(Equal(1))
|
||||
Expect(pm.getShutdownCalls()).To(ContainElement("model1")) // oldest should be evicted
|
||||
})
|
||||
|
||||
It("should evict multiple models when needed", func() {
|
||||
// Add three models
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
wd.AddAddressModelMap("addr2", "model2")
|
||||
wd.Mark("addr2")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
wd.AddAddressModelMap("addr3", "model3")
|
||||
wd.Mark("addr3")
|
||||
|
||||
// Set limit to 1, should evict 2 oldest + 1 for new = 3 evictions
|
||||
wd.SetLRULimit(1)
|
||||
evicted := wd.EnforceLRULimit(0)
|
||||
Expect(evicted).To(Equal(3))
|
||||
shutdowns := pm.getShutdownCalls()
|
||||
Expect(shutdowns).To(ContainElement("model1"))
|
||||
Expect(shutdowns).To(ContainElement("model2"))
|
||||
Expect(shutdowns).To(ContainElement("model3"))
|
||||
})
|
||||
|
||||
It("should account for pending loads", func() {
|
||||
// Add two models (at limit)
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
wd.AddAddressModelMap("addr2", "model2")
|
||||
wd.Mark("addr2")
|
||||
|
||||
// With 1 pending load, we need to evict 2 (current=2, pending=1, new=1, limit=2)
|
||||
// total after = 2 + 1 + 1 = 4, need to evict 4 - 2 = 2
|
||||
evicted := wd.EnforceLRULimit(1)
|
||||
Expect(evicted).To(Equal(2))
|
||||
})
|
||||
|
||||
It("should not evict when LRU is disabled", func() {
|
||||
wd.SetLRULimit(0)
|
||||
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.AddAddressModelMap("addr2", "model2")
|
||||
wd.AddAddressModelMap("addr3", "model3")
|
||||
|
||||
evicted := wd.EnforceLRULimit(0)
|
||||
Expect(evicted).To(Equal(0))
|
||||
Expect(pm.getShutdownCalls()).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("should evict least recently used first", func() {
|
||||
wd.SetLRULimit(2)
|
||||
|
||||
// Add models with different lastUsed times
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
wd.AddAddressModelMap("addr2", "model2")
|
||||
wd.Mark("addr2")
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Touch model1 again to make it more recent
|
||||
wd.UpdateLastUsed("addr1")
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
wd.AddAddressModelMap("addr3", "model3")
|
||||
wd.Mark("addr3")
|
||||
|
||||
// Now model2 is the oldest, should be evicted first
|
||||
evicted := wd.EnforceLRULimit(0)
|
||||
Expect(evicted).To(BeNumerically(">=", 1))
|
||||
|
||||
shutdowns := pm.getShutdownCalls()
|
||||
// model2 should be evicted first (it's the oldest)
|
||||
if len(shutdowns) >= 1 {
|
||||
Expect(shutdowns[0]).To(Equal("model2"))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
Context("Single Backend Mode (LRU=1)", func() {
|
||||
BeforeEach(func() {
|
||||
wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 1)
|
||||
})
|
||||
|
||||
It("should evict existing model when loading new one", func() {
|
||||
wd.AddAddressModelMap("addr1", "model1")
|
||||
wd.Mark("addr1")
|
||||
|
||||
// With limit=1, loading a new model should evict the existing one
|
||||
evicted := wd.EnforceLRULimit(0)
|
||||
Expect(evicted).To(Equal(1))
|
||||
Expect(pm.getShutdownCalls()).To(ContainElement("model1"))
|
||||
})
|
||||
|
||||
It("should handle rapid model switches", func() {
|
||||
for i := 0; i < 5; i++ {
|
||||
wd.AddAddressModelMap("addr", "model")
|
||||
wd.Mark("addr")
|
||||
wd.EnforceLRULimit(0)
|
||||
}
|
||||
// All previous models should have been evicted
|
||||
Expect(len(pm.getShutdownCalls())).To(Equal(5))
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -63,7 +63,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs"
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
sl = model.NewModelLoader(systemState, false)
|
||||
sl = model.NewModelLoader(systemState)
|
||||
sc, err = sl.Load(storeOpts...)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(sc).ToNot(BeNil())
|
||||
|
||||
Reference in New Issue
Block a user