LocalAI/pkg/model/initializers.go
Ettore Di Giacinto c844b7ac58 feat: disable force eviction (#7725)
* feat: allow configuring whether backends are forcibly evicted while requests are in flight

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat: make the request wait and retry if eviction couldn't be done

Otherwise, calls that would need to shut down other backends in order
to succeed would simply fail.

With this change the request instead waits and retries eviction until it
succeeds. The retry thresholds can be configured by the user.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* add tests

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* expose settings to CLI

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Update docs

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2025-12-25 14:26:18 +01:00


package model

import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
	"time"

	grpc "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/xlog"
	"github.com/phayes/freeport"
)

const (
	LLamaCPP = "llama-cpp"
)
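
// Aliases maps legacy or alternate backend names to the canonical backend
// that serves them.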
var Aliases map[string]string = map[string]string{
	"go-llama":               LLamaCPP,
	"llama":                  LLamaCPP,
	"embedded-store":         LocalStoreBackend,
	"huggingface-embeddings": TransformersBackend,
	"langchain-huggingface":  LCHuggingFaceBackend,
	"transformers-musicgen":  TransformersBackend,
	"sentencetransformers":   TransformersBackend,
	"mamba":                  TransformersBackend,
	"stablediffusion":        StableDiffusionGGMLBackend,
}
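
// TypeAlias maps aliased backend names to the model type that is set on the
// gRPC options when the alias is resolved (see backendLoader).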
var TypeAlias map[string]string = map[string]string{
	"sentencetransformers":   "SentenceTransformer",
	"huggingface-embeddings": "SentenceTransformer",
	"mamba":                  "Mamba",
	"transformers-musicgen":  "MusicgenForConditionalGeneration",
}
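
// Identifiers for the other backends known to the loader.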
const (
	WhisperBackend             = "whisper"
	StableDiffusionGGMLBackend = "stablediffusion-ggml"
	LCHuggingFaceBackend       = "huggingface"
	TransformersBackend        = "transformers"
	LocalStoreBackend          = "local-store"
)

// grpcModel starts the gRPC process for the backend and returns a gRPC client.
// It also loads the model.
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) {
	return func(modelID, modelName, modelFile string) (*Model, error) {
		xlog.Debug("Loading Model with gRPC", "modelID", modelID, "file", modelFile, "backend", backend, "options", *o)

		var client *Model

		getFreeAddress := func() (string, error) {
			port, err := freeport.GetFreePort()
			if err != nil {
				return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
			}
			return fmt.Sprintf("127.0.0.1:%d", port), nil
		}

		// If no specific model path is set for transformers/HF, set it to the model path
		for _, env := range []string{"HF_HOME", "TRANSFORMERS_CACHE", "HUGGINGFACE_HUB_CACHE"} {
			if os.Getenv(env) == "" {
				err := os.Setenv(env, ml.ModelPath)
				if err != nil {
					xlog.Error("unable to set environment variable to modelPath", "error", err, "name", env, "modelPath", ml.ModelPath)
				}
			}
		}

		// Check if the backend is provided as external
		if uri, ok := ml.GetAllExternalBackends(o)[backend]; ok {
			xlog.Debug("Loading external backend", "uri", uri)
			// check if uri is a file or an address
			if fi, err := os.Stat(uri); err == nil {
				xlog.Debug("external backend is file", "file", fi)
				serverAddress, err := getFreeAddress()
				if err != nil {
					return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
				}
				// Make sure the process is executable and start it
				process, err := ml.startProcess(uri, modelID, serverAddress)
				if err != nil {
					xlog.Error("failed to launch", "error", err, "path", uri)
					return nil, err
				}
				xlog.Debug("GRPC Service Started")
				client = NewModel(modelID, serverAddress, process)
			} else {
				xlog.Debug("external backend is a uri")
				// address
				client = NewModel(modelID, uri, nil)
			}
		} else {
			xlog.Error("Backend not found", "backend", backend)
			return nil, fmt.Errorf("backend not found: %s", backend)
		}

		xlog.Debug("Wait for the service to start up")
		xlog.Debug("Options", "options", o.gRPCOptions)

		// Wait for the service to start up
		ready := false
		for i := 0; i < o.grpcAttempts; i++ {
			alive, err := client.GRPC(o.parallelRequests, ml.wd).HealthCheck(context.Background())
			if alive {
				xlog.Debug("GRPC Service Ready")
				ready = true
				break
			}
			if err != nil && i == o.grpcAttempts-1 {
				xlog.Error("failed starting/connecting to the gRPC service", "error", err)
			}
			time.Sleep(time.Duration(o.grpcAttemptsDelay) * time.Second)
		}

		if !ready {
			xlog.Debug("GRPC Service NOT ready")
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("grpc service not ready")
		}

		options := *o.gRPCOptions
		options.Model = modelName
		options.ModelFile = modelFile
		options.ModelPath = ml.ModelPath

		xlog.Debug("GRPC: Loading model with options", "options", options)

		res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
		if err != nil {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model: %w", err)
		}
		if !res.Success {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
		}

		return client, nil
	}
}
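
// backendLoader resolves backend aliases and loads the model through
// grpcModel, returning a ready gRPC client. On failure, any process started
// for this model ID is stopped.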
func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err error) {
	o := NewOptions(opts...)

	xlog.Info("BackendLoader starting", "modelID", o.modelID, "backend", o.backendString, "model", o.model)

	backend := strings.ToLower(o.backendString)
	if realBackend, exists := Aliases[backend]; exists {
		typeAlias, exists := TypeAlias[backend]
		if exists {
			xlog.Debug("alias is a type alias", "alias", backend, "realBackend", realBackend, "type", typeAlias)
			o.gRPCOptions.Type = typeAlias
		} else {
			xlog.Debug("alias", "alias", backend, "realBackend", realBackend)
		}
		backend = realBackend
	}

	model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backend, o))
	if err != nil {
		if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
			xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
		}
		xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString)
		return nil, err
	}
	return model.GRPC(o.parallelRequests, ml.wd), nil
}

// enforceLRULimit enforces the LRU limit before loading a new model.
// This is called before loading a model to ensure we don't exceed the limit.
// It accounts for models that are currently being loaded by other goroutines.
// If models are busy and can't be evicted, it will wait and retry until space is available.
func (ml *ModelLoader) enforceLRULimit() {
	if ml.wd == nil {
		return
	}

	// Get the count of models currently being loaded to account for concurrent requests
	pendingLoads := ml.GetLoadingCount()

	// Get retry settings from ModelLoader
	ml.mu.Lock()
	maxRetries := ml.lruEvictionMaxRetries
	retryInterval := ml.lruEvictionRetryInterval
	ml.mu.Unlock()

	for attempt := 0; attempt < maxRetries; attempt++ {
		result := ml.wd.EnforceLRULimit(pendingLoads)

		if !result.NeedMore {
			// Successfully evicted enough models (or no eviction needed)
			if result.EvictedCount > 0 {
				xlog.Info("[ModelLoader] LRU enforcement complete", "evicted", result.EvictedCount)
			}
			return
		}

		// Need more evictions but models are busy - wait and retry
		if attempt < maxRetries-1 {
			xlog.Info("[ModelLoader] Waiting for busy models to become idle before eviction",
				"evicted", result.EvictedCount,
				"attempt", attempt+1,
				"maxRetries", maxRetries,
				"retryIn", retryInterval)
			time.Sleep(retryInterval)
		} else {
			// Last attempt - log warning but proceed (might fail to load, but at least we tried)
			xlog.Warn("[ModelLoader] LRU enforcement incomplete after max retries",
				"evicted", result.EvictedCount,
				"reason", "models are still busy with active API calls")
		}
	}
}
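
// Illustrative sketch, not part of the original file: the retry thresholds
// read by enforceLRULimit live on the ModelLoader and, per the commit
// message, are user-configurable. Assuming they can be set in-package,
// tuning them could look like the following (the values are arbitrary):
//
//	ml.mu.Lock()
//	ml.lruEvictionMaxRetries = 10
//	ml.lruEvictionRetryInterval = 2 * time.Second
//	ml.mu.Unlock()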

// updateModelLastUsed updates the last used time for a model (for LRU tracking)
func (ml *ModelLoader) updateModelLastUsed(m *Model) {
	if ml.wd == nil || m == nil {
		return
	}
	ml.wd.UpdateLastUsed(m.address)
}
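
// Load returns a gRPC backend client for the requested model. An already
// loaded model is reused directly; otherwise the LRU limit is enforced and
// the model is loaded, either with the explicitly requested backend or by
// trying each available backend in turn.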
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
	o := NewOptions(opts...)

	// Return early if we have a model already loaded
	// (avoid looping through all the backends)
	if m := ml.CheckIsLoaded(o.modelID); m != nil {
		xlog.Debug("Model already loaded", "model", o.modelID)
		// Update last used time for LRU tracking
		ml.updateModelLastUsed(m)
		return m.GRPC(o.parallelRequests, ml.wd), nil
	}

	// Enforce LRU limit before loading a new model
	ml.enforceLRULimit()

	// if a backend is defined, return the loader directly
	if o.backendString != "" {
		client, err := ml.backendLoader(opts...)
		if err != nil {
			return nil, err
		}
		return client, nil
	}

	// Otherwise, try each available backend in turn
	var err error

	autoLoadBackends := []string{}
	// append external backends supplied by the user via the CLI
	for b := range ml.GetAllExternalBackends(o) {
		autoLoadBackends = append(autoLoadBackends, b)
	}

	if len(autoLoadBackends) == 0 {
		xlog.Error("No backends found")
		return nil, fmt.Errorf("no backends found")
	}

	xlog.Debug("Loading from the following backends (in order)", "backends", autoLoadBackends)
	xlog.Info("Trying to load the model", "modelID", o.modelID, "backends", autoLoadBackends)

	for _, key := range autoLoadBackends {
		xlog.Info("Attempting to load", "backend", key)
		options := append(opts, []Option{
			WithBackendString(key),
		}...)

		model, modelerr := ml.backendLoader(options...)
		if modelerr == nil && model != nil {
			xlog.Info("Loads OK", "backend", key)
			return model, nil
		} else if modelerr != nil {
			err = errors.Join(err, fmt.Errorf("[%s]: %w", key, modelerr))
			xlog.Info("Fails", "backend", key, "error", modelerr.Error())
		} else if model == nil {
			err = errors.Join(err, fmt.Errorf("backend %s returned no usable model", key))
			xlog.Info("Fails", "backend", key, "error", "backend returned no usable model")
		}
	}

	return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error())
}