mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
95 lines
3.1 KiB
Go
95 lines
3.1 KiB
Go
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
|
|
"github.com/mudler/LocalAI/pkg/model"
|
|
"github.com/mudler/xlog"
|
|
)
|
|
|
|
// ModelRouterAdapter wraps SmartRouter to provide a model.ModelRouter callback
|
|
// for the ModelLoader. When the ModelLoader needs to start a gRPC backend,
|
|
// it calls this adapter instead of starting a local process.
|
|
//
|
|
// The adapter:
|
|
// 1. Calls SmartRouter.Route() to find/load the model on a remote node
|
|
// (SmartRouter pre-stages model files via FileStager in Route())
|
|
// 2. Returns a Model with a FileStagingClient-wrapped gRPC client
|
|
// 3. Tracks Release() functions for cleanup on model unload
|
|
type ModelRouterAdapter struct {
|
|
router *SmartRouter
|
|
mu sync.Mutex
|
|
release map[string]func() // modelID -> Release() callback
|
|
}
|
|
|
|
// NewModelRouterAdapter creates a new adapter.
|
|
func NewModelRouterAdapter(router *SmartRouter) *ModelRouterAdapter {
|
|
return &ModelRouterAdapter{
|
|
router: router,
|
|
release: make(map[string]func()),
|
|
}
|
|
}
|
|
|
|
// Route implements the model.ModelRouter callback signature.
|
|
// It delegates to SmartRouter.Route() and returns a Model that wraps the
|
|
// remote gRPC client with file staging if configured.
|
|
func (a *ModelRouterAdapter) Route(ctx context.Context, backend, modelID, modelName, modelFile string,
|
|
opts *pb.ModelOptions, parallel bool) (*model.Model, error) {
|
|
|
|
backendType := backend
|
|
|
|
// Set model file and name on opts — these are passed as function args
|
|
// (like the local path in initializers.go) but not pre-set in gRPCOptions.
|
|
if opts != nil {
|
|
opts.Model = modelName
|
|
opts.ModelFile = modelFile
|
|
}
|
|
|
|
// Route to a remote node (SmartRouter handles model pre-staging via FileStager)
|
|
// Pass modelID so the DB tracks models by their logical ID, not the file path
|
|
result, err := a.router.Route(ctx, modelID, modelName, backendType, opts, parallel)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("routing model %s: %w", modelName, err)
|
|
}
|
|
|
|
// Store release function for cleanup on unload
|
|
a.mu.Lock()
|
|
if oldRelease, ok := a.release[modelID]; ok {
|
|
go oldRelease() // clean up previous route in background
|
|
}
|
|
a.release[modelID] = result.Release
|
|
a.mu.Unlock()
|
|
|
|
// The client from Route is already a grpc.Backend.
|
|
// If file staging is configured, it's already wrapped with FileStagingClient
|
|
// by SmartRouter. Use NewModelWithClient so the wrapper is preserved when
|
|
// the ModelLoader returns this model on subsequent requests.
|
|
m := model.NewModelWithClient(modelID, result.Node.Address, result.Client)
|
|
|
|
xlog.Info("Model routed to remote node", "model", modelName, "node", result.Node.Name, "address", result.Node.Address)
|
|
return m, nil
|
|
}
|
|
|
|
// ReleaseModel releases the in-flight counter for a model.
|
|
// Called when the model is unloaded.
|
|
func (a *ModelRouterAdapter) ReleaseModel(modelID string) {
|
|
a.mu.Lock()
|
|
release, ok := a.release[modelID]
|
|
if ok {
|
|
delete(a.release, modelID)
|
|
}
|
|
a.mu.Unlock()
|
|
|
|
if ok && release != nil {
|
|
release()
|
|
}
|
|
}
|
|
|
|
// AsModelRouter returns a model.ModelRouter function suitable for ModelLoader.SetModelRouter().
|
|
func (a *ModelRouterAdapter) AsModelRouter() model.ModelRouter {
|
|
return a.Route
|
|
}
|