diff --git a/core/p2p/affinity_sync.go b/core/p2p/affinity_sync.go new file mode 100644 index 000000000..6cef494e9 --- /dev/null +++ b/core/p2p/affinity_sync.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" + "github.com/mudler/edgevpn/pkg/blockchain" + "github.com/mudler/edgevpn/pkg/hub" + "github.com/mudler/edgevpn/pkg/node" + "github.com/mudler/xlog" +) + +// affinitySubjectKey is the hub.Message annotation carrying the logical subject +// (observe vs invalidate) so the receiver can dispatch the way a NATS subject +// would. The generic channel has no subject routing, so we carry it ourselves. +const affinitySubjectKey = "subject" + +// genericChannelPublisher adapts an edgevpn node's generic broadcast channel to +// the prefixcache publisher interface (Publish(subject, v)). It lets a +// federation server reuse prefixcache.Sync for cross-server affinity coherence +// without NATS: each event is JSON-encoded into a hub.Message and broadcast over +// the generic channel (not the slow blockchain ledger). +type genericChannelPublisher struct { + node *node.Node +} + +// Publish JSON-encodes v and publishes it with subject stored under the affinitySubjectKey annotation; it satisfies prefixcache's (unexported) publisher interface structurally. +func (p *genericChannelPublisher) Publish(subject string, v any) error { + payload, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("marshalling affinity event: %w", err) + } + return p.node.PublishMessage(&hub.Message{ + Message: string(payload), + Annotations: map[string]interface{}{affinitySubjectKey: subject}, + }) +} + +// applyAffinityMessage decodes a generic-channel affinity message and applies it +// to sync WITHOUT re-broadcasting (ApplyObserve/ApplyInvalidate). now is the local +// clock so TTL is measured per server; only ApplyObserve receives it. Nil inputs and +// unknown subjects are dropped silently; malformed payloads are debug-logged. Never fatal. 
+func applyAffinityMessage(sync *prefixcache.Sync, m *hub.Message, now time.Time) { + if sync == nil || m == nil { + return + } + subject, _ := m.Annotations[affinitySubjectKey].(string) + switch subject { + case messaging.SubjectPrefixCacheObserve: + var ev messaging.PrefixCacheObserveEvent + if err := json.Unmarshal([]byte(m.Message), &ev); err != nil { + xlog.Debug("affinity: bad observe payload", "error", err) + return + } + sync.ApplyObserve(ev, now) + case messaging.SubjectPrefixCacheInvalidate: + var ev messaging.PrefixCacheInvalidateEvent + if err := json.Unmarshal([]byte(m.Message), &ev); err != nil { + xlog.Debug("affinity: bad invalidate payload", "error", err) + return + } + sync.ApplyInvalidate(ev) + default: + // Other generic-channel traffic; not ours. + } +} + +// affinityHandler returns the edgevpn generic-channel handler that applies remote +// affinity events to this server's index. It is registered at node construction +// (handlers cannot be added after Start) and reads fs.prefixSync lazily, which is +// safe because messages only arrive after Start, by which point Start has wired +// fs.prefixSync. +func (fs *FederatedServer) affinityHandler() node.Handler { + return func(_ *blockchain.Ledger, m *hub.Message, _ chan *hub.Message) error { + applyAffinityMessage(fs.prefixSync, m, time.Now()) + return nil + } +} diff --git a/core/p2p/affinity_sync_test.go b/core/p2p/affinity_sync_test.go new file mode 100644 index 000000000..066f7f335 --- /dev/null +++ b/core/p2p/affinity_sync_test.go @@ -0,0 +1,48 @@ +package p2p + +import ( + "encoding/json" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/nodes/prefixcache" + "github.com/mudler/edgevpn/pkg/hub" +) + +var _ = Describe("applyAffinityMessage", func() { + ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + observeMsg := func(ev messaging.PrefixCacheObserveEvent) *hub.Message { + payload, _ := json.Marshal(ev) + return &hub.Message{ + Message: string(payload), + Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}, + } + } + + It("applies a peer observe so the local index resolves the warm peer", func() { + cfg := prefixcache.DefaultConfig() + idx := prefixcache.NewIndex(cfg) + sync := prefixcache.NewSync(idx, nil) + chain := prefixcache.ExtractChain("m1", "a fairly long shared system prompt body for the prefix chain", cfg) + + applyAffinityMessage(sync, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m1", Chain: chain, NodeID: "warm", Replica: 0}), ref) + + d := idx.Decide("m1", chain, []prefixcache.ReplicaKey{{NodeID: "warm"}, {NodeID: "cold"}}, ref) + Expect(d.HasHot).To(BeTrue()) + Expect(d.Hot.NodeID).To(Equal("warm")) + }) + + It("ignores malformed, unknown-subject, and nil inputs without panicking", func() { + sync := prefixcache.NewSync(prefixcache.NewIndex(prefixcache.DefaultConfig()), nil) + applyAffinityMessage(sync, &hub.Message{Message: "not-json", Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}}, ref) + applyAffinityMessage(sync, &hub.Message{Message: "{}", Annotations: map[string]interface{}{affinitySubjectKey: "some.other.subject"}}, ref) + applyAffinityMessage(sync, &hub.Message{Message: "{}"}, ref) + applyAffinityMessage(nil, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m"}), ref) + applyAffinityMessage(sync, nil, ref) + Expect(true).To(BeTrue()) + }) +}) diff --git a/core/p2p/federated.go b/core/p2p/federated.go index 8dcb491a2..d12ad7829 100644 --- 
a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -32,6 +32,7 @@ type FederatedServer struct { bodyLimit int64 // max request body bytes (0 = unlimited) prefixCfg prefixcache.Config prefixIndex *prefixcache.Index + prefixSync *prefixcache.Sync } func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64) *FederatedServer {