From aa5da2e5f22a785f345e0a05dd8a318a8a50976e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 30 Apr 2026 04:20:20 +0000 Subject: [PATCH] ipn/ipnlocal, control/controlclient: process node adds/removes in constant time For large tailnets (~50k+ nodes) with frequent peer churn (ephemeral GitHub Actions workers etc.), tailscaled used to rebuild the full netmap and fan it out on the IPN bus on every MapResponse that added or removed a peer. There were two O(N) costs per delta: the full netmap rebuild + every Notify.NetMap encode to every bus watcher. This change tackles both: 1. Plumb O(1) peer add/remove through the delta path. PeersChanged and PeersRemoved no longer prevent the delta happy path; instead, they mutate the per-node-backend peer map in place. 2. Restrict ipn.Notify.NetMap emission to the platforms whose host GUIs still depend on it (Windows, macOS, iOS) and migrate in-tree consumers off it everywhere else: - Migrate reactive consumers (containerboot, kube agents, sniproxy, tsconsensus, etc.) off Notify.NetMap to the previously-added Notify.SelfChange signal so they no longer have to subscribe to the full netmap. - Add ipn.NotifyNoNetMap so GUI clients on "legacy-emit" platforms that have already migrated can opt out of the per-watcher NetMap encode. - Gate Notify.NetMap emission on the producer side by a compile- time GOOS check, so the supporting code is dead-code-eliminated on Linux and other geese where no GUI consumer needs it. Re-running BenchmarkGiantTailnet from tstest/largetailnet, which was added along with baseline numbers on unmodified main in ad5436af0d57, the per-delta cost (one peer add+remove pair) is now ~O(1) regardless of tailnet size N: N no-watcher (ms/op) bus-watcher (ms/op) before now factor before now factor 10000 32 0.11 300x 166 0.13 1300x 50000 222 0.11 2000x 865 0.13 6700x 100000 504 0.12 4100x 1765 0.13 13400x 250000 1551 0.12 12500x 4696 0.15 32400x Updates #12542 Change-Id: I94e34b37331d1a8ec74c299deffadf4d061fda9e Signed-off-by: Brad Fitzpatrick --- client/local/local.go | 26 +- cmd/tailscale/cli/debug.go | 11 +- control/controlclient/auto.go | 76 ++++- control/controlclient/direct.go | 51 +++ control/controlclient/map.go | 55 +++- ipn/backend.go | 168 ++++++++-- ipn/ipnlocal/bus.go | 47 ++- ipn/ipnlocal/bus_test.go | 33 +- ipn/ipnlocal/local.go | 378 ++++++++++++++++++----- ipn/ipnlocal/local_test.go | 200 +++++++++++- ipn/ipnlocal/node_backend.go | 94 +++++- ipn/localapi/debug.go | 2 - ipn/localapi/localapi.go | 58 +++- ipn/localapi/localapi_test.go | 60 ++++ ipn/serve.go | 2 +- tsnet/packet_filter_test.go | 29 +- tsnet/tsnet_test.go | 6 +- tstest/integration/integration_test.go | 25 +- tstest/largetailnet/delta_test.go | 279 +++++++++++++++++ tstest/largetailnet/largetailnet_test.go | 24 +- types/netmap/nodemut.go | 63 ++-- types/netmap/nodemut_test.go | 43 ++- wgengine/magicsock/magicsock.go | 2 +- 23 files changed, 1521 insertions(+), 211 deletions(-) create mode 100644 tstest/largetailnet/delta_test.go diff --git a/client/local/local.go b/client/local/local.go index 1a2d7342b..50050eb1b 100644 --- a/client/local/local.go +++ b/client/local/local.go @@ -1094,11 +1094,13 @@ func (lc *Client) DNSConfig(ctx context.Context) (*tailcfg.DNSConfig, error) { } // PeerByID returns a peer's current full [tailcfg.Node] looked up by its -// [tailcfg.NodeID], in O(1) time on the daemon side. It returns an error -// if no peer with that NodeID is in the current netmap. +// [tailcfg.NodeID]. It returns an error if no peer with that NodeID is in the +// current netmap. // -// It is intended for callers that need the latest state of a single peer -// without fetching the entire netmap. +// It is intended for callers that observed a peer-mutation signal (e.g. +// [ipn.Notify.PeerChangedPatch] or [ipn.Notify.PeersChanged]) and want the +// latest state of the affected node without having to apply the patch +// themselves. func (lc *Client) PeerByID(ctx context.Context, id tailcfg.NodeID) (*tailcfg.Node, error) { body, err := lc.get200(ctx, "/localapi/v0/peer-by-id?id="+strconv.FormatInt(int64(id), 10)) if err != nil { @@ -1107,6 +1109,22 @@ func (lc *Client) PeerByID(ctx context.Context, id tailcfg.NodeID) (*tailcfg.Nod return decodeJSON[*tailcfg.Node](body) } +// UserProfile returns the current [tailcfg.UserProfile] for the given +// [tailcfg.UserID]. It returns an error if no user with that UserID is in the +// current netmap. +// +// It is the LocalAPI fallback for IPN-bus consumers that see a UserID +// referenced by a peer Node and want to resolve it to a UserProfile. Sessions +// opted in to [ipn.NotifyPeerChanges] / [ipn.NotifyPeerPatches] also receive +// UserProfiles automatically via [ipn.Notify.UserProfiles]. +func (lc *Client) UserProfile(ctx context.Context, id tailcfg.UserID) (*tailcfg.UserProfile, error) { + body, err := lc.get200(ctx, "/localapi/v0/user-profile?id="+strconv.FormatInt(int64(id), 10)) + if err != nil { + return nil, err + } + return decodeJSON[*tailcfg.UserProfile](body) +} + // PingOpts contains options for the ping request. // // The zero value is valid, which means to use defaults. diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index 3531172bb..f6972f004 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -268,8 +268,7 @@ func debugCmd() *ffcli.Command { ShortHelp: "Subscribe to IPN message bus", FlagSet: (func() *flag.FlagSet { fs := newFlagSet("watch-ipn") - fs.BoolVar(&watchIPNArgs.netmap, "netmap", true, "include netmap in messages") - fs.BoolVar(&watchIPNArgs.initial, "initial", false, "include initial status") + fs.BoolVar(&watchIPNArgs.initial, "initial", false, "include the initial backend State and Prefs in the first message") fs.BoolVar(&watchIPNArgs.rateLimit, "rate-limit", true, "rate limit messages") fs.IntVar(&watchIPNArgs.count, "count", 0, "exit after printing this many statuses, or 0 to keep going forever") return fs @@ -632,16 +631,15 @@ func runPrefs(ctx context.Context, args []string) error { } var watchIPNArgs struct { - netmap bool initial bool rateLimit bool count int } func runWatchIPN(ctx context.Context, args []string) error { - var mask ipn.NotifyWatchOpt + mask := ipn.NotifyPeerChanges | ipn.NotifyPeerPatches if watchIPNArgs.initial { - mask = ipn.NotifyInitialState | ipn.NotifyInitialPrefs | ipn.NotifyInitialNetMap + mask |= ipn.NotifyInitialState | ipn.NotifyInitialPrefs } if watchIPNArgs.rateLimit { mask |= ipn.NotifyRateLimit @@ -657,9 +655,6 @@ func runWatchIPN(ctx context.Context, args []string) error { if err != nil { return err } - if !watchIPNArgs.netmap { - n.NetMap = nil - } j, _ := json.MarshalIndent(n, "", "\t") fmt.Printf("%s\n", j) } diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index 05c7552c8..a78db5acc 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -20,10 +20,12 @@ "tailscale.com/types/netmap" "tailscale.com/types/persist" "tailscale.com/types/structs" + "tailscale.com/types/views" "tailscale.com/util/backoff" "tailscale.com/util/clientmetric" "tailscale.com/util/execqueue" "tailscale.com/util/testenv" + "tailscale.com/wgengine/filter" ) type LoginGoal struct { @@ -479,11 +481,77 @@ func (mrs mapRoutineState) UpdateNetmapDelta(muts []netmap.NodeMutation) bool { ctx, cancel := context.WithTimeout(c.mapCtx, 2*time.Second) defer cancel() - var ok bool - err := c.observerQueue.RunSync(ctx, func() { - ok = ndu.UpdateNetmapDelta(muts) + ch := make(chan bool, 1) + c.observerQueue.Add(func() { + ch <- ndu.UpdateNetmapDelta(muts) }) - return err == nil && ok + select { + case ok := <-ch: + return ok + case <-ctx.Done(): + return false + } +} + +var ( + _ PacketFilterUpdater = mapRoutineState{} + _ UserProfileUpdater = mapRoutineState{} +) + +// UpdatePacketFilter implements [PacketFilterUpdater] by forwarding to +// [Auto.observer] if it implements [PacketFilterUpdater]. It returns +// false (signaling fall back to a full netmap rebuild) if the +// downstream observer doesn't implement [PacketFilterUpdater] or isn't +// in a state to accept updates. +func (mrs mapRoutineState) UpdatePacketFilter(rules views.Slice[tailcfg.FilterRule], parsed []filter.Match) bool { + c := mrs.c + c.mu.Lock() + goodState := c.loggedIn && c.inMapPoll + pfu, ok := c.observer.(PacketFilterUpdater) + c.mu.Unlock() + if !goodState || !ok { + return false + } + ctx, cancel := context.WithTimeout(c.mapCtx, 2*time.Second) + defer cancel() + ch := make(chan bool, 1) + c.observerQueue.Add(func() { + ch <- pfu.UpdatePacketFilter(rules, parsed) + }) + select { + case applied := <-ch: + return applied + case <-ctx.Done(): + return false + } +} + +// UpdateUserProfiles implements [UserProfileUpdater] by forwarding to +// [Auto.observer] if it implements [UserProfileUpdater]. It returns +// false (signaling fall back to a full netmap rebuild) if the +// downstream observer doesn't implement [UserProfileUpdater] or isn't +// in a state to accept updates. +func (mrs mapRoutineState) UpdateUserProfiles(profiles map[tailcfg.UserID]tailcfg.UserProfileView) bool { + c := mrs.c + c.mu.Lock() + goodState := c.loggedIn && c.inMapPoll + upu, ok := c.observer.(UserProfileUpdater) + c.mu.Unlock() + if !goodState || !ok { + return false + } + ctx, cancel := context.WithTimeout(c.mapCtx, 2*time.Second) + defer cancel() + ch := make(chan bool, 1) + c.observerQueue.Add(func() { + ch <- upu.UpdateUserProfiles(profiles) + }) + select { + case applied := <-ch: + return applied + case <-ctx.Done(): + return false + } } var _ patchDiscoKeyer = mapRoutineState{} diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index 032999cb9..ead5c0a45 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -56,6 +56,7 @@ "tailscale.com/types/netmap" "tailscale.com/types/persist" "tailscale.com/types/tkatype" + "tailscale.com/types/views" "tailscale.com/util/clientmetric" "tailscale.com/util/eventbus" "tailscale.com/util/singleflight" @@ -64,6 +65,7 @@ "tailscale.com/util/testenv" "tailscale.com/util/vizerror" "tailscale.com/util/zstdframe" + "tailscale.com/wgengine/filter" ) // Direct is the client that connects to a tailcontrol server for a node. @@ -226,6 +228,9 @@ type NetmapUpdater interface { // rather than just full updates. type NetmapDeltaUpdater interface { // UpdateNetmapDelta is called with discrete changes to the network map. + // The mutation slice may contain [netmap.NodeMutationAdd] and + // [netmap.NodeMutationRemove] entries when peers were added or removed, + // alongside per-field patches. // // The ok result is whether the implementation was able to apply the // mutations. It might return false if its internal state doesn't @@ -234,6 +239,52 @@ type NetmapDeltaUpdater interface { UpdateNetmapDelta([]netmap.NodeMutation) (ok bool) } +// PacketFilterUpdater is an optional interface that can be implemented by +// NetmapUpdater implementations to receive incremental packet-filter updates +// without a full netmap rebuild. +// +// It exists because the packet filter currently changes on every peer +// addition, so a MapResponse carrying PeersChanged almost always also carries +// PacketFilter (or PacketFilters). Handling the filter narrowly keeps peer +// churn O(1) on the controlclient side. +type PacketFilterUpdater interface { + // UpdatePacketFilter is called when a MapResponse's PacketFilter (or + // PacketFilters) changed. rules is the already-merged concatenation of + // the session's named packet filter chunks; parsed is the parsed form. + // + // It returns false to signal the caller to fall back to a full + // netmap rebuild. Proxy/forwarder implementations return false when + // their downstream destination doesn't implement + // [PacketFilterUpdater]; concrete implementations return true on + // successful apply. + UpdatePacketFilter(rules views.Slice[tailcfg.FilterRule], parsed []filter.Match) bool +} + +// UserProfileUpdater is an optional interface that can be implemented by +// NetmapUpdater implementations to receive incremental UserProfile updates +// without a full netmap rebuild. +// +// It exists so consumers of [ipn.Notify.UserProfiles] can be told about +// new or updated UserProfiles before (or with) the [ipn.Notify.PeersChanged] +// or [ipn.Notify.PeerChangedPatch] entry that references the corresponding +// UserID. +type UserProfileUpdater interface { + // UpdateUserProfiles is called when a MapResponse carries UserProfiles + // entries. profiles is the new/updated subset (NOT the full map); + // implementations should merge with whatever they already know. + // + // The values are [tailcfg.UserProfileView]s sharing backing memory + // with the caller's tracking map; implementations may store them + // directly without copying. + // + // It returns false to signal the caller to fall back to a full + // netmap rebuild. Proxy/forwarder implementations return false when + // their downstream destination doesn't implement + // [UserProfileUpdater]; concrete implementations return true on + // successful apply. + UpdateUserProfiles(profiles map[tailcfg.UserID]tailcfg.UserProfileView) bool +} + // patchDiscoKeyer is an optional interface that can be implemented by an [Observer] to be // notified about node disco keys received out-of-band from control, via // existing connection state. diff --git a/control/controlclient/map.go b/control/controlclient/map.go index 34b5ecc55..337c5012d 100644 --- a/control/controlclient/map.go +++ b/control/controlclient/map.go @@ -316,9 +316,11 @@ func (ms *mapSession) handleNonKeepAliveMapResponse(ctx context.Context, resp *t } if ms.tryHandleIncrementally(resp) { + metricMapResponseHandledIncrementally.Add(1) ms.occasionallyPrintSummary(ms.lastNetmapSummary) return nil } + metricMapResponseHandledFullRebuild.Add(1) // We have to rebuild the whole netmap (lots of garbage & work downstream of // our UpdateFullNetmap call). This is the part we tried to avoid but @@ -393,11 +395,49 @@ func (ms *mapSession) tryHandleIncrementally(res *tailcfg.MapResponse) bool { if !ok { return false } + // If the response carries a new packet filter, the updater must + // support pushing it narrowly; otherwise fall back to a full netmap + // rebuild. PacketFilter/PacketFilters are no longer in + // mapResponseContainsNonPatchFields, so MutationsFromMapResponse will + // happily return mutations alongside a filter change — we need to + // deliver the filter separately before those mutations land. + if res.PacketFilter != nil || res.PacketFilters != nil { + pfu, ok := ms.netmapUpdater.(PacketFilterUpdater) + if !ok { + return false + } + if !pfu.UpdatePacketFilter(ms.lastPacketFilterRules, ms.lastParsedPacketFilter) { + return false + } + } + // Same shape for UserProfiles: deliver any new/updated profiles before + // the peer mutations that may reference them, so bus consumers never + // see a UserID for which a profile hasn't been published. The values + // are read from ms.lastUserProfile (just populated by + // updateStateFromResponse) so views are shared with mapSession's + // store; downstream consumers can use [UserProfileView.Equal] for + // dedup without copying. + if len(res.UserProfiles) > 0 { + upu, ok := ms.netmapUpdater.(UserProfileUpdater) + if !ok { + return false + } + profiles := make(map[tailcfg.UserID]tailcfg.UserProfileView, len(res.UserProfiles)) + for _, up := range res.UserProfiles { + profiles[up.ID] = ms.lastUserProfile[up.ID] + } + if !upu.UpdateUserProfiles(profiles) { + return false + } + } mutations, ok := netmap.MutationsFromMapResponse(res, time.Now()) - if ok && len(mutations) > 0 { + if !ok { + return false + } + if len(mutations) > 0 { return nud.UpdateNetmapDelta(mutations) } - return ok + return true } // updateStats are some stats from updateStateFromResponse, primarily for @@ -697,6 +737,17 @@ func (ms *mapSession) updateStateFromResponse(resp *tailcfg.MapResponse) { patchifiedPeer = clientmetric.NewCounter("controlclient_patchified_peer") patchifiedPeerEqual = clientmetric.NewCounter("controlclient_patchified_peer_equal") + + // metricMapResponseHandledIncrementally counts non-keepalive MapResponses + // that were processed via [mapSession.tryHandleIncrementally] (i.e. the + // "fast" delta path that avoids rebuilding the full netmap). + metricMapResponseHandledIncrementally = clientmetric.NewCounter("controlclient_map_response_handled_incrementally") + + // metricMapResponseHandledFullRebuild counts non-keepalive MapResponses + // that fell through to the full netmap rebuild path because they + // carried a field that the incremental path can't handle. See + // [netmap.mapResponseContainsNonPatchFields]. + metricMapResponseHandledFullRebuild = clientmetric.NewCounter("controlclient_map_response_handled_full_rebuild") ) // updatePeersStateFromResponseres updates ms.peers from resp. diff --git a/ipn/backend.go b/ipn/backend.go index 51617e08e..bee4f1295 100644 --- a/ipn/backend.go +++ b/ipn/backend.go @@ -88,10 +88,61 @@ type EngineStatus struct { NotifyInitialClientVersion NotifyWatchOpt = 1 << 11 // if set, the first Notify message (sent immediately) will contain the current ClientVersion if available and if update checks are enabled - // NotifyPeerChanges, if set, causes netmap delta updates to be sent as [tailcfg.PeerChange] rather than a full NetMap. - // Full netmap responses from the control plane are still sent as a full NetMap. PeerChanges are only sent to sessions - // that have opted in to this mode. + // NotifyPeerChanges, if set, opts the watcher into peer-set delta + // notifications: [Notify.PeersChanged] (peer added or full-Node + // replaced) and [Notify.PeersRemoved] (peer removed by NodeID). + // + // Without this bit, peer adds/removes/replacements are not delivered + // over the bus at all (consumers fall back to fetching the netmap on + // demand or, on legacy-emit platforms, to watching [Notify.NetMap]). + // + // Watchers that want narrower per-field updates as well (Online, + // LastSeen, DERPHome, Endpoints) should additionally set + // [NotifyPeerPatches]. Without [NotifyPeerPatches], any per-field + // patch tailscaled would have emitted as a [tailcfg.PeerChange] is + // promoted into a full-Node entry in [Notify.PeersChanged] for this + // watcher, so a watcher that opts only into [NotifyPeerChanges] still + // observes every per-peer mutation; it just receives them as full + // Nodes rather than narrow patches. The cost is bus bandwidth. + // + // On platforms where the legacy [Notify.NetMap] is still emitted + // (Windows, macOS, iOS, Android), it is permitted to combine this + // with [NotifyInitialNetMap] for backwards compatibility. New code + // should pair this with [NotifyInitialStatus] instead. NotifyPeerChanges NotifyWatchOpt = 1 << 12 + + // NotifyNoNetMap, if set, suppresses the legacy [Notify.NetMap] field on + // runtime (non-initial) Notify messages delivered to this watcher. It + // only matters on platforms where tailscaled still emits NetMap on the + // bus by default — Windows, macOS, and iOS — and is intended for GUI + // clients on those platforms that have migrated to read peers via + // [Notify.PeersChanged] / [LocalClient.NetMap]. The initial-state NetMap + // (sent when [NotifyInitialNetMap] is set) is unaffected. + NotifyNoNetMap NotifyWatchOpt = 1 << 13 + + // NotifyInitialStatus, if set, causes the first Notify message (sent + // immediately) to contain the current [ipnstate.Status] in + // [Notify.InitialStatus]. Together with [Notify.SelfChange] and + // [Notify.PeersChanged] on subsequent messages, it lets a watcher + // stitch together a continuous view of the local node's state without + // fetching the netmap directly. Prefer this over [LocalClient.NetMap] + // for new code that wants a stable, client-facing snapshot type. + NotifyInitialStatus NotifyWatchOpt = 1 << 14 + + // NotifyPeerPatches, if set, opts the watcher into narrow per-field + // peer patches via [Notify.PeerChangedPatch]. It implies + // [NotifyPeerChanges]: a watcher with [NotifyPeerPatches] also + // receives [Notify.PeersChanged] and [Notify.PeersRemoved]. + // + // This is the lower-bandwidth mode: changes to fields that fit in a + // [tailcfg.PeerChange] (currently Online, LastSeen, DERPHome, + // Endpoints) ride as patches; only changes that don't fit ride as + // full Nodes in [Notify.PeersChanged]. + // + // Without this bit but with [NotifyPeerChanges], the producer + // promotes any patch into a full-Node entry in [Notify.PeersChanged] + // for this session, at the cost of bandwidth. + NotifyPeerPatches NotifyWatchOpt = 1 << 15 ) // Notify is a communication from a backend (e.g. tailscaled) to a frontend @@ -113,10 +164,9 @@ type Notify struct { // For State InUseOtherUser, ErrMessage is not critical and just contains the details. ErrMessage *string - LoginFinished *empty.Message // non-nil when/if the login process succeeded - State *State // if non-nil, the new or current IPN state - Prefs *PrefsView // if non-nil && Valid, the new or current preferences - NetMap *netmap.NetworkMap // if non-nil, the new or current netmap + LoginFinished *empty.Message // non-nil when/if the login process succeeded + State *State // if non-nil, the new or current IPN state + Prefs *PrefsView // if non-nil && Valid, the new or current preferences // SelfChange, if non-nil, indicates that this node's own [tailcfg.Node] // has changed: addresses, name, key expiry, capabilities, etc. It carries @@ -125,15 +175,98 @@ type Notify struct { // full netmap. // // Consumers that need additional state (peers, DNS config, packet - // filter) should react to SelfChange by fetching the relevant bits on - // demand via [LocalClient]. + // filter) should react to SelfChange by fetching the full netmap on + // demand via [LocalClient.NetMap]. SelfChange *tailcfg.Node `json:",omitzero"` - // PeerChanges, if non-nil, is a list of [tailcfg.PeerChange] that have occurred since the last - // full netmap update. This is sent in lieu of a full NetMap when [NotifyPeerChanges] is set in - // the session's mask and a netmap update is derived from an incremental MapResponse. - // Full MapResponse updates from the control plane are sent as a full NetMap. - PeerChanges []*tailcfg.PeerChange `json:",omitzero"` + // InitialStatus, if non-nil, is the current [ipnstate.Status]. It is + // only set in the first Notify of a session when the watcher requested + // [NotifyInitialStatus]. Together with subsequent [Notify.SelfChange] + // and [Notify.PeerChanges] messages, it lets a watcher stitch together + // a continuous view of node state without fetching the netmap. + InitialStatus *ipnstate.Status `json:",omitzero"` + + // NetMap, if non-nil, is the full network map. New consumers should prefer + // [LocalClient.NetMap] for one-shot fetches and [Notify.SelfChange] / + // [Notify.PeerChanges] for incremental reactive updates; NetMap on the bus + // is the legacy path retained for hosts whose GUIs have not yet finished + // migrating. It is delivered: + // + // - On the initial Notify if the watcher requested + // [NotifyInitialNetMap] (any platform). + // - On subsequent Notify messages, only when tailscaled is running + // on Windows, macOS, or iOS. On Linux and other platforms it is + // always nil after the initial notify. + // + // Deprecated: this field is only populated on Windows, macOS, and iOS and + // is slated for removal in favor of [Notify.InitialStatus] + + // [Notify.SelfChange] / [Notify.PeerChanges], etc, as this field + // doesn't scale. + NetMap *netmap.NetworkMap + + // PeerChangedPatch, if non-empty, lists narrow per-field peer patches + // since the last Notify (currently Online, LastSeen, DERPHome, + // Endpoints). It mirrors [tailcfg.MapResponse.PeersChangedPatch]. + // + // Peer additions and any peer change that can't be expressed as a + // [tailcfg.PeerChange] travel in [Notify.PeersChanged]; peer removals + // in [Notify.PeersRemoved]. + // + // Watchers must opt in to receive this field by setting + // [NotifyPeerPatches]; without that bit (but with [NotifyPeerChanges]) + // the producer promotes each patch into a full-Node entry in + // [Notify.PeersChanged] instead. + // + // The [tailcfg.PeerChange] type may grow more fields over time; + // consumers that see a [tailcfg.PeerChange] with a field they don't + // recognize should re-fetch the affected node by NodeID via + // [LocalClient.PeerByID] (an O(1) lookup) to learn its current value + // rather than ignoring the change. + PeerChangedPatch []*tailcfg.PeerChange `json:",omitzero"` + + // PeersChanged, if non-empty, lists peers whose full [tailcfg.Node] + // has been added or replaced since the last Notify. A node ID may + // appear here either because it is a brand-new peer or because the + // control plane sent a fresh full Node for an existing peer when the + // change wasn't expressible as a [tailcfg.PeerChange] patch (e.g. a + // CapMap, Addresses, Hostinfo, or Tags change). Consumers should + // upsert by NodeID. + // + // This mirrors [tailcfg.MapResponse.PeersChanged] semantics; peer + // removals travel in [Notify.PeersRemoved] and narrow per-field + // patches in [Notify.PeerChanges]. + PeersChanged []*tailcfg.Node `json:",omitzero"` + + // PeersRemoved, if non-empty, lists [tailcfg.NodeID]s that have been + // removed from the netmap since the last Notify. See + // [Notify.PeersChanged]. This mirrors + // [tailcfg.MapResponse.PeersRemoved]. + PeersRemoved []tailcfg.NodeID `json:",omitzero"` + + // UserProfiles, if non-empty, carries [tailcfg.UserProfileView] + // entries that have been added or updated since the last Notify on + // this session. Watchers must opt in via [NotifyPeerChanges] or + // [NotifyPeerPatches]; this field is gated on the same bits as + // [Notify.PeersChanged] / [Notify.PeerChangedPatch] because its + // only purpose is to let those consumers resolve the [tailcfg.UserID] + // referenced by a peer Node. + // + // The producer guarantees that any UserID referenced by a peer in + // a [Notify.PeersChanged] / [Notify.PeerChangedPatch] entry will + // have its profile delivered either earlier on this same session + // (e.g. via the initial NetMap or via an earlier Notify carrying + // UserProfiles) or in this same Notify. A consumer that sees a + // UserID it doesn't recognize on a session that opted in to + // peer-change notifications can treat it as a bug; the + // [LocalClient.UserProfile] LocalAPI fallback exists for sessions + // that didn't subscribe with the peer-change bits or that need to + // look up a UserID for any other reason. + // + // The values are [tailcfg.UserProfileView] so they share backing + // memory with the producer's tracking maps; consumers should treat + // them as read-only and use [tailcfg.UserProfileView.AsStruct] or + // the per-field accessors to read them. + UserProfiles map[tailcfg.UserID]tailcfg.UserProfileView `json:",omitzero"` Engine *EngineStatus // if non-nil, the new or current wireguard stats BrowseToURL *string // if non-nil, UI should open a browser right now @@ -204,14 +337,11 @@ func (n Notify) String() string { if n.Prefs != nil && n.Prefs.Valid() { fmt.Fprintf(&sb, "%v ", n.Prefs.Pretty()) } - if n.NetMap != nil { - sb.WriteString("NetMap{...} ") - } if n.SelfChange != nil { fmt.Fprintf(&sb, "SelfChange(%v) ", n.SelfChange.StableID) } - if n.PeerChanges != nil { - fmt.Fprintf(&sb, "PeerChanges(%d) ", len(n.PeerChanges)) + if n.PeerChangedPatch != nil { + fmt.Fprintf(&sb, "PeerChangedPatch(%d) ", len(n.PeerChangedPatch)) } if n.Engine != nil { fmt.Fprintf(&sb, "wg=%v ", *n.Engine) diff --git a/ipn/ipnlocal/bus.go b/ipn/ipnlocal/bus.go index 8be508010..60e7af1bc 100644 --- a/ipn/ipnlocal/bus.go +++ b/ipn/ipnlocal/bus.go @@ -5,13 +5,30 @@ import ( "context" + "runtime" "time" "tailscale.com/ipn" "tailscale.com/tailcfg" "tailscale.com/tstime" + "tailscale.com/util/mak" ) +// goosGetsLegacyNetmapNotify reports whether tailscaled, when running on the +// current GOOS, still emits the legacy [ipn.Notify.NetMap] field on runtime +// (non-initial) bus messages. It is true on platforms whose host GUIs have +// not yet finished migrating to the narrower bus signals +// ([ipn.Notify.SelfChange] / [ipn.Notify.PeerChanges]) and the on-demand +// [LocalClient.NetMap] fetch. +// +// runtime.GOOS is a compile-time constant, so the producer-side code that +// builds and ships NetMap on the bus is dead-code-eliminated on Linux and +// other geese where this is false. +const goosGetsLegacyNetmapNotify = runtime.GOOS == "windows" || + runtime.GOOS == "darwin" || + runtime.GOOS == "ios" || + runtime.GOOS == "android" + type rateLimitingBusSender struct { fn func(*ipn.Notify) (keepGoing bool) lastFlush time.Time // last call to fn, or zero value if none @@ -126,11 +143,21 @@ func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify { if dst == nil { dst = &ipn.Notify{Version: src.Version} } - if src.NetMap != nil { + if goosGetsLegacyNetmapNotify && src.NetMap != nil { + // Full netmap supersedes any accumulated peer-change deltas. dst.NetMap = src.NetMap - dst.PeerChanges = nil // full netmap supersedes any accumulated deltas - } else if src.PeerChanges != nil { - dst.PeerChanges = mergePeerChanges(dst.PeerChanges, src.PeerChanges) + dst.PeerChangedPatch = nil + } else if src.PeerChangedPatch != nil { + dst.PeerChangedPatch = mergePeerChangedPatch(dst.PeerChangedPatch, src.PeerChangedPatch) + } + if len(src.PeersChanged) > 0 { + dst.PeersChanged = append(dst.PeersChanged, src.PeersChanged...) + } + if len(src.PeersRemoved) > 0 { + dst.PeersRemoved = append(dst.PeersRemoved, src.PeersRemoved...) + } + for id, up := range src.UserProfiles { + mak.Set(&dst.UserProfiles, id, up) } if src.Engine != nil { dst.Engine = src.Engine @@ -138,10 +165,10 @@ func mergeBoringNotifies(dst, src *ipn.Notify) *ipn.Notify { return dst } -// mergePeerChanges merges new peer changes from src into dst, either -// mutating dst or allocating a new slice if dst is nil, returning the merged result. -// Values in src override those in dst for the same NodeID. -func mergePeerChanges(dst, src []*tailcfg.PeerChange) []*tailcfg.PeerChange { +// mergePeerChangedPatch merges new peer-changed patches from src into dst, +// either mutating dst or allocating a new slice if dst is nil, returning the +// merged result. Values in src override those in dst for the same NodeID. +func mergePeerChangedPatch(dst, src []*tailcfg.PeerChange) []*tailcfg.PeerChange { idxByNode := make(map[tailcfg.NodeID]int, len(dst)) for i, d := range dst { idxByNode[d.NodeID] = i @@ -191,8 +218,7 @@ func mergePeerChangeForIpnBus(old, new *tailcfg.PeerChange) *tailcfg.PeerChange // should be sent on the IPN bus immediately (e.g. to GUIs) without // rate limiting it for a few seconds. // -// It effectively reports whether n contains any field set that's -// not NetMap or Engine. +// PeerChanges and Engine are the only "boring" (rate-limitable) fields. func isNotableNotify(n *ipn.Notify) bool { if n == nil { return false @@ -206,6 +232,7 @@ func isNotableNotify(n *ipn.Notify) bool { n.ErrMessage != nil || n.LoginFinished != nil || n.SelfChange != nil || + n.InitialStatus != nil || !n.DriveShares.IsNil() || n.Health != nil || len(n.IncomingFiles) > 0 || diff --git a/ipn/ipnlocal/bus_test.go b/ipn/ipnlocal/bus_test.go index 048e5bff4..68551b695 100644 --- a/ipn/ipnlocal/bus_test.go +++ b/ipn/ipnlocal/bus_test.go @@ -30,7 +30,10 @@ func TestIsNotableNotify(t *testing.T) { {"empty", &ipn.Notify{}, false}, {"version", &ipn.Notify{Version: "foo"}, false}, {"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false}, - {"peerchanges", &ipn.Notify{PeerChanges: []*tailcfg.PeerChange{{}}}, false}, + {"peerchanges", &ipn.Notify{PeerChangedPatch: []*tailcfg.PeerChange{{}}}, false}, + {"peerschanged", &ipn.Notify{PeersChanged: []*tailcfg.Node{{}}}, false}, + {"peersremoved", &ipn.Notify{PeersRemoved: []tailcfg.NodeID{1}}, false}, + {"userprofiles", &ipn.Notify{UserProfiles: map[tailcfg.UserID]tailcfg.UserProfileView{1: (&tailcfg.UserProfile{}).View()}}, false}, {"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false}, {"selfchange", &ipn.Notify{SelfChange: &tailcfg.Node{}}, true}, } @@ -42,7 +45,7 @@ func TestIsNotableNotify(t *testing.T) { for sf := range rt.Fields() { n := &ipn.Notify{} switch sf.Name { - case "_", "NetMap", "PeerChanges", "SelfChange", "Engine", "Version": + case "_", "NetMap", "PeerChangedPatch", "SelfChange", "PeersChanged", "PeersRemoved", "UserProfiles", "Engine", "Version": // Already covered above or not applicable. continue case "DriveShares": @@ -123,8 +126,10 @@ func (st *rateLimitingBusSenderTester) advance(d time.Duration) { } func TestRateLimitingBusSender(t *testing.T) { - nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)} - nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)} + // Both share NodeID 1 so merge collapses to a single PeerChange and + // the later one (nm2) wins. + nm1 := &ipn.Notify{PeerChangedPatch: []*tailcfg.PeerChange{{NodeID: 1, DERPRegion: 1}}} + nm2 := &ipn.Notify{PeerChangedPatch: []*tailcfg.PeerChange{{NodeID: 1, DERPRegion: 2}}} eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)} eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)} @@ -163,8 +168,8 @@ func TestRateLimitingBusSender(t *testing.T) { t.Fatalf("got %d items; want 2", len(st.got)) } gotn := st.got[1] - if gotn.NetMap != nm2.NetMap { - t.Errorf("got wrong NetMap; got %p", gotn.NetMap) + if !reflect.DeepEqual(gotn.PeerChangedPatch, nm2.PeerChangedPatch) { + t.Errorf("got wrong PeerChangedPatch; got %v want %v", gotn.PeerChangedPatch, nm2.PeerChangedPatch) } if gotn.Engine != eng2.Engine { t.Errorf("got wrong Engine; got %p", gotn.Engine) @@ -208,8 +213,8 @@ func TestRateLimitingBusSender(t *testing.T) { st.advance(5 * time.Second) select { case n := <-flushc: - if n.NetMap != nm2.NetMap { - t.Errorf("got wrong NetMap; got %p", n.NetMap) + if !reflect.DeepEqual(n.PeerChangedPatch, nm2.PeerChangedPatch) { + t.Errorf("got wrong PeerChangedPatch; got %v want %v", n.PeerChangedPatch, nm2.PeerChangedPatch) } case <-time.After(10 * time.Second): t.Error("timeout") @@ -221,7 +226,7 @@ func TestRateLimitingBusSender(t *testing.T) { }) } -func TestMergePeerChanges(t *testing.T) { +func TestMergePeerChangedPatch(t *testing.T) { online := true offline := false @@ -232,7 +237,7 @@ func TestMergePeerChanges(t *testing.T) { new := []*tailcfg.PeerChange{ {NodeID: 2, DERPRegion: 2}, } - got := mergePeerChanges(old, new) + got := mergePeerChangedPatch(old, new) if len(got) != 2 { t.Fatalf("len = %d; want 2", len(got)) } @@ -249,7 +254,7 @@ func TestMergePeerChanges(t *testing.T) { new := []*tailcfg.PeerChange{ {NodeID: 1, DERPRegion: 5, Online: &offline}, } - got := mergePeerChanges(old, new) + got := mergePeerChangedPatch(old, new) if len(got) != 2 { t.Fatalf("len = %d; want 2 (merged, not appended)", len(got)) } @@ -273,7 +278,7 @@ func TestMergePeerChanges(t *testing.T) { {NodeID: 1, DERPRegion: 2}, {NodeID: 3, DERPRegion: 30}, } - got := mergePeerChanges(old, new) + got := mergePeerChangedPatch(old, new) if len(got) != 2 { t.Fatalf("len = %d; want 2", len(got)) } @@ -292,7 +297,7 @@ func TestMergePeerChanges(t *testing.T) { new := []*tailcfg.PeerChange{ {NodeID: 1, Online: &offline}, } - got := mergePeerChanges(old, new) + got := mergePeerChangedPatch(old, new) if len(got) != 1 { t.Fatalf("len = %d; want 1", len(got)) } @@ -311,7 +316,7 @@ func TestMergePeerChanges(t *testing.T) { new := []*tailcfg.PeerChange{ {NodeID: 1, DERPRegion: 1}, } - got := mergePeerChanges(nil, new) + got := mergePeerChangedPatch(nil, new) if len(got) != 1 { t.Fatalf("len = %d; want 1", len(got)) } diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 481436fbe..e8f5154a1 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -152,6 +152,18 @@ type watchSession struct { sessionID string cancel context.CancelFunc // to shut down the session mask ipn.NotifyWatchOpt // watch options for this session + + // lastSentUserProfile is the per-UserID [tailcfg.UserProfileView] + // most recently delivered to this session via [Notify.UserProfiles]. + // On a subsequent send, an incoming entry whose + // [tailcfg.UserProfileView.Equal] reports identity-or-equal-fields + // to the stored view for that UserID is dropped from the + // per-session copy of [Notify.UserProfiles], so the session only + // sees genuinely new or changed profiles. The views share backing + // memory with the producer's tracking maps, so the common + // "control re-announces the same profile" case is a pointer-cheap + // equality check. + lastSentUserProfile map[tailcfg.UserID]tailcfg.UserProfileView } var ( @@ -1338,7 +1350,13 @@ func (b *LocalBackend) UpdateStatus(sb *ipnstate.StatusBuilder) { b.mu.Lock() defer b.mu.Unlock() + b.updateStatusLocked(sb) +} +// updateStatusLocked is the b.mu-holding portion of [LocalBackend.UpdateStatus]. +// +// b.mu must be held. +func (b *LocalBackend) updateStatusLocked(sb *ipnstate.StatusBuilder) { cn := b.currentNode() nm := cn.NetMap() sb.MutateStatus(func(s *ipnstate.Status) { @@ -1540,9 +1558,9 @@ func (b *LocalBackend) WhoIsNodeKey(k key.NodePublic) (n tailcfg.NodeView, u tai cn := b.currentNode() if nid, ok := cn.NodeByKey(k); ok { if n, ok := cn.NodeByID(nid); ok { - up, ok := cn.NetMap().UserProfiles[n.User()] + up, _ := cn.UserByID(n.User()) u = profileFromView(up) - return n, u, ok + return n, u, true } } return n, u, false @@ -1638,14 +1656,32 @@ func (b *LocalBackend) PeerCapsForService(src netip.Addr, svcName tailcfg.Servic // given NodeID, in O(1) time. It returns ok=false if no such peer is in // the current netmap. // -// It is intended for callers that need the latest state of a single peer -// without fetching the entire netmap. +// It is intended for callers that observed a peer-mutation signal (e.g. +// [ipn.Notify.PeerChangedPatch] or [ipn.Notify.PeersChanged]) and want +// the latest state of the affected node without having to apply the patch +// themselves — useful for older clients that don't recognize a new +// [tailcfg.PeerChange] field, or that just don't want to bother. func (b *LocalBackend) PeerByID(id tailcfg.NodeID) (n tailcfg.NodeView, ok bool) { return b.currentNode().NodeByID(id) } +// UserProfile returns the current [tailcfg.UserProfile] for the given UserID, +// in O(1) time. It returns ok=false if no such User is in the current netmap. +// +// It is the LocalAPI/LocalBackend fallback for IPN-bus consumers that see a +// UserID they don't recognize and want to resolve it. +func (b *LocalBackend) UserProfile(id tailcfg.UserID) (u tailcfg.UserProfileView, ok bool) { + return b.currentNode().UserByID(id) +} + func (b *LocalBackend) GetFilterForTest() *filter.Filter { testenv.AssertInTest() + // Take b.mu so the read serializes with [setControlClientStatusLocked], + // which installs the netmap and the filter at separate sub-steps. Without + // this, a test thread that observes the new netmap (via [NetMapWithPeers]) + // can race ahead of the filter store and read the previous filter. + b.mu.Lock() + defer b.mu.Unlock() nb := b.currentNode() return nb.filterAtomic.Load() } @@ -1922,13 +1958,17 @@ func (b *LocalBackend) setControlClientStatusLocked(c controlclient.Client, st c // Notify watchers that the self node may have changed. Reactive // consumers (containerboot, kube agents, sniproxy, etc.) listen on - // this signal and re-fetch peers/DNS via the LocalAPI if they need - // more than self info. + // this signal and re-fetch peers/DNS via [LocalClient.NetMap] if + // they need more than self info. var selfChange *tailcfg.Node if st.NetMap.SelfNode.Valid() { selfChange = st.NetMap.SelfNode.AsStruct() } - b.sendLocked(ipn.Notify{NetMap: st.NetMap, SelfChange: selfChange}) + notify := ipn.Notify{SelfChange: selfChange} + if goosGetsLegacyNetmapNotify { + notify.NetMap = st.NetMap + } + b.sendLocked(notify) // The error here is unimportant as is the result. This will recalculate the suggested exit node // cache the value and push any changes to the IPN bus. @@ -2218,7 +2258,11 @@ func (b *LocalBackend) sysPolicyChanged(policy policyclient.PolicyChange) { } } -var _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil) +var ( + _ controlclient.NetmapDeltaUpdater = (*LocalBackend)(nil) + _ controlclient.PacketFilterUpdater = (*LocalBackend)(nil) + _ controlclient.UserProfileUpdater = (*LocalBackend)(nil) +) // UpdateNetmapDelta implements controlclient.NetmapDeltaUpdater. func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) { @@ -2235,9 +2279,27 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo cn := b.currentNode() cn.UpdateNetmapDelta(muts) - if ms, ok := b.sys.MagicSock.GetOK(); ok { - ms.UpdateNetmapDelta(muts) + // Dispatch Add/Remove per-peer to magicsock, and any per-field + // patches via the existing UpdateNetmapDelta path. The per-peer + // methods take c.mu themselves, so we can't call them from inside + // magicsock.UpdateNetmapDelta which already holds c.mu. + peersAddedOrRemoved := false + ms := b.MagicConn() + for _, m := range muts { + switch m := m.(type) { + case netmap.NodeMutationAdd: + ms.UpsertPeer(m.Node) + peersAddedOrRemoved = true + metricNetmapDeltaPeerAdded.Add(1) + case netmap.NodeMutationRemove: + ms.RemovePeer(m.NodeIDBeingMutated()) + peersAddedOrRemoved = true + metricNetmapDeltaPeerRemoved.Add(1) + default: + metricNetmapDeltaPeerPatched.Add(1) + } } + ms.UpdateNetmapDelta(muts) // If auto exit nodes are enabled and our exit node went offline, // we need to schedule picking a new one. @@ -2268,15 +2330,30 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo return true } - if mutationsAreWorthyOfTellingIPNBus(muts) { - // The notifier will strip the netmap based on the watchOpts mask if the watcher - // has indicated it can handle PeerChanges. - notify = &ipn.Notify{NetMap: cn.netMapWithPeers()} - if peerChanges, ok := ipnBusPeerChangesFromNodeMutations(muts); ok { - notify.PeerChanges = peerChanges - } else { + // A single MapResponse can carry adds/removes (full Nodes) AND + // per-field patches in the same delta. Build one Notify that + // reflects all of them; per-session stripping in [sendToLocked] + // hides fields the watcher didn't opt in to (and promotes patches + // into full Nodes for watchers that asked for PeerChanges but not + // PeerPatches). + if peersAddedOrRemoved || mutationsAreWorthyOfTellingIPNBus(muts) { + notify = &ipn.Notify{} + for _, m := range muts { + switch m := m.(type) { + case netmap.NodeMutationAdd: + notify.PeersChanged = append(notify.PeersChanged, m.Node.AsStruct()) + case netmap.NodeMutationRemove: + notify.PeersRemoved = append(notify.PeersRemoved, m.NodeIDBeingMutated()) + } + } + if patches, ok := ipnBusPeerChangedPatchFromNodeMutations(muts); ok && len(patches) > 0 { + notify.PeerChangedPatch = patches + } else if !ok { b.logf("[unexpected] got mutations worthy of telling IPN bus but failed to convert to peer changes") } + if goosGetsLegacyNetmapNotify { + notify.NetMap = cn.netMapWithPeers() + } } else if testenv.InTest() { // In tests, send an empty Notify as a wake-up so end-to-end // integration tests in another repo can check on the status of @@ -2286,6 +2363,58 @@ func (b *LocalBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo return true } +// UpdatePacketFilter implements [controlclient.PacketFilterUpdater]. +// +// It is called by the controlclient when a MapResponse carries a new packet +// filter. Avoiding a full netmap rebuild matters here because the packet +// filter currently changes on every peer add on large tailnets. +func (b *LocalBackend) UpdatePacketFilter(rules views.Slice[tailcfg.FilterRule], parsed []filter.Match) bool { + b.mu.Lock() + defer b.mu.Unlock() + cn := b.currentNode() + if cn.NetMap() == nil { + // No netmap installed yet; the initial full-netmap path will + // take care of installing the filter. + return false + } + metricUpdatePacketFilter.Add(1) + cn.setPacketFilter(rules, parsed) + b.updateFilterLocked(b.pm.CurrentPrefs()) + return true +} + +// UpdateUserProfiles implements [controlclient.UserProfileUpdater]. +// +// It is called by the controlclient when a MapResponse carries new or +// updated [tailcfg.UserProfileView] entries. It merges them into the +// current netmap's UserProfiles so [LocalBackend.UserProfile] can +// resolve them, and emits an [ipn.Notify] with [Notify.UserProfiles] +// populated so IPN-bus consumers (sessions opted in to +// NotifyPeerChanges / NotifyPeerPatches) get the new profiles before +// any subsequent PeersChanged / PeerChangedPatch entries that reference +// these UserIDs. +// +// The views in profiles share backing memory with the controlclient +// caller's tracking map; nodeBackend stores them as-is, and per-bus +// sessions can dedup via [UserProfileView.Equal] without copying. +func (b *LocalBackend) UpdateUserProfiles(profiles map[tailcfg.UserID]tailcfg.UserProfileView) bool { + if len(profiles) == 0 { + return true + } + b.mu.Lock() + defer b.mu.Unlock() + cn := b.currentNode() + if cn.NetMap() == nil { + // No netmap installed yet; the initial full-netmap path will + // take care of installing UserProfiles. + return false + } + metricUpdateUserProfiles.Add(1) + cn.mergeUserProfiles(profiles) + b.sendLocked(ipn.Notify{UserProfiles: profiles}) + return true +} + // mustationsAreWorthyOfRecalculatingSuggestedExitNode reports whether any mutation type in muts is // worthy of recalculating the suggested exit node. func mutationsAreWorthyOfRecalculatingSuggestedExitNode(muts []netmap.NodeMutation, cn *nodeBackend, sid tailcfg.StableNodeID) bool { @@ -2324,32 +2453,40 @@ func mutationsAreWorthyOfRecalculatingSuggestedExitNode(muts []netmap.NodeMutati return false } -// ipnBusPeerChangesFromNodeMutations converts a slice of NodeMutations to a slice of -// *tailcfg.PeerChange for use in ipn.Notify.PeerChanges. -// Multiple mutations to the same node are merged into a single PeerChange. -// If we encounter any mutations that we cannot convert to a PeerChange, we return (nil, false) -// to indicate that the caller should send a Notify with the full netmap instead of -// trying to send granular peer changes. -func ipnBusPeerChangesFromNodeMutations(muts []netmap.NodeMutation) ([]*tailcfg.PeerChange, bool) { +// ipnBusPeerChangedPatchFromNodeMutations converts the patch-shaped subset of +// muts (per-field updates that fit in a [tailcfg.PeerChange]) into a slice of +// [tailcfg.PeerChange] for use in [ipn.Notify.PeerChangedPatch]. Multiple +// mutations against the same node are merged into a single PeerChange. +// +// Add/Remove mutations are skipped (they ride +// [ipn.Notify.PeersChanged]/[ipn.Notify.PeersRemoved]). Any other mutation +// type that doesn't fit a [tailcfg.PeerChange] causes ok=false; the caller +// should fall back to a full netmap rebuild. +func ipnBusPeerChangedPatchFromNodeMutations(muts []netmap.NodeMutation) ([]*tailcfg.PeerChange, bool) { byID := map[tailcfg.NodeID]*tailcfg.PeerChange{} var ordered []*tailcfg.PeerChange - for _, m := range muts { - nid := m.NodeIDBeingMutated() + getOrAdd := func(nid tailcfg.NodeID) *tailcfg.PeerChange { pc := byID[nid] if pc == nil { pc = &tailcfg.PeerChange{NodeID: nid} byID[nid] = pc ordered = append(ordered, pc) } + return pc + } + for _, m := range muts { switch v := m.(type) { + case netmap.NodeMutationAdd, netmap.NodeMutationRemove: + // These go in PeersChanged / PeersRemoved, not as patches. + continue case netmap.NodeMutationOnline: - pc.Online = &v.Online + getOrAdd(v.NodeIDBeingMutated()).Online = &v.Online case netmap.NodeMutationLastSeen: - pc.LastSeen = &v.LastSeen + getOrAdd(v.NodeIDBeingMutated()).LastSeen = &v.LastSeen case netmap.NodeMutationDERPHome: - pc.DERPRegion = v.DERPRegion + getOrAdd(v.NodeIDBeingMutated()).DERPRegion = v.DERPRegion case netmap.NodeMutationEndpoints: - pc.Endpoints = v.Endpoints + getOrAdd(v.NodeIDBeingMutated()).Endpoints = v.Endpoints default: return nil, false } @@ -2979,7 +3116,7 @@ func (b *LocalBackend) updateFilterLocked(prefs ipn.PrefsView) { for i := range addrs.Len() { localNetsB.AddPrefix(addrs.At(i)) } - packetFilter = netMap.PacketFilter + packetFilter = cn.PacketFilter() if cn.unlockedNodesPermitted(packetFilter) { b.health.SetUnhealthy(invalidPacketFilterWarnable, nil) @@ -3317,9 +3454,21 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A var ini *ipn.Notify + // Build the engine half of the InitialStatus before taking b.mu, since + // b.e.UpdateStatus has its own locking and shouldn't be called under + // b.mu (lock-ordering: outer-to-inner is b.mu -> engine, not the other + // way). The backend half is then populated under b.mu below, atomically + // with watcher registration so no events arrive on the watcher's + // channel before InitialStatus is delivered. + var statusSB *ipnstate.StatusBuilder + if mask&ipn.NotifyInitialStatus != 0 { + statusSB = &ipnstate.StatusBuilder{WantPeers: true} + b.e.UpdateStatus(statusSB) + } + b.mu.Lock() - const initialBits = ipn.NotifyInitialState | ipn.NotifyInitialPrefs | ipn.NotifyInitialNetMap | ipn.NotifyInitialDriveShares | ipn.NotifyInitialSuggestedExitNode | ipn.NotifyInitialClientVersion + const initialBits = ipn.NotifyInitialState | ipn.NotifyInitialPrefs | ipn.NotifyInitialNetMap | ipn.NotifyInitialStatus | ipn.NotifyInitialDriveShares | ipn.NotifyInitialSuggestedExitNode | ipn.NotifyInitialClientVersion if mask&initialBits != 0 { cn := b.currentNode() ini = &ipn.Notify{Version: version.Long()} @@ -3334,7 +3483,18 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A ini.Prefs = new(b.sanitizedPrefsLocked()) } if mask&ipn.NotifyInitialNetMap != 0 { - ini.NetMap = cn.NetMap() + if nm := cn.NetMap(); nm != nil && nm.SelfNode.Valid() { + ini.SelfChange = nm.SelfNode.AsStruct() + } + // The legacy initial NetMap is delivered cross-platform: it + // is what watchers asked for by setting NotifyInitialNetMap + // and is always a one-shot, so the cost of building it is + // paid once per bus subscription. + ini.NetMap = cn.netMapWithPeers() + } + if statusSB != nil { + b.updateStatusLocked(statusSB) + ini.InitialStatus = statusSB.Status() } if mask&ipn.NotifyInitialDriveShares != 0 && b.DriveSharingEnabled() { ini.DriveShares = b.pm.prefs.DriveShares() @@ -3463,16 +3623,6 @@ func (b *LocalBackend) DebugNotify(n ipn.Notify) { b.send(n) } -// DebugNotifyLastNetMap injects a fake notify message to clients, -// repeating whatever the last netmap was. -// -// It should only be used via the LocalAPI's debug handler. -func (b *LocalBackend) DebugNotifyLastNetMap() { - if nm := b.currentNode().NetMap(); nm != nil { - b.send(ipn.Notify{NetMap: nm}) - } -} - // DebugForceNetmapUpdate forces a full no-op netmap update of the current // netmap in all the various subsystems (wireguard, magicsock, LocalBackend). // @@ -3603,27 +3753,107 @@ func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget) if !recipient.match(sess.owner) { continue } - nOut := &n - if n.PeerChanges != nil { - // Take a shallow copy of n so we can elide the PeerChanges or the Netmap - // based on the session's mask. - nOut = new(n) - if sess.mask&ipn.NotifyPeerChanges != 0 { - // Skip the full Netmap - nOut.NetMap = nil - } else { - // Skip the PeerChanges - nOut.PeerChanges = nil - } - } + nForSess := b.notifyForSessionLocked(sess, &n) select { - case sess.ch <- nOut: + case sess.ch <- nForSess: default: // Drop the notification if the channel is full. } } } +// notifyForSessionLocked returns the [ipn.Notify] to deliver to sess, +// applying per-session field gating to n: stripping fields the session +// didn't opt in to receive, promoting [Notify.PeerChangedPatch] entries +// into full-Node [Notify.PeersChanged] entries for sessions that asked +// for peer changes but not patches, and tracking on sess which +// [tailcfg.UserProfileView]s have already been delivered so subsequent +// sends only carry new/changed profiles. +// +// The returned pointer is either n itself (no adjustments needed for +// this session) or a fresh *ipn.Notify with the adjusted fields. The +// caller's *ipn.Notify is not mutated. +// +// b.mu must be held. +func (b *LocalBackend) notifyForSessionLocked(sess *watchSession, n *ipn.Notify) *ipn.Notify { + // Visibility of peer-set fields is governed by the watcher's mask: + // + // - NotifyPeerChanges: PeersChanged + PeersRemoved + // - NotifyPeerPatches (implies): + PeerChangedPatch + // + // A watcher with NotifyPeerChanges but not NotifyPeerPatches still + // observes every per-peer mutation; we just promote each + // PeerChangedPatch entry into a full-Node entry in PeersChanged so + // the watcher doesn't have to handle the patch shape. + wantsPeerChanges := sess.mask&(ipn.NotifyPeerChanges|ipn.NotifyPeerPatches) != 0 + wantsPeerPatches := sess.mask&ipn.NotifyPeerPatches != 0 + stripNetMap := goosGetsLegacyNetmapNotify && n.NetMap != nil && sess.mask&ipn.NotifyNoNetMap != 0 + stripPeersChanged := len(n.PeersChanged) > 0 && !wantsPeerChanges + stripPeersRemoved := len(n.PeersRemoved) > 0 && !wantsPeerChanges + stripPatches := len(n.PeerChangedPatch) > 0 && !wantsPeerPatches + promotePatches := len(n.PeerChangedPatch) > 0 && wantsPeerChanges && !wantsPeerPatches + + // UserProfiles ride alongside peer changes and are gated on the + // same opt-in. Sessions that didn't ask for peer changes get the + // field stripped entirely; opted-in sessions get a per-session + // subset containing only profiles that differ from what was last + // delivered to that session, compared via + // [tailcfg.UserProfileView.Equal] (pointer-cheap when the view + // shares backing memory with the previous send). + stripUserProfiles := len(n.UserProfiles) > 0 && !wantsPeerChanges + var sessUserProfiles map[tailcfg.UserID]tailcfg.UserProfileView + if !stripUserProfiles && len(n.UserProfiles) > 0 { + for id, up := range n.UserProfiles { + if up.Equal(sess.lastSentUserProfile[id]) { + continue // already has this exact profile + } + mak.Set(&sessUserProfiles, id, up) + mak.Set(&sess.lastSentUserProfile, id, up) + } + if len(sessUserProfiles) == 0 { + // All entries deduped. + stripUserProfiles = true + } + } + replaceUserProfiles := !stripUserProfiles && len(sessUserProfiles) != len(n.UserProfiles) + + if !stripNetMap && !stripPeersChanged && !stripPeersRemoved && !stripPatches && !stripUserProfiles && !replaceUserProfiles && !promotePatches { + return n + } + nCopy := *n + if stripNetMap { + nCopy.NetMap = nil + } + if stripPeersChanged { + nCopy.PeersChanged = nil + } + if stripPeersRemoved { + nCopy.PeersRemoved = nil + } + if promotePatches { + // Look up each patched peer's current Node and append it to + // PeersChanged. Watchers in this mode receive only full-Node + // updates; they never see PeerChangedPatch. + cn := b.currentNode() + for _, pc := range n.PeerChangedPatch { + nv, ok := cn.NodeByID(pc.NodeID) + if !ok { + continue + } + nCopy.PeersChanged = append(nCopy.PeersChanged, nv.AsStruct()) + } + } + if stripPatches { + nCopy.PeerChangedPatch = nil + } + if stripUserProfiles { + nCopy.UserProfiles = nil + } else if replaceUserProfiles { + nCopy.UserProfiles = sessUserProfiles + } + return &nCopy +} + // setAuthURLLocked sets the authURL and triggers [LocalBackend.popBrowserAuthNow] if the URL has changed. // This method is called when a new authURL is received from the control plane, meaning that either a user // has started a new interactive login (e.g., by running `tailscale login` or clicking Login in the GUI), @@ -4911,7 +5141,8 @@ func (b *LocalBackend) setPrefsLocked(newp *ipn.Prefs) ipn.PrefsView { } } if netMap != nil { - newProfile := profileFromView(netMap.UserProfiles[netMap.User()]) + selfProfileView, _ := cn.UserByID(netMap.User()) + newProfile := profileFromView(selfProfileView) if newLoginName := newProfile.LoginName; newLoginName != "" { if !oldp.Persist().Valid() { b.logf("active login: %s", newLoginName) @@ -5173,24 +5404,20 @@ func (b *LocalBackend) NetMap() *netmap.NetworkMap { // current. Use this for any caller that does not need to iterate Peers, // since it's O(1) regardless of tailnet size. // -// Returns nil if no network map has been received yet. +// It returns nil if no network map has been received yet. func (b *LocalBackend) NetMapNoPeers() *netmap.NetworkMap { return b.currentNode().NetMap() } -// NetMapWithPeers returns the latest network map with the Peers slice -// populated. +// NetMapWithPeers returns a copy of the latest cached network map with +// its Peers slice populated from the live per-node-backend peers map +// (i.e. reflecting any incremental delta updates applied since the last +// full netmap install). It is O(N) in the size of the peer set; prefer +// [LocalBackend.NetMapNoPeers] when only non-Peers fields are needed. // -// Currently this is the same as [LocalBackend.NetMapNoPeers]: the cached -// netmap's Peers slice may be stale relative to the live per-node-backend -// peers map. A follow-up change will switch this method to return a -// freshly-built netmap with up-to-date Peers, at O(N) cost per call. -// Callers that genuinely need the up-to-date peer set should use this -// method (and document why) so the upcoming change reaches them. -// -// Returns nil if no network map has been received yet. +// It returns nil if no netmap is yet available. func (b *LocalBackend) NetMapWithPeers() *netmap.NetworkMap { - return b.currentNode().NetMap() + return b.currentNode().netMapWithPeers() } // lookupPeerByIP returns the node public key for the peer that owns the @@ -8316,6 +8543,17 @@ func maybeUsernameOf(actor ipnauth.Actor) string { var ( metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus") metricIPForwardingCheckError = clientmetric.NewCounter("localbackend_ip_forwarding_check_error") + + // Counters for the controlclient's delta-update fast path: each + // counts a destination-side call into [LocalBackend] from + // [mapSession.tryHandleIncrementally]. Useful as test signals that a + // MapResponse landed on the incremental path with the expected + // payload shape. + metricNetmapDeltaPeerAdded = clientmetric.NewCounter("localbackend_netmap_delta_peer_added") + metricNetmapDeltaPeerRemoved = clientmetric.NewCounter("localbackend_netmap_delta_peer_removed") + metricNetmapDeltaPeerPatched = clientmetric.NewCounter("localbackend_netmap_delta_peer_patched") + metricUpdatePacketFilter = clientmetric.NewCounter("localbackend_update_packet_filter") + metricUpdateUserProfiles = clientmetric.NewCounter("localbackend_update_user_profiles") ) func (b *LocalBackend) stateEncrypted() opt.Bool { diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index bfaa2d31b..d63a8535d 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -1693,18 +1693,18 @@ func TestExitNodeNotifyOrder(t *testing.T) { // and an exit node ID notification (since an exit node is selected). // The netmap notification should be sent first. nw.watch(0, []wantedNotification{ - wantNetmapNotify(clientNetmap), + wantSelfChangeNotify(selfNode), wantExitNodeIDNotify(exitNode1.StableID()), }) lb.SetControlClientStatus(lb.cc, controlclient.Status{NetMap: clientNetmap}) nw.check() } -func wantNetmapNotify(want *netmap.NetworkMap) wantedNotification { +func wantSelfChangeNotify(want tailcfg.NodeView) wantedNotification { return wantedNotification{ - name: "Netmap", + name: "SelfChange", cond: func(t testing.TB, _ ipnauth.Actor, n *ipn.Notify) bool { - return n.NetMap == want + return n.SelfChange != nil && want.Valid() && n.SelfChange.StableID == want.StableID() }, } } @@ -2078,6 +2078,198 @@ func TestWatchNotificationsCallbacks(t *testing.T) { } } +// TestNotifyForSessionPeerVisibility verifies the per-session masking +// logic in [LocalBackend.notifyForSessionLocked] for the +// NotifyPeerChanges / NotifyPeerPatches flag pair: +// +// - A watcher with no peer-change bits should not see PeersChanged, +// PeersRemoved, or PeerChangedPatch. +// - A watcher with NotifyPeerChanges (but not NotifyPeerPatches) should +// see PeersChanged and PeersRemoved, AND any incoming +// PeerChangedPatch entries should be promoted to full Nodes in +// PeersChanged. PeerChangedPatch itself must be cleared. +// - A watcher with NotifyPeerPatches should see all three fields. +func TestNotifyForSessionPeerVisibility(t *testing.T) { + b := newTestLocalBackend(t) + + // Install a netmap with two peers so the patch-promotion path can + // resolve PeerChangedPatch entries to full Nodes. + nm := &netmap.NetworkMap{} + for _, id := range []tailcfg.NodeID{10, 20} { + nm.Peers = append(nm.Peers, (&tailcfg.Node{ + ID: id, + Key: makeNodeKeyFromID(id), + Addresses: []netip.Prefix{netip.MustParsePrefix(fmt.Sprintf("100.64.0.%d/32", id))}, + }).View()) + } + b.currentNode().SetNetMap(nm) + + // Build a Notify carrying every peer-change kind: an added peer + // (PeersChanged), a removed peer (PeersRemoved), and a patch for an + // existing peer (PeerChangedPatch). + addedPeer := &tailcfg.Node{ID: 30, Key: makeNodeKeyFromID(30)} + online := true + notify := ipn.Notify{ + PeersChanged: []*tailcfg.Node{addedPeer}, + PeersRemoved: []tailcfg.NodeID{99}, + PeerChangedPatch: []*tailcfg.PeerChange{{NodeID: 10, Online: &online}}, + } + + deliver := func(mask ipn.NotifyWatchOpt) *ipn.Notify { + sess := &watchSession{mask: mask} + b.mu.Lock() + defer b.mu.Unlock() + return b.notifyForSessionLocked(sess, ¬ify) + } + + t.Run("no_peer_bits", func(t *testing.T) { + n := deliver(0) + if len(n.PeersChanged) != 0 { + t.Errorf("PeersChanged = %v; want empty", n.PeersChanged) + } + if len(n.PeersRemoved) != 0 { + t.Errorf("PeersRemoved = %v; want empty", n.PeersRemoved) + } + if len(n.PeerChangedPatch) != 0 { + t.Errorf("PeerChangedPatch = %v; want empty", n.PeerChangedPatch) + } + }) + + t.Run("peer_changes_only_promotes_patches", func(t *testing.T) { + n := deliver(ipn.NotifyPeerChanges) + if len(n.PeerChangedPatch) != 0 { + t.Errorf("PeerChangedPatch should be stripped; got %v", n.PeerChangedPatch) + } + if len(n.PeersRemoved) != 1 || n.PeersRemoved[0] != 99 { + t.Errorf("PeersRemoved = %v; want [99]", n.PeersRemoved) + } + // PeersChanged should contain the originally-added peer (30) AND + // a promoted full-Node entry for the patched peer (10). + ids := make(map[tailcfg.NodeID]bool, len(n.PeersChanged)) + for _, p := range n.PeersChanged { + ids[p.ID] = true + } + if !ids[30] { + t.Errorf("PeersChanged missing added peer 30; got %+v", n.PeersChanged) + } + if !ids[10] { + t.Errorf("PeersChanged missing promoted peer 10; got %+v", n.PeersChanged) + } + }) + + t.Run("peer_patches_keeps_patch_field", func(t *testing.T) { + n := deliver(ipn.NotifyPeerPatches) + if len(n.PeerChangedPatch) != 1 || n.PeerChangedPatch[0].NodeID != 10 { + t.Errorf("PeerChangedPatch = %v; want [{NodeID:10,...}]", n.PeerChangedPatch) + } + if len(n.PeersChanged) != 1 || n.PeersChanged[0].ID != 30 { + t.Errorf("PeersChanged = %v; want [{ID:30}]", n.PeersChanged) + } + if len(n.PeersRemoved) != 1 || n.PeersRemoved[0] != 99 { + t.Errorf("PeersRemoved = %v; want [99]", n.PeersRemoved) + } + }) + + t.Run("both_bits_unchanged", func(t *testing.T) { + n := deliver(ipn.NotifyPeerChanges | ipn.NotifyPeerPatches) + if len(n.PeerChangedPatch) != 1 { + t.Errorf("PeerChangedPatch len = %d; want 1", len(n.PeerChangedPatch)) + } + if len(n.PeersChanged) != 1 { + t.Errorf("PeersChanged len = %d; want 1", len(n.PeersChanged)) + } + if len(n.PeersRemoved) != 1 { + t.Errorf("PeersRemoved len = %d; want 1", len(n.PeersRemoved)) + } + }) +} + +// TestNotifyForSessionUserProfilesGating verifies that +// [Notify.UserProfiles] is only delivered to sessions opted in to +// NotifyPeerChanges/NotifyPeerPatches, and is deduped per-UserID +// against [watchSession.lastSentUserProfile] across successive sends. +func TestNotifyForSessionUserProfilesGating(t *testing.T) { + b := newTestLocalBackend(t) + + deliver := func(sess *watchSession, profiles map[tailcfg.UserID]tailcfg.UserProfileView) *ipn.Notify { + b.mu.Lock() + defer b.mu.Unlock() + return b.notifyForSessionLocked(sess, &ipn.Notify{UserProfiles: profiles}) + } + + profiles := map[tailcfg.UserID]tailcfg.UserProfileView{ + 7: (&tailcfg.UserProfile{ID: 7, LoginName: "alice@example.com", DisplayName: "Alice"}).View(), + } + + t.Run("no_bits_strips", func(t *testing.T) { + n := deliver(&watchSession{}, profiles) + if len(n.UserProfiles) != 0 { + t.Errorf("UserProfiles = %v; want empty", n.UserProfiles) + } + }) + t.Run("peer_changes_delivers", func(t *testing.T) { + n := deliver(&watchSession{mask: ipn.NotifyPeerChanges}, profiles) + if got, want := len(n.UserProfiles), 1; got != want { + t.Fatalf("UserProfiles len = %d; want %d", got, want) + } + if n.UserProfiles[7].LoginName() != "alice@example.com" { + t.Errorf("got %+v; want alice", n.UserProfiles) + } + }) + t.Run("peer_patches_delivers", func(t *testing.T) { + n := deliver(&watchSession{mask: ipn.NotifyPeerPatches}, profiles) + if got, want := len(n.UserProfiles), 1; got != want { + t.Fatalf("UserProfiles len = %d; want %d", got, want) + } + }) + + // The remaining cases share a single session so the dedup state on + // [watchSession.lastSentUserProfile] persists across deliveries. + sess := &watchSession{mask: ipn.NotifyPeerChanges} + + t.Run("first_send", func(t *testing.T) { + n := deliver(sess, profiles) + if got, want := len(n.UserProfiles), 1; got != want { + t.Fatalf("UserProfiles len = %d; want %d", got, want) + } + }) + t.Run("dedup_repeat_same_map", func(t *testing.T) { + // Resending the exact same map should deliver nothing. + n := deliver(sess, profiles) + if len(n.UserProfiles) != 0 { + t.Errorf("got UserProfiles=%v on repeat; want empty (deduped)", n.UserProfiles) + } + }) + t.Run("per_user_dedup", func(t *testing.T) { + // A Notify with two profiles where only one changed should + // deliver only the changed one. + mixed := map[tailcfg.UserID]tailcfg.UserProfileView{ + 7: (&tailcfg.UserProfile{ID: 7, LoginName: "alice@example.com", DisplayName: "Alice"}).View(), // unchanged + 8: (&tailcfg.UserProfile{ID: 8, LoginName: "bob@example.com", DisplayName: "Bob the New"}).View(), // new + } + n := deliver(sess, mixed) + if got, want := len(n.UserProfiles), 1; got != want { + t.Fatalf("UserProfiles len = %d; want %d (only the new user)", got, want) + } + if _, ok := n.UserProfiles[7]; ok { + t.Errorf("UserProfiles still includes user 7 (should have been deduped)") + } + if got := n.UserProfiles[8].LoginName(); got != "bob@example.com" { + t.Errorf("UserProfiles[8].LoginName = %q; want bob", got) + } + }) + t.Run("changed_user_delivers", func(t *testing.T) { + // Updating an existing UserID re-sends just that one. + updated := map[tailcfg.UserID]tailcfg.UserProfileView{ + 7: (&tailcfg.UserProfile{ID: 7, LoginName: "alice@example.com", DisplayName: "Alice 2.0"}).View(), + } + n := deliver(sess, updated) + if n.UserProfiles[7].DisplayName() != "Alice 2.0" { + t.Errorf("got %+v; want updated alice", n.UserProfiles) + } + }) +} + // tests LocalBackend.updateNetmapDeltaLocked func TestUpdateNetmapDelta(t *testing.T) { b := newTestLocalBackend(t) diff --git a/ipn/ipnlocal/node_backend.go b/ipn/ipnlocal/node_backend.go index 59c26ebe5..3c21ff2a8 100644 --- a/ipn/ipnlocal/node_backend.go +++ b/ipn/ipnlocal/node_backend.go @@ -7,6 +7,7 @@ "cmp" "context" "fmt" + "maps" "net/netip" "slices" "sync" @@ -116,6 +117,20 @@ type nodeBackend struct { // It is mutated in place (with mu held) and must not escape the [nodeBackend]. nodeByKey map[key.NodePublic]tailcfg.NodeID + // userProfiles is the live set of user profiles, updated incrementally + // by mergeUserProfiles as deltas arrive. It parallels the peers map: + // netMap.UserProfiles is the frozen snapshot from the last full install, + // while this field reflects incremental updates. Readers that need a + // snapshot (e.g. the legacy Notify.NetMap path) must clone this map. + userProfiles map[tailcfg.UserID]tailcfg.UserProfileView + + // packetFilterRules and packetFilter are the live packet filter state, + // updated by setPacketFilter as deltas arrive. Like userProfiles, they + // exist separately from netMap's frozen fields so that concurrent + // JSON-encoding of a Notify.NetMap snapshot doesn't race with writes. + packetFilterRules views.Slice[tailcfg.FilterRule] + packetFilter []filter.Match + // keyWaitersForTest is the test-only registry of channels waiting for // a given peer key to first appear in the netmap. See // [nodeBackend.AwaitNodeKeyForTest]. It is populated lazily and remains @@ -239,12 +254,8 @@ func (nb *nodeBackend) PeerByStableID(id tailcfg.StableNodeID) (_ tailcfg.NodeVi func (nb *nodeBackend) UserByID(id tailcfg.UserID) (_ tailcfg.UserProfileView, ok bool) { nb.mu.Lock() - nm := nb.netMap - nb.mu.Unlock() - if nm == nil { - return tailcfg.UserProfileView{}, false - } - u, ok := nm.UserProfiles[id] + defer nb.mu.Unlock() + u, ok := nb.userProfiles[id] return u, ok } @@ -465,6 +476,9 @@ func (nb *nodeBackend) netMapWithPeers() *netmap.NetworkMap { slices.SortFunc(nm.Peers, func(a, b tailcfg.NodeView) int { return cmp.Compare(a.ID(), b.ID()) }) + nm.UserProfiles = maps.Clone(nb.userProfiles) + nm.PacketFilterRules = nb.packetFilterRules + nm.PacketFilter = nb.packetFilter return nm } @@ -477,8 +491,14 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) { nb.updatePeersLocked() nb.signalKeyWaitersForTestLocked() if nm != nil { + nb.userProfiles = maps.Clone(nm.UserProfiles) + nb.packetFilterRules = nm.PacketFilterRules + nb.packetFilter = nm.PacketFilter nb.derpMapViewPub.Publish(nm.DERPMap.View()) } else { + nb.userProfiles = nil + nb.packetFilterRules = views.Slice[tailcfg.FilterRule]{} + nb.packetFilter = nil nb.derpMapViewPub.Publish(tailcfg.DERPMapView{}) } } @@ -612,10 +632,39 @@ func (nb *nodeBackend) updatePeersLocked() { } } +// setPacketFilter stores the live packet filter rules and parsed +// matches. It does not touch the frozen netMap. nb.mu is acquired by +// this method. +func (nb *nodeBackend) setPacketFilter(rules views.Slice[tailcfg.FilterRule], parsed []filter.Match) { + nb.mu.Lock() + defer nb.mu.Unlock() + nb.packetFilterRules = rules + nb.packetFilter = parsed +} + +// PacketFilter returns the current live packet filter matches. +func (nb *nodeBackend) PacketFilter() []filter.Match { + nb.mu.Lock() + defer nb.mu.Unlock() + return nb.packetFilter +} + +// mergeUserProfiles merges new/updated [tailcfg.UserProfileView] +// entries into the live userProfiles map. It does not touch +// netMap.UserProfiles (which is frozen once set). Callers must hold +// [LocalBackend.mu]. nb.mu is acquired by this method. +func (nb *nodeBackend) mergeUserProfiles(profiles map[tailcfg.UserID]tailcfg.UserProfileView) { + nb.mu.Lock() + defer nb.mu.Unlock() + for id, up := range profiles { + mak.Set(&nb.userProfiles, id, up) + } +} + func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bool) { nb.mu.Lock() defer nb.mu.Unlock() - if nb.netMap == nil || len(nb.peers) == 0 { + if nb.netMap == nil { return false } @@ -625,9 +674,35 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo var mutableNodes map[tailcfg.NodeID]*tailcfg.Node for _, m := range muts { - n, ok := mutableNodes[m.NodeIDBeingMutated()] + switch m := m.(type) { + case netmap.NodeMutationAdd: + nid := m.Node.ID() + mak.Set(&nb.peers, nid, m.Node) + for _, ipp := range m.Node.Addresses().All() { + if ipp.IsSingleIP() { + mak.Set(&nb.nodeByAddr, ipp.Addr(), nid) + } + } + mak.Set(&nb.nodeByKey, m.Node.Key(), nid) + continue + case netmap.NodeMutationRemove: + nid := m.NodeIDBeingMutated() + if old, ok := nb.peers[nid]; ok { + for _, ipp := range old.Addresses().All() { + if ipp.IsSingleIP() { + delete(nb.nodeByAddr, ipp.Addr()) + } + } + delete(nb.nodeByKey, old.Key()) + delete(nb.peers, nid) + } + continue + } + // Per-field mutation. + nid := m.NodeIDBeingMutated() + n, ok := mutableNodes[nid] if !ok { - nv, ok := nb.peers[m.NodeIDBeingMutated()] + nv, ok := nb.peers[nid] if !ok { // TODO(bradfitz): unexpected metric? return false @@ -640,6 +715,7 @@ func (nb *nodeBackend) UpdateNetmapDelta(muts []netmap.NodeMutation) (handled bo for nid, n := range mutableNodes { nb.peers[nid] = n.View() } + nb.signalKeyWaitersForTestLocked() return true } diff --git a/ipn/localapi/debug.go b/ipn/localapi/debug.go index 6f222bef0..f6b0d631b 100644 --- a/ipn/localapi/debug.go +++ b/ipn/localapi/debug.go @@ -200,8 +200,6 @@ func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) { break } h.b.DebugNotify(n) - case "notify-last-netmap": - h.b.DebugNotifyLastNetMap() case "break-tcp-conns": err = h.b.DebugBreakTCPConns() case "break-derp-conns": diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 0fcd059ba..6164daac0 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -90,6 +90,7 @@ "shutdown": (*Handler).serveShutdown, "start": (*Handler).serveStart, "status": (*Handler).serveStatus, + "user-profile": (*Handler).serveUserProfile, "whois": (*Handler).serveWhoIs, } @@ -900,6 +901,12 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) { } mask = ipn.NotifyWatchOpt(v) } + // NotifyInitialNetMap is permitted alongside NotifyPeerChanges / + // NotifyPeerPatches for backwards compatibility with clients that + // set both (e.g. the Apple client). On platforms where + // goosGetsLegacyNetmapNotify is true, the initial netmap is + // delivered regardless; peer-change subscribers simply receive + // deltas after that point. w.Header().Set("Content-Type", "application/json") ctx := r.Context() @@ -1138,12 +1145,14 @@ type peerByIDBackend interface { PeerByID(tailcfg.NodeID) (tailcfg.NodeView, bool) } -// servePeerByID returns the current full [tailcfg.Node] for the peer with -// the NodeID given in the "id" query parameter, in O(1) time. It returns -// 404 if no such peer is in the current netmap. +// servePeerByID returns the current full [tailcfg.Node] for the peer with the +// NodeID given in the "id" query parameter. It returns 404 if no such peer is +// in the current netmap. // -// It is intended for clients that need the latest state of a single peer -// without fetching the entire netmap. +// It is intended for clients that observed a peer-mutation signal (e.g. +// [ipn.Notify.PeerChangedPatch] or [ipn.Notify.PeersChanged]) and want the +// latest state of the affected node without having to apply the patch +// themselves. func (h *Handler) servePeerByID(w http.ResponseWriter, r *http.Request) { h.servePeerByIDWithBackend(w, r, h.b) } @@ -1170,6 +1179,45 @@ func (h *Handler) servePeerByIDWithBackend(w http.ResponseWriter, r *http.Reques e.Encode(nv.AsStruct()) } +// userProfileBackend is the subset of [ipnlocal.LocalBackend] used by +// [Handler.serveUserProfile]. It exists so the handler can be tested +// with a trivial mock without spinning up a full LocalBackend. +type userProfileBackend interface { + UserProfile(tailcfg.UserID) (tailcfg.UserProfileView, bool) +} + +// serveUserProfile returns the current [tailcfg.UserProfile] for the User +// with the UserID given in the "id" query parameter, in O(1) time. It +// returns 404 if no such user is in the current netmap. +// +// It is the LocalAPI fallback for IPN-bus consumers that see a UserID +// referenced by a peer Node and want to resolve it to a UserProfile. +func (h *Handler) serveUserProfile(w http.ResponseWriter, r *http.Request) { + h.serveUserProfileWithBackend(w, r, h.b) +} + +func (h *Handler) serveUserProfileWithBackend(w http.ResponseWriter, r *http.Request, b userProfileBackend) { + if !h.PermitRead { + http.Error(w, "user-profile access denied", http.StatusForbidden) + return + } + idStr := r.FormValue("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id <= 0 { + http.Error(w, "invalid 'id' parameter", http.StatusBadRequest) + return + } + uv, ok := b.UserProfile(tailcfg.UserID(id)) + if !ok { + http.Error(w, "no user with that UserID", http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + e := json.NewEncoder(w) + e.SetIndent("", "\t") + e.Encode(uv.AsStruct()) +} + // serveSetExpirySooner sets the expiry date on the current machine, specified // by an `expiry` unix timestamp as POST or query param. func (h *Handler) serveSetExpirySooner(w http.ResponseWriter, r *http.Request) { diff --git a/ipn/localapi/localapi_test.go b/ipn/localapi/localapi_test.go index 84d8e1e0f..fc4ddbd92 100644 --- a/ipn/localapi/localapi_test.go +++ b/ipn/localapi/localapi_test.go @@ -461,6 +461,66 @@ func TestServePeerByID(t *testing.T) { }) } +type fakeUserProfileBackend map[tailcfg.UserID]*tailcfg.UserProfile + +func (f fakeUserProfileBackend) UserProfile(id tailcfg.UserID) (tailcfg.UserProfileView, bool) { + u, ok := f[id] + if !ok { + return tailcfg.UserProfileView{}, false + } + return u.View(), true +} + +func TestServeUserProfile(t *testing.T) { + h := handlerForTest(t, &Handler{PermitRead: true}) + b := fakeUserProfileBackend{ + 7: {ID: 7, LoginName: "alice@example.com", DisplayName: "Alice"}, + } + + tests := []struct { + name string + query string + wantCode int + wantLogin string + }{ + {"hit", "id=7", 200, "alice@example.com"}, + {"miss", "id=99", 404, ""}, + {"bad_id", "id=garbage", 400, ""}, + {"missing_id", "", 400, ""}, + {"zero_id", "id=0", 400, ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/v0/user-profile?"+tt.query, nil) + h.serveUserProfileWithBackend(rec, req, b) + if rec.Code != tt.wantCode { + t.Fatalf("status = %d, want %d; body=%q", rec.Code, tt.wantCode, rec.Body.String()) + } + if tt.wantCode != 200 { + return + } + var got tailcfg.UserProfile + if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil { + t.Fatalf("unmarshal body %q: %v", rec.Body.Bytes(), err) + } + if got.LoginName != tt.wantLogin { + t.Errorf("LoginName = %q, want %q", got.LoginName, tt.wantLogin) + } + }) + } + + t.Run("forbidden", func(t *testing.T) { + hh := handlerForTest(t, &Handler{PermitRead: false}) + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/v0/user-profile?id=7", nil) + hh.serveUserProfileWithBackend(rec, req, b) + if rec.Code != http.StatusForbidden { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusForbidden) + } + }) +} + func TestShouldDenyServeConfigForGOOSAndUserContext(t *testing.T) { newHandler := func(connIsLocalAdmin bool) *Handler { return handlerForTest(t, &Handler{ diff --git a/ipn/serve.go b/ipn/serve.go index 21d15ab81..5a65658a5 100644 --- a/ipn/serve.go +++ b/ipn/serve.go @@ -191,7 +191,7 @@ func (sc *ServeConfig) WebHandlerExists(svcName tailcfg.ServiceName, hp HostPort } // GetWebHandler returns the HTTPHandler for the given host:port and mount point. -// Returns nil if the handler does not exist. +// It returns nil if the handler does not exist. func (sc *ServeConfig) GetWebHandler(svcName tailcfg.ServiceName, hp HostPort, mount string) *HTTPHandler { if sc == nil { return nil diff --git a/tsnet/packet_filter_test.go b/tsnet/packet_filter_test.go index ca776436e..342210f4b 100644 --- a/tsnet/packet_filter_test.go +++ b/tsnet/packet_filter_test.go @@ -20,26 +20,23 @@ "tailscale.com/wgengine/filter" ) -// waitFor blocks until a NetMap is seen on the IPN bus that satisfies the given -// function f. Note: has no timeout, should be called with a ctx that has an -// appropriate timeout set. +// waitFor blocks until the LocalBackend's current netmap satisfies the given +// function f. It uses a bus subscription to wake up on netmap and peer-mutation +// events rather than polling. Note: it has no timeout and should be called with +// a ctx that has an appropriate timeout set. func waitFor(t testing.TB, ctx context.Context, s *Server, f func(*netmap.NetworkMap) bool) error { t.Helper() - watcher, err := s.localClient.WatchIPNBus(ctx, ipn.NotifyInitialNetMap) + w, err := s.localClient.WatchIPNBus(ctx, ipn.NotifyInitialState|ipn.NotifyPeerChanges) if err != nil { - t.Fatalf("error watching IPN bus: %s", err) + return fmt.Errorf("watching IPN bus: %w", err) } - defer watcher.Close() - + defer w.Close() for { - n, err := watcher.Next() - if err != nil { - return fmt.Errorf("getting next ipn.Notify from IPN bus: %w", err) + if nm := s.lb.NetMapWithPeers(); nm != nil && f(nm) { + return nil } - if n.NetMap != nil { - if f(n.NetMap) { - return nil - } + if _, err := w.Next(); err != nil { + return fmt.Errorf("waiting for netmap: %w", err) } } } @@ -192,7 +189,7 @@ type check struct { controlURL, c := startControl(t) s, _, pubKey := startServer(t, ctx, controlURL, "node") - if test.waitTest(s.lb.NetMap()) { + if test.waitTest(s.lb.NetMapWithPeers()) { t.Fatal("waitTest already passes before sending initial netmap: this will be flaky") } @@ -223,7 +220,7 @@ type check struct { t.Fatal("incrementalWaitTest must be set if incrementalMapResponse is set") } - if test.incrementalWaitTest(s.lb.NetMap()) { + if test.incrementalWaitTest(s.lb.NetMapWithPeers()) { t.Fatal("incrementalWaitTest already passes before sending incremental netmap: this will be flaky") } diff --git a/tsnet/tsnet_test.go b/tsnet/tsnet_test.go index 4ee0ab10c..b6677b5ec 100644 --- a/tsnet/tsnet_test.go +++ b/tsnet/tsnet_test.go @@ -1022,7 +1022,11 @@ func setUpServiceState(t *testing.T, name, ip string, host, client *Server, t.Helper() w := must.Get(s.localClient.WatchIPNBus(t.Context(), ipn.NotifyInitialNetMap)) defer w.Close() - for n := must.Get(w.Next()); !netmapUpToDate(n.NetMap); n = must.Get(w.Next()) { + for { + must.Get(w.Next()) + if nm := s.lb.NetMapWithPeers(); nm != nil && netmapUpToDate(nm) { + return + } } } waitForLatestNetmap(t, client) diff --git a/tstest/integration/integration_test.go b/tstest/integration/integration_test.go index 3064d6a26..b14a19c95 100644 --- a/tstest/integration/integration_test.go +++ b/tstest/integration/integration_test.go @@ -34,7 +34,6 @@ "github.com/miekg/dns" "go4.org/mem" "tailscale.com/client/local" - "tailscale.com/client/tailscale" "tailscale.com/cmd/testwrapper/flakytest" "tailscale.com/feature" _ "tailscale.com/feature/clientupdate" @@ -69,6 +68,14 @@ func TestMain(m *testing.M) { os.Exit(0) } +// fetchNetMapForTest fetches the current netmap from tailscaled via the +// "current-netmap" debug action. The debug action's payload shape is +// intentionally not part of any stable API; tests use it to inspect +// internal state. +func fetchNetMapForTest(ctx context.Context, lc *local.Client) (*netmap.NetworkMap, error) { + return local.GetDebugResultJSON[*netmap.NetworkMap](ctx, lc, "current-netmap") +} + // Tests that tailscaled starts up in TUN mode, and also without data races: // https://github.com/tailscale/tailscale/issues/7894 func TestTUNMode(t *testing.T) { @@ -1189,20 +1196,18 @@ func TestClientSideJailing(t *testing.T) { if err != nil { t.Fatal(err) } - waitPeerIsJailed := func(t *testing.T, b *tailscale.IPNBusWatcher, jailed bool) { + waitPeerIsJailed := func(t *testing.T, b *local.IPNBusWatcher, lc *local.Client, jailed bool) { t.Helper() for { - n, err := b.Next() + _, err := b.Next() if err != nil { t.Fatal(err) } - if n.NetMap == nil { + nm, err := fetchNetMapForTest(context.Background(), lc) + if err != nil || nm == nil || len(nm.Peers) == 0 { continue } - if len(n.NetMap.Peers) == 0 { - continue - } - if j := n.NetMap.Peers[0].IsJailed(); j == jailed { + if j := nm.Peers[0].IsJailed(); j == jailed { break } } @@ -1213,8 +1218,8 @@ func TestClientSideJailing(t *testing.T) { env.Control.SetJailed(k2, k1, tc.n1JailedForN2) // Wait for the jailed status to propagate. - waitPeerIsJailed(t, b1, tc.n2JailedForN1) - waitPeerIsJailed(t, b2, tc.n1JailedForN2) + waitPeerIsJailed(t, b1, lc1, tc.n2JailedForN1) + waitPeerIsJailed(t, b2, lc2, tc.n1JailedForN2) testDial(t, lc1, ip2, port, tc.n1JailedForN2) testDial(t, lc2, ip1, port, tc.n2JailedForN1) diff --git a/tstest/largetailnet/delta_test.go b/tstest/largetailnet/delta_test.go new file mode 100644 index 000000000..aba639f22 --- /dev/null +++ b/tstest/largetailnet/delta_test.go @@ -0,0 +1,279 @@ +// Copyright (c) Tailscale Inc & contributors +// SPDX-License-Identifier: BSD-3-Clause + +package largetailnet_test + +import ( + "context" + "fmt" + "net/http/httptest" + "net/netip" + "os" + "path/filepath" + "testing" + "time" + + "tailscale.com/ipn/store/mem" + "tailscale.com/tailcfg" + "tailscale.com/tsnet" + "tailscale.com/tstest" + "tailscale.com/tstest/integration" + "tailscale.com/tstest/integration/testcontrol" + "tailscale.com/tstest/largetailnet" + "tailscale.com/types/ipproto" + "tailscale.com/types/logger" + "tailscale.com/util/clientmetric" + "tailscale.com/wgengine/filter" +) + +// metricByName returns the [clientmetric.Metric] with the given name, +// failing the test if not found. +func metricByName(t testing.TB, name string) *clientmetric.Metric { + t.Helper() + for _, m := range clientmetric.Metrics() { + if m.Name() == name { + return m + } + } + t.Fatalf("metric %q not found", name) + return nil +} + +// TestNetmapDeltaFastPath drives a sequence of MapResponses against an +// in-process tsnet + testcontrol harness via [largetailnet.Streamer]'s +// AltMapStream hook, exercising every delta-message kind the +// incremental netmap path handles. After each delta it asserts both: +// +// - the appropriate fast-path metric counters incremented (i.e. we +// stayed on the incremental path and did not fall through to a full +// netmap rebuild); and +// +// - the corresponding side effect is observable on the [LocalBackend] +// (a fresh peer resolvable via PeerByID, a UserProfile resolvable +// via UserProfile, a packet filter rule reflected in +// GetFilterForTest, a per-field patch reflected in PeerByID, etc.). +// +// This is the destination-side companion to +// [tstest/largetailnet/BenchmarkGiantTailnet], which only measures cost +// of the same fast path — this test verifies correctness. +func TestNetmapDeltaFastPath(t *testing.T) { + tstest.Shard(t) + + logf := logger.Discard + if testing.Verbose() { + logf = t.Logf + } + + ctx, cancel := context.WithTimeout(t.Context(), 60*time.Second) + t.Cleanup(cancel) + + derpMap := integration.RunDERPAndSTUN(t, logf, "127.0.0.1") + + // Start with one initial peer (NodeID 2) so the initial netmap is + // realistic. The fast path will not fire for the initial response — + // it always goes through UpdateFullNetmap — but every subsequent + // SendDelta should. + streamer := largetailnet.New(1, derpMap) + ctrl := &testcontrol.Server{ + DERPMap: derpMap, + DNSConfig: &tailcfg.DNSConfig{}, + AltMapStream: streamer.AltMapStream(), + Logf: logf, + } + ctrl.HTTPTestServer = httptest.NewUnstartedServer(ctrl) + ctrl.HTTPTestServer.Start() + t.Cleanup(ctrl.HTTPTestServer.Close) + + tmp := filepath.Join(t.TempDir(), "tsnet") + if err := os.MkdirAll(tmp, 0o755); err != nil { + t.Fatal(err) + } + s := &tsnet.Server{ + Dir: tmp, + ControlURL: ctrl.HTTPTestServer.URL, + Hostname: "delta-test", + Store: new(mem.Store), + Ephemeral: true, + Logf: logf, + } + t.Cleanup(func() { s.Close() }) + if _, err := s.Up(ctx); err != nil { + t.Fatalf("tsnet.Server.Up: %v", err) + } + lb := tsnet.TestHooks.LocalBackend(s) + + // Snapshot baseline metric values; we'll assert deltas against + // these. Globals make per-test isolation impossible, but deltas + // are robust against interleaving (assuming no other test runs in + // parallel here, hence tstest.Shard above). + mFast := metricByName(t, "controlclient_map_response_handled_incrementally") + mFull := metricByName(t, "controlclient_map_response_handled_full_rebuild") + mAdd := metricByName(t, "localbackend_netmap_delta_peer_added") + mRem := metricByName(t, "localbackend_netmap_delta_peer_removed") + mPatch := metricByName(t, "localbackend_netmap_delta_peer_patched") + mFilter := metricByName(t, "localbackend_update_packet_filter") + mUsers := metricByName(t, "localbackend_update_user_profiles") + baseline := map[*clientmetric.Metric]int64{ + mFast: mFast.Value(), mFull: mFull.Value(), + mAdd: mAdd.Value(), mRem: mRem.Value(), mPatch: mPatch.Value(), + mFilter: mFilter.Value(), mUsers: mUsers.Value(), + } + dumpMetrics := func(t *testing.T) { + t.Helper() + for _, m := range []*clientmetric.Metric{mFast, mFull, mAdd, mRem, mPatch, mFilter, mUsers} { + t.Logf("metric %s = %d (baseline %d, delta %d)", m.Name(), m.Value(), baseline[m], m.Value()-baseline[m]) + } + } + waitDelta := func(t *testing.T, m *clientmetric.Metric, want int64) { + t.Helper() + err := tstest.WaitFor(2*time.Second, func() error { + got := m.Value() - baseline[m] + if got >= want { + return nil + } + return fmt.Errorf("%s delta = %d, want >= %d", m.Name(), got, want) + }) + if err != nil { + dumpMetrics(t) + t.Fatalf("%s: %v", m.Name(), err) + } + if got := m.Value() - baseline[m]; got != want { + t.Errorf("%s delta = %d, want exactly %d", m.Name(), got, want) + } + baseline[m] = m.Value() + } + + // Helper to send a MapResponse and wait for it to be processed by + // the client. We use the metric deltas as our synchronization + // point: SendDelta is synchronous from the streamer side, but the + // client processes the response on its own goroutine, so we wait + // for the fast-path counter to tick. + sendDelta := func(t *testing.T, mr *tailcfg.MapResponse) { + t.Helper() + if err := streamer.SendDelta(ctx, mr); err != nil { + t.Fatalf("SendDelta: %v", err) + } + } + + // Self IPv4, used as the destination in packet filter checks below. + // largetailnet derives self addresses from SelfNodeID via node4/node6; + // for SelfNodeID=1 that's 100.100.0.1. + selfIP4 := netip.MustParseAddr("100.100.0.1") + + // addedPeerID is set by the peer_added_with_filter_and_user_profile + // subtest and consumed later by peer_removed. + var addedPeerID tailcfg.NodeID + + t.Run("peer_added_with_filter_and_user_profile", func(t *testing.T) { + // Add a fresh peer. Bundle a new PacketFilter rule allowing + // TCP from that peer's IP to a port we'll later probe, and a + // new UserProfile for the user that owns the new peer. + newPeer := streamer.AllocPeer() + newUser := tailcfg.UserID(42) + newPeer.User = newUser + newPeer.Addresses = []netip.Prefix{netip.MustParsePrefix("100.64.0.42/32")} + addedPeerID = newPeer.ID + sendDelta(t, &tailcfg.MapResponse{ + PeersChanged: []*tailcfg.Node{newPeer}, + PacketFilter: []tailcfg.FilterRule{{ + SrcIPs: []string{"100.64.0.42/32"}, + IPProto: []int{int(ipproto.TCP)}, + DstPorts: []tailcfg.NetPortRange{{IP: "*", Ports: tailcfg.PortRange{First: 22, Last: 22}}}, + }}, + UserProfiles: []tailcfg.UserProfile{{ + ID: newUser, + LoginName: "alice@example.com", + DisplayName: "Alice", + }}, + }) + + waitDelta(t, mFast, 1) + waitDelta(t, mAdd, 1) + waitDelta(t, mFilter, 1) + waitDelta(t, mUsers, 1) + waitDelta(t, mFull, 0) + + // Side effects. + nv, ok := lb.PeerByID(newPeer.ID) + if !ok || nv.ID() != newPeer.ID { + t.Errorf("PeerByID(%d) ok=%v node=%v", newPeer.ID, ok, nv) + } + uv, ok := lb.UserProfile(newUser) + if !ok || uv.LoginName() != "alice@example.com" { + t.Errorf("UserProfile(%d) ok=%v login=%q", newUser, ok, uv.LoginName()) + } + pf := lb.GetFilterForTest() + if got := pf.Check(netip.MustParseAddr("100.64.0.42"), selfIP4, 22, ipproto.TCP); got != filter.Accept { + t.Errorf("packet filter Check from new peer = %s; want Accept", got) + } + }) + + t.Run("peer_patch_derp_home", func(t *testing.T) { + // Patch the initial peer's DERPRegion via PeersChangedPatch. + // This rides as NodeMutationDERPHome. + sendDelta(t, &tailcfg.MapResponse{ + PeersChangedPatch: []*tailcfg.PeerChange{{ + NodeID: 2, + DERPRegion: 7, + }}, + }) + + waitDelta(t, mFast, 1) + waitDelta(t, mPatch, 1) + waitDelta(t, mFull, 0) + + nv, ok := lb.PeerByID(2) + if !ok { + t.Fatalf("PeerByID(2) not found") + } + if got := nv.HomeDERP(); got != 7 { + t.Errorf("HomeDERP = %d, want 7", got) + } + }) + + t.Run("peer_online_and_last_seen", func(t *testing.T) { + // Online + LastSeen on the same delta. PeerSeenChange's value + // is true to set LastSeen, false to clear it; the time it gets + // is now() at the time MutationsFromMapResponse runs on the + // client, not a wire value. + sendDelta(t, &tailcfg.MapResponse{ + OnlineChange: map[tailcfg.NodeID]bool{2: true}, + PeerSeenChange: map[tailcfg.NodeID]bool{2: true}, + }) + + waitDelta(t, mFast, 1) + // Two mutations: one NodeMutationOnline + one NodeMutationLastSeen. + waitDelta(t, mPatch, 2) + waitDelta(t, mFull, 0) + + nv, ok := lb.PeerByID(2) + if !ok { + t.Fatalf("PeerByID(2) not found") + } + if o := nv.Online(); !o.Valid() || !o.Get() { + t.Errorf("Online = %v, want true", o) + } + }) + + t.Run("peer_removed", func(t *testing.T) { + if addedPeerID == 0 { + t.Fatal("peer_added_with_filter_and_user_profile must run first") + } + // Sanity check: the peer should currently exist. + if _, ok := lb.PeerByID(addedPeerID); !ok { + t.Fatalf("PeerByID(%d) missing before removal", addedPeerID) + } + + sendDelta(t, &tailcfg.MapResponse{ + PeersRemoved: []tailcfg.NodeID{addedPeerID}, + }) + + waitDelta(t, mFast, 1) + waitDelta(t, mRem, 1) + waitDelta(t, mFull, 0) + + if _, ok := lb.PeerByID(addedPeerID); ok { + t.Errorf("PeerByID(%d) still present after PeersRemoved", addedPeerID) + } + }) +} diff --git a/tstest/largetailnet/largetailnet_test.go b/tstest/largetailnet/largetailnet_test.go index 07f67df82..c9ebb1532 100644 --- a/tstest/largetailnet/largetailnet_test.go +++ b/tstest/largetailnet/largetailnet_test.go @@ -40,7 +40,7 @@ // processing peer-add/peer-remove deltas in steady state, with no IPN bus // subscribers attached. This represents the headless-tailscaled workload // (Linux subnet routers, container sidecars, ...) where the LocalBackend -// does not pay for fanning Notify.NetMap out to GUI watchers. +// does not pay for fanning Notify events out to GUI watchers. // // Use [BenchmarkGiantTailnetBusWatcher] for the GUI-client workload. // @@ -54,9 +54,9 @@ func BenchmarkGiantTailnet(b *testing.B) { // BenchmarkGiantTailnetBusWatcher is like [BenchmarkGiantTailnet] but // attaches one [local.Client.WatchIPNBus] subscriber for the duration of the -// benchmark. The Notify-fan-out cost (notably Notify.NetMap encoding to -// every watcher on every full-rebuild path) is therefore included in the -// per-delta measurement, which approximates the GUI-client workload. +// benchmark. The Notify-fan-out cost (per-watcher encoding done on every +// full-rebuild path) is therefore included in the per-delta measurement, +// which approximates the GUI-client workload. // // The benchmark is opt-in via --actually-test-giant-tailnet. func BenchmarkGiantTailnetBusWatcher(b *testing.B) { @@ -160,15 +160,17 @@ func benchGiantTailnet(b *testing.B, busWatcher bool) { notifyCh = make(chan struct{}, 1024) go func() { for { - n, err := bw.Next() - if err != nil { + if _, err := bw.Next(); err != nil { return } - if n.NetMap != nil || len(n.PeerChanges) > 0 { - select { - case notifyCh <- struct{}{}: - default: - } + // Any notify counts as a per-delta ack: peer add/remove + // in the delta path emits Notify.PeersChanged / + // Notify.PeersRemoved, peer patches emit + // Notify.PeerChanges, and self-node updates emit + // Notify.SelfChange. + select { + case notifyCh <- struct{}{}: + default: } } }() diff --git a/types/netmap/nodemut.go b/types/netmap/nodemut.go index 901296b1f..aa91d03ea 100644 --- a/types/netmap/nodemut.go +++ b/types/netmap/nodemut.go @@ -68,6 +68,25 @@ func (m NodeMutationLastSeen) Apply(n *tailcfg.Node) { n.LastSeen = new(m.LastSeen) } +// NodeMutationAdd is a NodeMutation that says a new peer has been added. +// Apply is a no-op: consumers of NodeMutationAdd must type-switch to handle +// adds by inserting Node into their peer map. +type NodeMutationAdd struct { + Node tailcfg.NodeView +} + +func (m NodeMutationAdd) NodeIDBeingMutated() tailcfg.NodeID { return m.Node.ID() } +func (m NodeMutationAdd) Apply(*tailcfg.Node) {} + +// NodeMutationRemove is a NodeMutation that says a peer has been removed. +// Apply is a no-op: consumers of NodeMutationRemove must type-switch to handle +// removes by deleting the node from their peer map. +type NodeMutationRemove struct { + mutatingNodeID +} + +func (m NodeMutationRemove) Apply(*tailcfg.Node) {} + var peerChangeFields = sync.OnceValue(func() []reflect.StructField { var fields []reflect.StructField rt := reflect.TypeFor[tailcfg.PeerChange]() @@ -110,8 +129,12 @@ func NodeMutationsFromPatch(p *tailcfg.PeerChange) (_ []NodeMutation, ok bool) { } // MutationsFromMapResponse returns all the discrete node mutations described -// by res. It returns ok=false if res contains any non-patch field as defined +// by res. It returns ok=false if res contains any non-delta field as defined // by mapResponseContainsNonPatchFields. +// +// Adds and removes (from res.PeersChanged / res.PeersRemoved) are emitted as +// NodeMutationAdd / NodeMutationRemove entries. Callers must type-switch to +// handle those alongside field mutations. func MutationsFromMapResponse(res *tailcfg.MapResponse, now time.Time) (ret []NodeMutation, ok bool) { if now.IsZero() { now = time.Now() @@ -119,8 +142,15 @@ func MutationsFromMapResponse(res *tailcfg.MapResponse, now time.Time) (ret []No if mapResponseContainsNonPatchFields(res) { return nil, false } - // All that remains is PeersChangedPatch, OnlineChange, and LastSeenChange. + for _, id := range res.PeersRemoved { + ret = append(ret, NodeMutationRemove{mutatingNodeID(id)}) + } + for _, n := range res.PeersChanged { + // Any n still in PeersChanged after patchifyPeersChanged is a + // truly-new (or replaced) peer. + ret = append(ret, NodeMutationAdd{Node: n.View()}) + } for _, p := range res.PeersChangedPatch { deltas, ok := NodeMutationsFromPatch(p) if !ok { @@ -142,25 +172,26 @@ func MutationsFromMapResponse(res *tailcfg.MapResponse, now time.Time) (ret []No return ret, true } -// mapResponseContainsNonPatchFields reports whether res contains only "patch" -// fields set (PeersChangedPatch primarily, but also including the legacy -// PeerSeenChange and OnlineChange fields). +// mapResponseContainsNonPatchFields reports whether res contains any field +// that can't be expressed as a per-peer NodeMutation (including the new +// NodeMutationAdd / NodeMutationRemove variants) or via the sibling narrow +// setter methods on the map-session backend (e.g. UpdatePacketFilter). // -// It ignores any of the meta fields that are handled by PollNetMap before the -// peer change handling gets involved. +// When this returns true, the caller must fall back to rebuilding and +// dispatching a full NetworkMap. When it returns false, the response can be +// handled incrementally. // -// The purpose of this function is to ask whether this is a tricky enough -// MapResponse to warrant a full netmap update. When this returns false, it -// means the response can be handled incrementally, patching up the local state. +// PeersChanged, PeersRemoved, and PacketFilter(s) are intentionally not in +// this list: new/removed peers ride NodeMutationAdd/Remove, packet +// filter updates are delivered via the backend's UpdatePacketFilter +// method, and UserProfile updates ride the backend's UpdateUserProfiles +// method. func mapResponseContainsNonPatchFields(res *tailcfg.MapResponse) bool { return res.Node != nil || res.DERPMap != nil || res.DNSConfig != nil || res.Domain != "" || res.CollectServices != "" || - res.PacketFilter != nil || - res.PacketFilters != nil || - res.UserProfiles != nil || res.Health != nil || res.DisplayMessages != nil || res.SSHPolicy != nil || @@ -170,11 +201,5 @@ func mapResponseContainsNonPatchFields(res *tailcfg.MapResponse) bool { res.ControlDialPlan != nil || res.ClientVersion != nil || res.Peers != nil || - res.PeersRemoved != nil || - // PeersChanged is too coarse to be considered a patch. Also, we convert - // PeersChanged to PeersChangedPatch in patchifyPeersChanged before this - // function is called, so it should never be set anyway. But for - // completedness, and for tests, check it too: - res.PeersChanged != nil || res.DeprecatedDefaultAutoUpdate != "" } diff --git a/types/netmap/nodemut_test.go b/types/netmap/nodemut_test.go index 1ae2ab1f9..965e9a367 100644 --- a/types/netmap/nodemut_test.go +++ b/types/netmap/nodemut_test.go @@ -52,7 +52,16 @@ func TestMapResponseContainsNonPatchFields(t *testing.T) { // They should be ignored. want = false case "PeersChangedPatch", "PeerSeenChange", "OnlineChange": - // The actual three delta fields we care about handling. + // The three legacy delta fields handled via NodeMutation patches. + want = false + case "PeersChanged", "PeersRemoved": + // Now carried as NodeMutationAdd / NodeMutationRemove entries. + want = false + case "PacketFilter", "PacketFilters": + // Now delivered separately via PacketFilterUpdater. + want = false + case "UserProfiles": + // Now delivered separately via UserProfileUpdater. want = false default: // Everything else should be conseratively handled as a @@ -175,6 +184,36 @@ func TestMutationsFromMapResponse(t *testing.T) { }, want: nil, }, + { + name: "peer-removed", + mr: &tailcfg.MapResponse{ + PeersRemoved: []tailcfg.NodeID{5}, + }, + want: muts(NodeMutationRemove{5}), + }, + { + name: "peer-added", + mr: &tailcfg.MapResponse{ + PeersChanged: []*tailcfg.Node{{ID: 7}}, + }, + want: muts(NodeMutationAdd{Node: (&tailcfg.Node{ID: 7}).View()}), + }, + { + name: "add-and-remove-mixed-with-patch", + mr: &tailcfg.MapResponse{ + PeersRemoved: []tailcfg.NodeID{3}, + PeersChanged: []*tailcfg.Node{{ID: 7}}, + PeersChangedPatch: []*tailcfg.PeerChange{{ + NodeID: 5, + DERPRegion: 2, + }}, + }, + want: muts( + NodeMutationRemove{3}, + NodeMutationDERPHome{5, 2}, + NodeMutationAdd{Node: (&tailcfg.Node{ID: 7}).View()}, + ), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -188,11 +227,13 @@ func TestMutationsFromMapResponse(t *testing.T) { if diff := cmp.Diff(tt.want, got, cmp.Comparer(func(a, b netip.Addr) bool { return a == b }), cmp.Comparer(func(a, b netip.AddrPort) bool { return a == b }), + cmp.Comparer(func(a, b tailcfg.NodeView) bool { return a.ID() == b.ID() }), cmp.AllowUnexported( NodeMutationEndpoints{}, NodeMutationDERPHome{}, NodeMutationOnline{}, NodeMutationLastSeen{}, + NodeMutationRemove{}, )); diff != "" { t.Errorf("wrong result (-want +got):\n%s", diff) } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 6461c552e..442e26579 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -3229,7 +3229,7 @@ func (c *Conn) UpsertPeer(n tailcfg.NodeView) { return } flags := c.debugFlagsLocked() - c.peersByID[n.ID()] = n + mak.Set(&c.peersByID, n.ID(), n) c.upsertPeerLocked(n, flags, debugRingBufferSize(len(c.peersByID))) var relayUpsert candidatePeerRelay