diff --git a/core/cli/federated.go b/core/cli/federated.go index c61adab0f..10935dc0a 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -14,12 +14,13 @@ type FederatedCLI struct { RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"` Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"` TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"` + UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-size limit in megabytes" group:"api"` } func (f *FederatedCLI) Run(ctx *cliContext.Context) error { warnDeprecatedFlags() - fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker) + fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker, int64(f.UploadLimit)*1024*1024) c, cancel := context.WithCancel(context.Background()) diff --git a/core/p2p/federated.go b/core/p2p/federated.go index a84693875..45035d428 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -9,6 +9,7 @@ import ( "time" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" "github.com/mudler/LocalAI/pkg/clusterrouting" "github.com/mudler/xlog" ) @@ -28,9 +29,13 @@ type FederatedServer struct { requestTable map[string]int loadBalanced bool workerTarget string + bodyLimit int64 // max request body bytes (0 = unlimited) + prefixCfg prefixcache.Config // prefix-affinity settings; initialised from prefixcache.DefaultConfig() in NewFederatedServer + prefixIndex *prefixcache.Index // prefix index built from prefixCfg; read by selectPeer and updated by observeServed } -func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer { +func NewFederatedServer(listenAddr, 
service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64) *FederatedServer { + cfg := prefixcache.DefaultConfig() return &FederatedServer{ listenAddr: listenAddr, service: service, @@ -38,6 +43,9 @@ func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, requestTable: map[string]int{}, loadBalanced: loadBalanced, workerTarget: workerTarget, + bodyLimit: bodyLimit, + prefixCfg: cfg, + prefixIndex: prefixcache.NewIndex(cfg), } } @@ -159,6 +167,65 @@ func (fs *FederatedServer) SelectBestServer() string { return best.NodeID } +// affinityPreferred returns the peer the prefix index considers warm for this +// chain, or "" when there is no match strong enough among the candidates. It +// reuses prefixcache's per-model radix-tree Decide; the final load-guarded pick +// is done by clusterrouting.PickWithAffinity so the VRAM tier is preserved. A nil index, an empty chain, or an empty candidate list also yields "". +func affinityPreferred(idx *prefixcache.Index, model string, chain []uint64, candidates []clusterrouting.ReplicaCandidate, cfg prefixcache.Config, now time.Time) string { + if idx == nil || len(chain) == 0 || len(candidates) == 0 { + return "" + } + keys := make([]prefixcache.ReplicaKey, 0, len(candidates)) + for _, c := range candidates { + keys = append(keys, prefixcache.ReplicaKey{NodeID: c.NodeID}) + } + d := idx.Decide(model, chain, keys, now) + if d.HasHot && d.MatchRatio >= cfg.MinPrefixMatch { + return d.Hot.NodeID + } + return "" +} + +// selectPeer chooses the federated peer to serve a request for model with the +// given raw body. It filters candidates by model, computes the prefix chain, +// consults the affinity index, and makes the final load+VRAM-aware pick. It +// returns the chosen peer ID and the chain (so the caller can Observe after a +// successful forward). An empty model or an empty (or nil) body skips the prefix lookup, so the pick degrades to load+VRAM only and the returned chain is nil. +// Returns "" when no eligible peer is online. 
+func (fs *FederatedServer) selectPeer(model string, body []byte, now time.Time) (string, []uint64) { + fs.syncTableStatus() + nodes := GetAvailableNodes(fs.service) + // Snapshot candidates under the lock (it only guards requestTable), then + // release before the prefix hashing and tree walk, which are lock-free + // (candidates is a copy; prefixIndex/prefixCfg are set once at construction). + fs.Lock() + candidates := buildFederatedCandidates(nodes, fs.requestTable, now, model) + fs.Unlock() + if len(candidates) == 0 { + return "", nil + } + var chain []uint64 + preferred := "" + if fs.prefixIndex != nil && model != "" && len(body) > 0 { + chain = prefixcache.ExtractChain(model, string(body), fs.prefixCfg) + preferred = affinityPreferred(fs.prefixIndex, model, chain, candidates, fs.prefixCfg, now) + } + best := clusterrouting.PickWithAffinity(candidates, preferred, fs.prefixCfg.BalanceAbsThreshold) + if best == nil { + return "", chain + } + return best.NodeID, chain +} + +// observeServed records that peerID served the given chain for model, so the +// next request sharing that prefix is routed back to the same warm peer. It is a no-op when the index is nil, the chain is empty, or peerID or model is empty. +func (fs *FederatedServer) observeServed(model string, chain []uint64, peerID string, now time.Time) { + if fs.prefixIndex == nil || len(chain) == 0 || peerID == "" || model == "" { + return + } + fs.prefixIndex.Observe(model, chain, prefixcache.ReplicaKey{NodeID: peerID}, now) +} + func (fs *FederatedServer) RecordRequest(nodeID string) { fs.Lock() defer fs.Unlock() diff --git a/core/p2p/federated_test.go b/core/p2p/federated_test.go index b0d2310d9..703e18fe7 100644 --- a/core/p2p/federated_test.go +++ b/core/p2p/federated_test.go @@ -7,6 +7,7 @@ import ( . 
"github.com/onsi/gomega" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" "github.com/mudler/LocalAI/pkg/clusterrouting" ) @@ -115,3 +116,32 @@ var _ = Describe("extractModel", func() { Expect(extractModel("/x", "", []byte("--multipart-boundary--"))).To(Equal("")) }) }) + +var _ = Describe("affinityPreferred", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + It("returns the warm peer once a chain has been observed for it", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg) + idx.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref) + + cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}} + Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("warm")) + }) + + It("returns empty when no chain has been observed", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg) + cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}} + Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("")) + }) + + It("returns empty for a nil index or empty chain", func() { + cfg := prefixcache.DefaultConfig() + Expect(affinityPreferred(nil, "m1", []uint64{1}, nil, cfg, ref)).To(Equal("")) + idx := prefixcache.NewIndex(cfg) + Expect(affinityPreferred(idx, "m1", nil, nil, cfg, ref)).To(Equal("")) + }) +})