mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* always enable parallel requests Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat: add node reconciler, allow to schedule to group of nodes, min/max autoscaler Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * chore: move tests to ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * chore(smart router): order by available vram Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
96 lines
4.4 KiB
Go
96 lines
4.4 KiB
Go
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
grpc "github.com/mudler/LocalAI/pkg/grpc"
|
|
)
|
|
|
|
// ModelRouter is used by SmartRouter for routing decisions and model lifecycle.
|
|
type ModelRouter interface {
|
|
FindAndLockNodeWithModel(ctx context.Context, modelName string) (*BackendNode, *NodeModel, error)
|
|
DecrementInFlight(ctx context.Context, nodeID, modelName string) error
|
|
IncrementInFlight(ctx context.Context, nodeID, modelName string) error
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string) error
|
|
TouchNodeModel(ctx context.Context, nodeID, modelName string)
|
|
SetNodeModel(ctx context.Context, nodeID, modelName, state, address string, initialInFlight int) error
|
|
FindNodeWithVRAM(ctx context.Context, minBytes uint64) (*BackendNode, error)
|
|
FindIdleNode(ctx context.Context) (*BackendNode, error)
|
|
FindLeastLoadedNode(ctx context.Context) (*BackendNode, error)
|
|
FindGlobalLRUModelWithZeroInFlight(ctx context.Context) (*NodeModel, error)
|
|
FindLRUModel(ctx context.Context, nodeID string) (*NodeModel, error)
|
|
Get(ctx context.Context, nodeID string) (*BackendNode, error)
|
|
GetModelScheduling(ctx context.Context, modelName string) (*ModelSchedulingConfig, error)
|
|
FindNodesBySelector(ctx context.Context, selector map[string]string) ([]BackendNode, error)
|
|
FindNodeWithVRAMFromSet(ctx context.Context, minBytes uint64, nodeIDs []string) (*BackendNode, error)
|
|
FindIdleNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
|
|
FindLeastLoadedNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
|
|
GetNodeLabels(ctx context.Context, nodeID string) ([]NodeLabel, error)
|
|
}
|
|
|
|
// NodeHealthStore is used by HealthMonitor for node status management.
|
|
type NodeHealthStore interface {
|
|
List(ctx context.Context) ([]BackendNode, error)
|
|
GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
|
|
MarkOffline(ctx context.Context, nodeID string) error
|
|
MarkUnhealthy(ctx context.Context, nodeID string) error
|
|
MarkHealthy(ctx context.Context, nodeID string) error
|
|
Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
|
|
FindStaleNodes(ctx context.Context, threshold time.Duration) ([]BackendNode, error)
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string) error
|
|
}
|
|
|
|
// ModelLocator is used by RemoteUnloaderAdapter for model discovery.
|
|
type ModelLocator interface {
|
|
FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error)
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string) error
|
|
}
|
|
|
|
// ModelLookup is used by DistributedModelStore for model existence queries.
|
|
type ModelLookup interface {
|
|
FindNodeForModel(ctx context.Context, modelName string) (*BackendNode, bool)
|
|
ListAllLoadedModels(ctx context.Context) ([]NodeModel, error)
|
|
Get(ctx context.Context, nodeID string) (*BackendNode, error)
|
|
}
|
|
|
|
// InFlightTracker is the contract InFlightTrackingClient uses to count
// requests in flight per (nodeID, modelName) pair.
type InFlightTracker interface {
	// DecrementInFlight lowers the in-flight count for the pair.
	DecrementInFlight(ctx context.Context, nodeID, modelName string) error

	// IncrementInFlight raises the in-flight count for the pair.
	IncrementInFlight(ctx context.Context, nodeID, modelName string) error
}
|
|
|
|
// NodeManager is used by HTTP endpoints for node registration and lifecycle.
|
|
type NodeManager interface {
|
|
Register(ctx context.Context, node *BackendNode, autoApprove bool) error
|
|
Get(ctx context.Context, nodeID string) (*BackendNode, error)
|
|
GetByName(ctx context.Context, name string) (*BackendNode, error)
|
|
List(ctx context.Context) ([]BackendNode, error)
|
|
Deregister(ctx context.Context, nodeID string) error
|
|
ApproveNode(ctx context.Context, nodeID string) error
|
|
MarkOffline(ctx context.Context, nodeID string) error
|
|
MarkDraining(ctx context.Context, nodeID string) error
|
|
Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
|
|
GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
|
|
UpdateAuthRefs(ctx context.Context, nodeID, authUserID, apiKeyID string) error
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string) error
|
|
}
|
|
|
|
// BackendClientFactory creates gRPC backend clients.
|
|
type BackendClientFactory interface {
|
|
NewClient(address string, parallel bool) grpc.Backend
|
|
}
|
|
|
|
// tokenClientFactory is the stock BackendClientFactory. It builds gRPC backend
// clients and, when a token is configured, attaches it as a bearer token for
// distributed auth.
type tokenClientFactory struct {
	// token is the bearer token. Leaving it empty makes NewClient build a
	// client without any token.
	token string
}
|
|
|
|
func (f *tokenClientFactory) NewClient(address string, parallel bool) grpc.Backend {
|
|
if f.token != "" {
|
|
return grpc.NewClientWithToken(address, parallel, nil, false, f.token)
|
|
}
|
|
return grpc.NewClient(address, parallel, nil, false)
|
|
}
|