Files
LocalAI/core/services/nodes/model_router.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

95 lines
3.1 KiB
Go

package nodes
import (
"context"
"fmt"
"sync"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/xlog"
)
// ModelRouterAdapter exposes a SmartRouter through the model.ModelRouter
// callback used by the ModelLoader. When the ModelLoader needs to start a
// gRPC backend, it calls this adapter instead of starting a local process.
//
// For every routed model the adapter:
//  1. Delegates to SmartRouter.Route() to find/load the model on a remote node
//     (SmartRouter pre-stages model files via FileStager in Route()).
//  2. Returns a Model built around the gRPC client handed back by the router
//     (FileStagingClient-wrapped when file staging is configured).
//  3. Remembers the route's Release() function so it can be invoked when the
//     model is unloaded.
type ModelRouterAdapter struct {
	// router is the SmartRouter every Route call is delegated to.
	router *SmartRouter

	// mu guards release.
	mu sync.Mutex

	// release maps a modelID to the Release() callback of its current route.
	release map[string]func()
}
// NewModelRouterAdapter builds an adapter that delegates to router and starts
// with an empty table of release callbacks.
func NewModelRouterAdapter(router *SmartRouter) *ModelRouterAdapter {
	adapter := new(ModelRouterAdapter)
	adapter.router = router
	adapter.release = map[string]func(){}
	return adapter
}
// Route implements the model.ModelRouter callback signature.
//
// It copies modelName and modelFile onto opts (when opts is non-nil), delegates
// to SmartRouter.Route(), records the returned Release callback under modelID,
// and returns a Model built from the routed node address and gRPC client.
//
// If a release callback was already registered for modelID it is replaced, and
// the previous callback (when non-nil) is invoked in a background goroutine.
//
// On a routing failure the error is returned wrapped with the model name and
// no release callback is recorded.
func (a *ModelRouterAdapter) Route(ctx context.Context, backend, modelID, modelName, modelFile string,
	opts *pb.ModelOptions, parallel bool) (*model.Model, error) {
	// Set model file and name on opts — these are passed as function args
	// (like the local path in initializers.go) but not pre-set in gRPCOptions.
	if opts != nil {
		opts.Model = modelName
		opts.ModelFile = modelFile
	}

	// Route to a remote node (SmartRouter handles model pre-staging via FileStager).
	// Pass modelID so the DB tracks models by their logical ID, not the file path.
	result, err := a.router.Route(ctx, modelID, modelName, backend, opts, parallel)
	if err != nil {
		return nil, fmt.Errorf("routing model %s: %w", modelName, err)
	}

	// Swap in the new release function, remembering the one it replaces.
	a.mu.Lock()
	oldRelease := a.release[modelID]
	a.release[modelID] = result.Release
	a.mu.Unlock()

	// Clean up the previous route in the background. A nil callback must be
	// skipped: calling a nil func inside a goroutine would panic and take the
	// whole process down. ReleaseModel applies the same nil guard.
	if oldRelease != nil {
		go oldRelease()
	}

	// The client from Route is already a grpc.Backend.
	// If file staging is configured, it's already wrapped with FileStagingClient
	// by SmartRouter. Use NewModelWithClient so the wrapper is preserved when
	// the ModelLoader returns this model on subsequent requests.
	m := model.NewModelWithClient(modelID, result.Node.Address, result.Client)

	xlog.Info("Model routed to remote node", "model", modelName, "node", result.Node.Name, "address", result.Node.Address)
	return m, nil
}
// ReleaseModel forgets the release callback registered for modelID and, if one
// was registered and is non-nil, invokes it. Per the original note, this
// releases the in-flight counter for the model and is called when the model is
// unloaded. Unknown model IDs are ignored.
func (a *ModelRouterAdapter) ReleaseModel(modelID string) {
	a.mu.Lock()
	callback, found := a.release[modelID]
	delete(a.release, modelID) // no-op when modelID is not registered
	a.mu.Unlock()

	// The callback runs after the mutex has been released.
	if !found || callback == nil {
		return
	}
	callback()
}
// AsModelRouter exposes the adapter's Route method as a model.ModelRouter
// function value, suitable for ModelLoader.SetModelRouter().
func (a *ModelRouterAdapter) AsModelRouter() model.ModelRouter {
	return model.ModelRouter(a.Route)
}