mirror of
https://github.com/mudler/LocalAI.git
synced 2025-12-25 23:49:25 -05:00
* feat: allow to set forcing backends eviction while requests are in flight Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat: try to make the request sit and retry if eviction couldn't be done Otherwise calls that in order to pass would need to shutdown other backends would just fail. In this way instead we make the request sit and retry eviction until it succeeds. The thresholds can be configured by the user. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * add tests Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * expose settings to CLI Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Update docs Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
305 lines
9.3 KiB
Go
305 lines
9.3 KiB
Go
package model
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
grpc "github.com/mudler/LocalAI/pkg/grpc"
|
|
"github.com/mudler/xlog"
|
|
"github.com/phayes/freeport"
|
|
)
|
|
|
|
// LLamaCPP is the backend name that the llama aliases in Aliases resolve to.
const LLamaCPP = "llama-cpp"
|
|
|
|
var Aliases map[string]string = map[string]string{
|
|
"go-llama": LLamaCPP,
|
|
"llama": LLamaCPP,
|
|
"embedded-store": LocalStoreBackend,
|
|
"huggingface-embeddings": TransformersBackend,
|
|
"langchain-huggingface": LCHuggingFaceBackend,
|
|
"transformers-musicgen": TransformersBackend,
|
|
"sentencetransformers": TransformersBackend,
|
|
"mamba": TransformersBackend,
|
|
"stablediffusion": StableDiffusionGGMLBackend,
|
|
}
|
|
|
|
// TypeAlias maps a backend alias to the model type that backendLoader stores
// in the gRPC options (gRPCOptions.Type) when that alias is requested.
var TypeAlias = map[string]string{
	"mamba":                  "Mamba",
	"transformers-musicgen":  "MusicgenForConditionalGeneration",
	"huggingface-embeddings": "SentenceTransformer",
	"sentencetransformers":   "SentenceTransformer",
}
|
|
|
|
// Backend names referenced by this package. The constants are left untyped
// so they can be used wherever a string constant is accepted.
const (
	// Backends that appear as alias targets in Aliases.
	TransformersBackend        = "transformers"
	LocalStoreBackend          = "local-store"
	LCHuggingFaceBackend       = "huggingface"
	StableDiffusionGGMLBackend = "stablediffusion-ggml"

	// WhisperBackend is not an alias target in this file.
	WhisperBackend = "whisper"
)
|
|
|
|
// starts the grpcModelProcess for the backend, and returns a grpc client
|
|
// It also loads the model
|
|
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) {
|
|
return func(modelID, modelName, modelFile string) (*Model, error) {
|
|
|
|
xlog.Debug("Loading Model with gRPC", "modelID", modelID, "file", modelFile, "backend", backend, "options", *o)
|
|
|
|
var client *Model
|
|
|
|
getFreeAddress := func() (string, error) {
|
|
port, err := freeport.GetFreePort()
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
|
|
}
|
|
return fmt.Sprintf("127.0.0.1:%d", port), nil
|
|
}
|
|
|
|
// If no specific model path is set for transformers/HF, set it to the model path
|
|
for _, env := range []string{"HF_HOME", "TRANSFORMERS_CACHE", "HUGGINGFACE_HUB_CACHE"} {
|
|
if os.Getenv(env) == "" {
|
|
err := os.Setenv(env, ml.ModelPath)
|
|
if err != nil {
|
|
xlog.Error("unable to set environment variable to modelPath", "error", err, "name", env, "modelPath", ml.ModelPath)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check if the backend is provided as external
|
|
if uri, ok := ml.GetAllExternalBackends(o)[backend]; ok {
|
|
xlog.Debug("Loading external backend", "uri", uri)
|
|
// check if uri is a file or a address
|
|
if fi, err := os.Stat(uri); err == nil {
|
|
xlog.Debug("external backend is file", "file", fi)
|
|
serverAddress, err := getFreeAddress()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
|
|
}
|
|
// Make sure the process is executable
|
|
process, err := ml.startProcess(uri, modelID, serverAddress)
|
|
if err != nil {
|
|
xlog.Error("failed to launch", "error", err, "path", uri)
|
|
return nil, err
|
|
}
|
|
|
|
xlog.Debug("GRPC Service Started")
|
|
|
|
client = NewModel(modelID, serverAddress, process)
|
|
} else {
|
|
xlog.Debug("external backend is a uri")
|
|
// address
|
|
client = NewModel(modelID, uri, nil)
|
|
}
|
|
} else {
|
|
xlog.Error("Backend not found", "backend", backend)
|
|
return nil, fmt.Errorf("backend not found: %s", backend)
|
|
}
|
|
|
|
xlog.Debug("Wait for the service to start up")
|
|
xlog.Debug("Options", "options", o.gRPCOptions)
|
|
|
|
// Wait for the service to start up
|
|
ready := false
|
|
for i := 0; i < o.grpcAttempts; i++ {
|
|
alive, err := client.GRPC(o.parallelRequests, ml.wd).HealthCheck(context.Background())
|
|
if alive {
|
|
xlog.Debug("GRPC Service Ready")
|
|
ready = true
|
|
break
|
|
}
|
|
if err != nil && i == o.grpcAttempts-1 {
|
|
xlog.Error("failed starting/connecting to the gRPC service", "error", err)
|
|
}
|
|
time.Sleep(time.Duration(o.grpcAttemptsDelay) * time.Second)
|
|
}
|
|
|
|
if !ready {
|
|
xlog.Debug("GRPC Service NOT ready")
|
|
if process := client.Process(); process != nil {
|
|
process.Stop()
|
|
}
|
|
return nil, fmt.Errorf("grpc service not ready")
|
|
}
|
|
|
|
options := *o.gRPCOptions
|
|
options.Model = modelName
|
|
options.ModelFile = modelFile
|
|
options.ModelPath = ml.ModelPath
|
|
|
|
xlog.Debug("GRPC: Loading model with options", "options", options)
|
|
|
|
res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
|
|
if err != nil {
|
|
if process := client.Process(); process != nil {
|
|
process.Stop()
|
|
}
|
|
return nil, fmt.Errorf("could not load model: %w", err)
|
|
}
|
|
if !res.Success {
|
|
if process := client.Process(); process != nil {
|
|
process.Stop()
|
|
}
|
|
return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err error) {
|
|
o := NewOptions(opts...)
|
|
|
|
xlog.Info("BackendLoader starting", "modelID", o.modelID, "backend", o.backendString, "model", o.model)
|
|
|
|
backend := strings.ToLower(o.backendString)
|
|
if realBackend, exists := Aliases[backend]; exists {
|
|
typeAlias, exists := TypeAlias[backend]
|
|
if exists {
|
|
xlog.Debug("alias is a type alias", "alias", backend, "realBackend", realBackend, "type", typeAlias)
|
|
o.gRPCOptions.Type = typeAlias
|
|
} else {
|
|
xlog.Debug("alias", "alias", backend, "realBackend", realBackend)
|
|
}
|
|
|
|
backend = realBackend
|
|
}
|
|
|
|
model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backend, o))
|
|
if err != nil {
|
|
if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
|
|
xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
|
|
}
|
|
xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString)
|
|
return nil, err
|
|
}
|
|
|
|
return model.GRPC(o.parallelRequests, ml.wd), nil
|
|
}
|
|
|
|
// enforceLRULimit enforces the LRU limit before loading a new model.
|
|
// This is called before loading a model to ensure we don't exceed the limit.
|
|
// It accounts for models that are currently being loaded by other goroutines.
|
|
// If models are busy and can't be evicted, it will wait and retry until space is available.
|
|
func (ml *ModelLoader) enforceLRULimit() {
|
|
if ml.wd == nil {
|
|
return
|
|
}
|
|
|
|
// Get the count of models currently being loaded to account for concurrent requests
|
|
pendingLoads := ml.GetLoadingCount()
|
|
|
|
// Get retry settings from ModelLoader
|
|
ml.mu.Lock()
|
|
maxRetries := ml.lruEvictionMaxRetries
|
|
retryInterval := ml.lruEvictionRetryInterval
|
|
ml.mu.Unlock()
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
result := ml.wd.EnforceLRULimit(pendingLoads)
|
|
|
|
if !result.NeedMore {
|
|
// Successfully evicted enough models (or no eviction needed)
|
|
if result.EvictedCount > 0 {
|
|
xlog.Info("[ModelLoader] LRU enforcement complete", "evicted", result.EvictedCount)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Need more evictions but models are busy - wait and retry
|
|
if attempt < maxRetries-1 {
|
|
xlog.Info("[ModelLoader] Waiting for busy models to become idle before eviction",
|
|
"evicted", result.EvictedCount,
|
|
"attempt", attempt+1,
|
|
"maxRetries", maxRetries,
|
|
"retryIn", retryInterval)
|
|
time.Sleep(retryInterval)
|
|
} else {
|
|
// Last attempt - log warning but proceed (might fail to load, but at least we tried)
|
|
xlog.Warn("[ModelLoader] LRU enforcement incomplete after max retries",
|
|
"evicted", result.EvictedCount,
|
|
"reason", "models are still busy with active API calls")
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateModelLastUsed updates the last used time for a model (for LRU tracking)
|
|
func (ml *ModelLoader) updateModelLastUsed(m *Model) {
|
|
if ml.wd == nil || m == nil {
|
|
return
|
|
}
|
|
ml.wd.UpdateLastUsed(m.address)
|
|
}
|
|
|
|
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
|
|
o := NewOptions(opts...)
|
|
|
|
// Return earlier if we have a model already loaded
|
|
// (avoid looping through all the backends)
|
|
if m := ml.CheckIsLoaded(o.modelID); m != nil {
|
|
xlog.Debug("Model already loaded", "model", o.modelID)
|
|
// Update last used time for LRU tracking
|
|
ml.updateModelLastUsed(m)
|
|
return m.GRPC(o.parallelRequests, ml.wd), nil
|
|
}
|
|
|
|
// Enforce LRU limit before loading a new model
|
|
ml.enforceLRULimit()
|
|
|
|
// if a backend is defined, return the loader directly
|
|
if o.backendString != "" {
|
|
client, err := ml.backendLoader(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client, nil
|
|
}
|
|
|
|
// Otherwise scan for backends in the asset directory
|
|
var err error
|
|
|
|
// get backends embedded in the binary
|
|
autoLoadBackends := []string{}
|
|
|
|
// append externalBackends supplied by the user via the CLI
|
|
for b := range ml.GetAllExternalBackends(o) {
|
|
autoLoadBackends = append(autoLoadBackends, b)
|
|
}
|
|
|
|
if len(autoLoadBackends) == 0 {
|
|
xlog.Error("No backends found")
|
|
return nil, fmt.Errorf("no backends found")
|
|
}
|
|
|
|
xlog.Debug("Loading from the following backends (in order)", "backends", autoLoadBackends)
|
|
|
|
xlog.Info("Trying to load the model", "modelID", o.modelID, "backends", autoLoadBackends)
|
|
|
|
for _, key := range autoLoadBackends {
|
|
xlog.Info("Attempting to load", "backend", key)
|
|
options := append(opts, []Option{
|
|
WithBackendString(key),
|
|
}...)
|
|
|
|
model, modelerr := ml.backendLoader(options...)
|
|
if modelerr == nil && model != nil {
|
|
xlog.Info("Loads OK", "backend", key)
|
|
return model, nil
|
|
} else if modelerr != nil {
|
|
err = errors.Join(err, fmt.Errorf("[%s]: %w", key, modelerr))
|
|
xlog.Info("Fails", "backend", key, "error", modelerr.Error())
|
|
} else if model == nil {
|
|
err = errors.Join(err, fmt.Errorf("backend %s returned no usable model", key))
|
|
xlog.Info("Fails", "backend", key, "error", "backend returned no usable model")
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error())
|
|
}
|