diff --git a/core/services/nodes/replicapicker.go b/core/services/nodes/replicapicker.go index 56d383e61..0c784060f 100644 --- a/core/services/nodes/replicapicker.go +++ b/core/services/nodes/replicapicker.go @@ -1,69 +1,17 @@ package nodes -import "time" +import "github.com/mudler/LocalAI/pkg/clusterrouting" -// ReplicaCandidate is the minimum view of a loaded model replica needed to -// apply the routing policy. It is intentionally decoupled from the gorm models -// (BackendNode, NodeModel) so the same picker can run against fresh DB rows -// (SmartRouter.Route → FindAndLockNodeWithModel) and against an in-memory -// snapshot (the per-frontend rotating cache flagged in pkg/model — see TODO -// below). -type ReplicaCandidate struct { - NodeID string - Address string - ReplicaIndex int - InFlight int - LastUsed time.Time - AvailableVRAM uint64 -} +// ReplicaCandidate aliases the canonical type in pkg/clusterrouting. The policy +// implementation moved there so the p2p federation server can share it without +// importing this package (which pulls in gorm). Because this is a type alias, +// existing references such as the LoadedReplicaStats interface method and the +// ReplicaCandidate(rw) row conversion in registry.go remain valid unchanged. +type ReplicaCandidate = clusterrouting.ReplicaCandidate -// PickBestReplica is the single source of truth for which loaded replica of a -// model serves the next request. -// -// Policy (ordered tiers, first non-tie wins): -// 1. Least in-flight wins — primary load-balancing signal. -// 2. Oldest last_used wins — round-robin between equally-loaded replicas. -// Every successful pick refreshes last_used (in FindAndLockNodeWithModel's -// transaction and in TouchNodeModel on cache hits), so the "oldest" tier -// naturally rotates through the candidate set without a separate cursor. -// 3. Largest available_vram wins — cold-start tiebreaker for replicas that -// have never been picked (identical last_used). -// -// Two callers must agree on this policy: -// -// - SmartRouter.Route, via the SQL ORDER BY in FindAndLockNodeWithModel -// (registry.go). That query MUST mirror this function — TestPickerSQLMirror -// asserts both sides agree on a representative dataset. -// -// - The per-frontend rotating-replica cache (NOT YET IMPLEMENTED — see -// pkg/model/loader.go and pkg/model/initializers.go for the integration -// point). When that cache lands, it will call PickBestReplica against an -// in-memory snapshot using locally-tracked in-flight counters and skip the -// per-request DB round-trip. -// -// Returns nil when the candidate list is empty. Does not allocate. +// PickBestReplica delegates to the canonical implementation in pkg/clusterrouting. +// The SQL ORDER BY in FindAndLockNodeWithModel (registry.go) must mirror it; the +// "policy mirror" spec in registry_test.go asserts they agree. func PickBestReplica(candidates []ReplicaCandidate) *ReplicaCandidate { - if len(candidates) == 0 { - return nil - } - best := &candidates[0] - for i := 1; i < len(candidates); i++ { - c := &candidates[i] - if betterReplica(c, best) { - best = c - } - } - return best -} - -// betterReplica reports whether candidate a is preferred over candidate b -// under the policy documented on PickBestReplica. -func betterReplica(a, b *ReplicaCandidate) bool { - if a.InFlight != b.InFlight { - return a.InFlight < b.InFlight - } - if !a.LastUsed.Equal(b.LastUsed) { - return a.LastUsed.Before(b.LastUsed) - } - return a.AvailableVRAM > b.AvailableVRAM + return clusterrouting.PickBestReplica(candidates) } diff --git a/pkg/clusterrouting/clusterrouting_suite_test.go b/pkg/clusterrouting/clusterrouting_suite_test.go new file mode 100644 index 000000000..9301ff662 --- /dev/null +++ b/pkg/clusterrouting/clusterrouting_suite_test.go @@ -0,0 +1,13 @@ +package clusterrouting + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestClusterRouting(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ClusterRouting Suite") +} diff --git a/pkg/clusterrouting/replica.go b/pkg/clusterrouting/replica.go new file mode 100644 index 000000000..9c3beceff --- /dev/null +++ b/pkg/clusterrouting/replica.go @@ -0,0 +1,66 @@ +// Package clusterrouting holds the transport-agnostic replica selection policy +// shared by the NATS distributed mode (core/services/nodes) and the p2p +// federation server (core/p2p). It deliberately depends on nothing heavier than +// the standard library so either transport can import it without pulling in a +// database driver or message bus. +package clusterrouting + +import "time" + +// ReplicaCandidate is the minimum view of a loaded model replica needed to +// apply the routing policy. It is intentionally decoupled from any storage +// model (gorm rows on the NATS side, gossiped NodeData on the p2p side) so the +// same picker runs against fresh DB rows, an in-memory snapshot, or p2p gossip. +type ReplicaCandidate struct { + NodeID string + Address string + ReplicaIndex int + InFlight int + LastUsed time.Time + AvailableVRAM uint64 +} + +// PickBestReplica is the single source of truth for which loaded replica of a +// model serves the next request. +// +// Policy (ordered tiers, first non-tie wins): +// 1. Least in-flight wins: primary load-balancing signal. +// 2. Oldest last_used wins: round-robin between equally-loaded replicas. +// Every successful pick refreshes last_used (in the NATS +// FindAndLockNodeWithModel transaction and in TouchNodeModel on cache +// hits), so the "oldest" tier naturally rotates through the candidate set +// without a separate cursor. +// 3. Largest available_vram wins: cold-start tiebreaker for replicas that +// have never been picked (identical last_used). +// +// The NATS SQL ORDER BY in FindAndLockNodeWithModel (registry.go) MUST mirror +// this function; registry_test.go's "agrees with PickBestReplica on a seeded +// dataset (policy mirror)" spec asserts both sides agree on a representative +// dataset and fails fast if they drift. +// +// Returns nil when the candidate list is empty. Does not allocate. +func PickBestReplica(candidates []ReplicaCandidate) *ReplicaCandidate { + if len(candidates) == 0 { + return nil + } + best := &candidates[0] + for i := 1; i < len(candidates); i++ { + c := &candidates[i] + if betterReplica(c, best) { + best = c + } + } + return best +} + +// betterReplica reports whether candidate a is preferred over candidate b +// under the policy documented on PickBestReplica. +func betterReplica(a, b *ReplicaCandidate) bool { + if a.InFlight != b.InFlight { + return a.InFlight < b.InFlight + } + if !a.LastUsed.Equal(b.LastUsed) { + return a.LastUsed.Before(b.LastUsed) + } + return a.AvailableVRAM > b.AvailableVRAM +} diff --git a/core/services/nodes/replicapicker_test.go b/pkg/clusterrouting/replica_test.go similarity index 95% rename from core/services/nodes/replicapicker_test.go rename to pkg/clusterrouting/replica_test.go index d71b83808..5627fa72d 100644 --- a/core/services/nodes/replicapicker_test.go +++ b/pkg/clusterrouting/replica_test.go @@ -1,4 +1,4 @@ -package nodes +package clusterrouting import ( "time" @@ -36,7 +36,7 @@ var _ = Describe("PickBestReplica", func() { It("uses oldest last_used as the tiebreaker when in_flight ties", func() { // All three tied on in_flight=0. Without last_used, available_vram - // would pin every pick to the fattest node — the exact bug + // would pin every pick to the fattest node: the exact bug // fix(distributed): round-robin replicas of the same model addressed. cs := []ReplicaCandidate{ {NodeID: "fat-recent", InFlight: 0, LastUsed: ref.Add(2 * time.Second), AvailableVRAM: 24_000_000_000}, @@ -47,7 +47,7 @@ var _ = Describe("PickBestReplica", func() { }) It("uses largest available_vram as the final tiebreaker", func() { - // in_flight tied AND last_used tied — pick the largest GPU. + // in_flight tied AND last_used tied: pick the largest GPU. cs := []ReplicaCandidate{ {NodeID: "small", InFlight: 0, LastUsed: ref, AvailableVRAM: 8_000_000_000}, {NodeID: "fat", InFlight: 0, LastUsed: ref, AvailableVRAM: 24_000_000_000},