diff --git a/core/p2p/federated.go b/core/p2p/federated.go index cd1793a57..a84693875 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -1,8 +1,10 @@ package p2p import ( + "encoding/json" "fmt" "math/rand/v2" + "strings" "sync" "time" @@ -78,17 +80,21 @@ func (fs *FederatedServer) syncTableStatus() { } // buildFederatedCandidates maps the currently-online federated peers into the -// shared routing policy's candidate form. InFlight comes from the per-peer -// request counter (lower means fewer requests routed there); AvailableVRAM -// comes from the gossiped NodeData. LastUsed is intentionally left zero: -// federation has no per-peer last-used clock, and the request counter already -// spreads load, so the VRAM tier deterministically breaks in-flight ties. -func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time) []clusterrouting.ReplicaCandidate { +// shared routing policy's candidate form, optionally filtered to peers that can +// serve model. A peer with a non-empty advertised model set that lacks model is +// excluded; a peer with an empty set is treated as "unknown" and stays eligible +// (so older peers and mid-convergence peers are not starved). When model is "", +// no model filtering is applied. InFlight comes from the per-peer request +// counter; AvailableVRAM from the gossiped NodeData; LastUsed is left zero. 
+func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time, model string) []clusterrouting.ReplicaCandidate {
 	candidates := make([]clusterrouting.ReplicaCandidate, 0, len(nodes))
 	for _, nd := range nodes {
 		if !nd.IsOnlineAt(now) {
 			continue
 		}
+		if !servesModel(nd, model) {
+			continue
+		}
 		candidates = append(candidates, clusterrouting.ReplicaCandidate{
 			NodeID: nd.ID,
 			InFlight: requestTable[nd.ID],
@@ -98,6 +104,40 @@ func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]i
 	return candidates
 }
 
+// servesModel reports whether nd is eligible to serve model. An empty model
+// means "no filter". An empty advertised set means "unknown" and is eligible.
+func servesModel(nd schema.NodeData, model string) bool {
+	if model == "" || len(nd.Models) == 0 {
+		return true
+	}
+	for _, m := range nd.Models {
+		if m == model {
+			return true
+		}
+	}
+	return false
+}
+
+// extractModel best-effort resolves the target model of a buffered request,
+// cheapest source first: the trimmed query value, then the trimmed JSON body
+// "model" field. path is currently unused. Returns "" when no model can be
+// determined (e.g. multipart/websocket); the caller routes by load/affinity.
+func extractModel(path, queryModel string, body []byte) string {
+	if m := strings.TrimSpace(queryModel); m != "" {
+		return m
+	}
+	if len(body) == 0 {
+		return ""
+	}
+	var probe struct {
+		Model string `json:"model"`
+	}
+	if err := json.Unmarshal(body, &probe); err != nil {
+		return ""
+	}
+	return strings.TrimSpace(probe.Model)
+}
+
 // SelectBestServer picks the online federated peer to serve the next request
 // using the shared cluster-routing policy (least in-flight, then most free
 // VRAM). Returns "" when no peer is online.
@@ -109,7 +149,7 @@ func (fs *FederatedServer) SelectBestServer() string { nodes := GetAvailableNodes(fs.service) fs.Lock() defer fs.Unlock() - candidates := buildFederatedCandidates(nodes, fs.requestTable, time.Now()) + candidates := buildFederatedCandidates(nodes, fs.requestTable, time.Now(), "") best := clusterrouting.PickBestReplica(candidates) if best == nil { xlog.Debug("No online federated peers to select", "request_table", fs.requestTable) diff --git a/core/p2p/federated_test.go b/core/p2p/federated_test.go index eb9eea145..b0d2310d9 100644 --- a/core/p2p/federated_test.go +++ b/core/p2p/federated_test.go @@ -20,7 +20,7 @@ var _ = Describe("buildFederatedCandidates", func() { {ID: "online", LastSeen: onlineSeen}, {ID: "offline", LastSeen: offlineSeen}, } - cands := buildFederatedCandidates(nodes, map[string]int{}, ref) + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "") Expect(cands).To(HaveLen(1)) Expect(cands[0].NodeID).To(Equal("online")) }) @@ -30,7 +30,7 @@ var _ = Describe("buildFederatedCandidates", func() { {ID: "a", LastSeen: onlineSeen}, {ID: "b", LastSeen: onlineSeen}, } - cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref) + cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref, "") byID := map[string]int{} for _, c := range cands { byID[c.NodeID] = c.InFlight @@ -43,7 +43,7 @@ var _ = Describe("buildFederatedCandidates", func() { nodes := []schema.NodeData{ {ID: "gpu", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000}, } - cands := buildFederatedCandidates(nodes, map[string]int{}, ref) + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "") Expect(cands[0].AvailableVRAM).To(Equal(uint64(24_000_000_000))) }) @@ -55,9 +55,63 @@ var _ = Describe("buildFederatedCandidates", func() { {ID: "idle-small", LastSeen: onlineSeen, AvailableVRAM: 8_000_000_000}, {ID: "idle-big", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000}, } - cands := buildFederatedCandidates(nodes, 
map[string]int{"busy-big": 3}, ref) + cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref, "") best := clusterrouting.PickBestReplica(cands) Expect(best).ToNot(BeNil()) Expect(best.NodeID).To(Equal("idle-big")) }) }) + +var _ = Describe("model-aware candidate building", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + seen := ref.Add(-10 * time.Second) + + It("keeps peers that advertise the requested model", func() { + nodes := []schema.NodeData{ + {ID: "has", LastSeen: seen, Models: []string{"m1", "m2"}}, + {ID: "hasnot", LastSeen: seen, Models: []string{"other"}}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1") + Expect(cands).To(HaveLen(1)) + Expect(cands[0].NodeID).To(Equal("has")) + }) + + It("keeps peers with an empty (unknown) model set eligible for any model", func() { + nodes := []schema.NodeData{ + {ID: "unknown", LastSeen: seen, Models: nil}, + {ID: "hasnot", LastSeen: seen, Models: []string{"other"}}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1") + Expect(cands).To(HaveLen(1)) + Expect(cands[0].NodeID).To(Equal("unknown")) + }) + + It("does not filter when the requested model is empty", func() { + nodes := []schema.NodeData{ + {ID: "a", LastSeen: seen, Models: []string{"x"}}, + {ID: "b", LastSeen: seen, Models: []string{"y"}}, + } + cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "") + Expect(cands).To(HaveLen(2)) + }) +}) + +var _ = Describe("extractModel", func() { + It("reads the JSON body model field", func() { + body := []byte(`{"model":"llama-3","messages":[]}`) + Expect(extractModel("/v1/chat/completions", "", body)).To(Equal("llama-3")) + }) + + It("prefers a path/query model over the body", func() { + body := []byte(`{"model":"frombody"}`) + Expect(extractModel("/x", "fromquery", body)).To(Equal("fromquery")) + }) + + It("returns empty when no model is present", func() { + Expect(extractModel("/x", "", 
[]byte(`{"messages":[]}`))).To(Equal("")) + }) + + It("returns empty on non-JSON / unparseable body without panicking", func() { + Expect(extractModel("/x", "", []byte("--multipart-boundary--"))).To(Equal("")) + }) +})