Files
LocalAI/core/services/nodes/interfaces.go
Ettore Di Giacinto bbcaebc1ef feat(concurrency-groups): per-model exclusive groups for backend loading (#9662)
* feat(concurrency-groups): per-model exclusive groups for backend loading

Adds `concurrency_groups: [...]` to model YAML configs. Two models that share
a group cannot be loaded concurrently on the same node — loading one evicts
the others, reusing the existing pinned/busy/retry policy from LRU eviction.

Layered design:
- Watchdog (pkg/model): per-node correctness floor — on every Load(), evict
  any loaded model that shares a group with the requested one. Pinned skips
  surface NeedMore so the loader retries (and ultimately logs a clear
  warning), instead of silently allowing the rule to be violated.
- Distributed scheduler (core/services/nodes): soft anti-affinity hint —
  scheduleNewModel prefers nodes that don't already host a same-group
  model, falling back to eviction only if every candidate has a conflict.
  Composes with NodeSelector at the same point in the candidate pipeline.

Per-node, not cluster-wide: VRAM is a node-local resource, and two heavy
models running on different nodes is fine. The ConfigLoader is wired into
SmartRouter via a small ConcurrencyConflictResolver interface so the nodes
package keeps a narrow surface on core/config.

Refactors the inner LRU eviction body into a shared collectEvictionsLocked
helper and the loader retry loop into retryEnforce(fn, maxRetries, interval),
so both LRU and group enforcement share busy/pinned/retry semantics.

Closes #9659.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(watchdog): sync pinned + concurrency_groups at startup

The startup-time watchdog setup lives in initializeWatchdog (startup.go),
not in startWatchdog (watchdog.go). The latter is only invoked from the
runtime-settings RestartWatchdog path. As a result, neither
SyncPinnedModelsToWatchdog nor SyncModelGroupsToWatchdog ran at boot,
so `pinned: true` and `concurrency_groups: [...]` only became effective
after a settings-driven watchdog restart.

Fix by adding both sync calls to initializeWatchdog. Confirmed end-to-end:
loading model A in group "heavy", then C with no group (coexists),
then B in group "heavy" now correctly evicts A and leaves [B, C].

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(test): satisfy errcheck on new os.Remove in concurrency_groups spec

CI lint runs new-from-merge-base, so the existing pre-existing
`defer os.Remove(tmp.Name())` lines are baseline-grandfathered but the
one introduced by the concurrency_groups YAML round-trip test is held
to errcheck. Wrap the remove in a closure that discards the error.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-05 08:42:50 +02:00

116 lines
6.1 KiB
Go

package nodes
import (
"context"
"time"
grpc "github.com/mudler/LocalAI/pkg/grpc"
)
// ModelRouter is used by SmartRouter for routing decisions and model lifecycle.
type ModelRouter interface {
FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string) (*BackendNode, *NodeModel, error)
DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
TouchNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int)
SetNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int, state, address string, initialInFlight int) error
SetNodeModelLoadInfo(ctx context.Context, nodeID, modelName string, replicaIndex int, backendType string, optsBlob []byte) error
GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error)
NextFreeReplicaIndex(ctx context.Context, nodeID, modelName string, maxSlots int) (int, error)
CountReplicasOnNode(ctx context.Context, nodeID, modelName string) (int, error)
FindNodeWithVRAM(ctx context.Context, minBytes uint64) (*BackendNode, error)
FindIdleNode(ctx context.Context) (*BackendNode, error)
FindLeastLoadedNode(ctx context.Context) (*BackendNode, error)
FindGlobalLRUModelWithZeroInFlight(ctx context.Context) (*NodeModel, error)
FindLRUModel(ctx context.Context, nodeID string) (*NodeModel, error)
Get(ctx context.Context, nodeID string) (*BackendNode, error)
GetModelScheduling(ctx context.Context, modelName string) (*ModelSchedulingConfig, error)
FindNodesBySelector(ctx context.Context, selector map[string]string) ([]BackendNode, error)
FindNodesWithFreeSlot(ctx context.Context, modelName string, candidateNodeIDs []string) ([]BackendNode, error)
ReserveVRAM(ctx context.Context, nodeID string, bytes uint64) error
ReleaseVRAM(ctx context.Context, nodeID string, bytes uint64) error
FindNodeWithVRAMFromSet(ctx context.Context, minBytes uint64, nodeIDs []string) (*BackendNode, error)
FindIdleNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
FindLeastLoadedNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
GetNodeLabels(ctx context.Context, nodeID string) ([]NodeLabel, error)
FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error)
}
// ConcurrencyConflictResolver returns the names of configured models that
// share at least one concurrency group with the given model. It is satisfied
// by *config.ModelConfigLoader and lets the SmartRouter make group-aware
// placement decisions without importing the config package's full surface.
type ConcurrencyConflictResolver interface {
GetModelsConflictingWith(modelName string) []string
}
// NodeHealthStore is used by HealthMonitor for node status management.
type NodeHealthStore interface {
List(ctx context.Context) ([]BackendNode, error)
GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
MarkOffline(ctx context.Context, nodeID string) error
MarkUnhealthy(ctx context.Context, nodeID string) error
MarkHealthy(ctx context.Context, nodeID string) error
Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
FindStaleNodes(ctx context.Context, threshold time.Duration) ([]BackendNode, error)
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
}
// ModelLocator is used by RemoteUnloaderAdapter for model discovery.
type ModelLocator interface {
FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error)
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
}
// ModelLookup is used by DistributedModelStore for model existence queries.
type ModelLookup interface {
FindNodeForModel(ctx context.Context, modelName string) (*BackendNode, bool)
ListAllLoadedModels(ctx context.Context) ([]NodeModel, error)
Get(ctx context.Context, nodeID string) (*BackendNode, error)
}
// InFlightTracker is used by InFlightTrackingClient for request counting.
type InFlightTracker interface {
IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
}
// NodeManager is used by HTTP endpoints for node registration and lifecycle.
type NodeManager interface {
Register(ctx context.Context, node *BackendNode, autoApprove bool) error
Get(ctx context.Context, nodeID string) (*BackendNode, error)
GetByName(ctx context.Context, name string) (*BackendNode, error)
List(ctx context.Context) ([]BackendNode, error)
Deregister(ctx context.Context, nodeID string) error
ApproveNode(ctx context.Context, nodeID string) error
MarkOffline(ctx context.Context, nodeID string) error
MarkDraining(ctx context.Context, nodeID string) error
MarkHealthy(ctx context.Context, nodeID string) error
Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
UpdateAuthRefs(ctx context.Context, nodeID, authUserID, apiKeyID string) error
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
}
// BackendClientFactory creates gRPC backend clients.
type BackendClientFactory interface {
NewClient(address string, parallel bool) grpc.Backend
}
// tokenClientFactory is the default BackendClientFactory that creates gRPC
// clients with an optional bearer token for distributed auth.
type tokenClientFactory struct {
token string
}
func (f *tokenClientFactory) NewClient(address string, parallel bool) grpc.Backend {
if f.token != "" {
return grpc.NewClientWithToken(address, parallel, nil, false, f.token)
}
return grpc.NewClient(address, parallel, nil, false)
}