diff --git a/cmd/k8s-operator/depaware.txt b/cmd/k8s-operator/depaware.txt index 6a6e7d61f..14d246572 100644 --- a/cmd/k8s-operator/depaware.txt +++ b/cmd/k8s-operator/depaware.txt @@ -906,6 +906,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/ tailscale.com/types/bools from tailscale.com/tsnet+ tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/ipproto from tailscale.com/net/flowtrack+ tailscale.com/types/key from tailscale.com/client/local+ tailscale.com/types/lazy from tailscale.com/ipn/ipnlocal+ diff --git a/cmd/tailscaled/depaware-min.txt b/cmd/tailscaled/depaware-min.txt index a2d20deda..de8c2953e 100644 --- a/cmd/tailscaled/depaware-min.txt +++ b/cmd/tailscaled/depaware-min.txt @@ -126,6 +126,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/dnstype from tailscale.com/client/tailscale/apitype+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/control/controlbase+ diff --git a/cmd/tailscaled/depaware-minbox.txt b/cmd/tailscaled/depaware-minbox.txt index 5121b56d0..87c410319 100644 --- a/cmd/tailscaled/depaware-minbox.txt +++ b/cmd/tailscaled/depaware-minbox.txt @@ -151,6 +151,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/dnstype from tailscale.com/client/tailscale/apitype+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/client/local+ diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt index da480d1a6..46562548b 100644 --- a/cmd/tailscaled/depaware.txt +++ b/cmd/tailscaled/depaware.txt @@ -399,6 +399,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/bools from tailscale.com/wgengine/netlog tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/net/flowtrack+ tailscale.com/types/key from tailscale.com/client/local+ diff --git a/cmd/tsidp/depaware.txt b/cmd/tsidp/depaware.txt index e29ae9348..b93af7d5b 100644 --- a/cmd/tsidp/depaware.txt +++ b/cmd/tsidp/depaware.txt @@ -308,6 +308,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar tailscale.com/types/bools from tailscale.com/tsnet+ tailscale.com/types/dnstype from tailscale.com/client/local+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/client/local+ tailscale.com/types/lazy from tailscale.com/cmd/tsidp+ diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index fe227b45e..57d5521d1 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -452,7 +452,9 @@ func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) { c.sendStatus("mapRoutine-got-netmap", nil, "", nm) } // Reset the backoff timer if we got a netmap. + c.mu.Lock() mrs.bo.Reset() + c.mu.Unlock() } func (mrs mapRoutineState) UpdateNetmapDelta(muts []netmap.NodeMutation) bool { @@ -526,13 +528,18 @@ func (c *Auto) mapRoutine() { c.mu.Lock() c.inMapPoll = false paused := c.paused - c.mu.Unlock() if paused { mrs.bo.BackOff(ctx, nil) - c.logf("mapRoutine: paused") } else { mrs.bo.BackOff(ctx, err) + } + c.mu.Unlock() + + // Now safe to call functions that might acquire the mutex + if paused { + c.logf("mapRoutine: paused") + } else { report(err, "PollNetMap") } } diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index bd9f4d42d..db44c03f8 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -369,11 +369,13 @@ func NewDirect(opts Options) (*Direct, error) { if !ok { return } + c.logf("controlclient direct: updating discoKey for %v via mapSession", update.Src) c.streamingMapSession.updateDiscoForNode( peer.ID(), update.Key, time.Now(), false) } else { // We do not yet have a mapSession, perhaps because we don't have a // connection to control. Punt the handling down to userspace+magicsock. + c.logf("controlclient direct: updating discoKey for %v via magicsock", update.Src) go discoKeyPub.Publish(update) } }) @@ -852,19 +854,23 @@ func (c *Direct) PollNetMap(ctx context.Context, nu NetmapUpdater) error { type rememberLastNetmapUpdater struct { last *netmap.NetworkMap + done chan (any) } func (nu *rememberLastNetmapUpdater) UpdateFullNetmap(nm *netmap.NetworkMap) { nu.last = nm + nu.done <- nil } // FetchNetMapForTest fetches the netmap once. func (c *Direct) FetchNetMapForTest(ctx context.Context) (*netmap.NetworkMap, error) { var nu rememberLastNetmapUpdater + nu.done = make(chan any) err := c.sendMapRequest(ctx, false, &nu) if err == nil && nu.last == nil { return nil, errors.New("[unexpected] sendMapRequest success without callback") } + <-nu.done return nu.last, err } @@ -1110,7 +1116,11 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap sess := newMapSession(persist.PrivateNodeKey(), nu, c.controlKnobs) defer sess.Close() c.streamingMapSession = sess - defer func() { c.streamingMapSession = nil }() + defer func() { + c.mu.Lock() + c.streamingMapSession = nil + c.mu.Unlock() + }() sess.cancel = cancel sess.logf = c.logf sess.vlogf = vlogf @@ -1264,7 +1274,7 @@ func NetmapFromMapResponseForDebug(ctx context.Context, pr persist.PersistView, return nil, errors.New("PersistView invalid") } - nu := &rememberLastNetmapUpdater{} + nu := &rememberLastNetmapUpdater{done: make(chan any)} sess := newMapSession(pr.PrivateNodeKey(), nu, nil) defer sess.Close() @@ -1272,6 +1282,7 @@ func NetmapFromMapResponseForDebug(ctx context.Context, pr persist.PersistView, return nil, fmt.Errorf("HandleNonKeepAliveMapResponse: %w", err) } + <-nu.done return sess.netmap(), nil } diff --git a/control/controlclient/map.go b/control/controlclient/map.go index 7f421254f..96362a1b1 100644 --- a/control/controlclient/map.go +++ b/control/controlclient/map.go @@ -120,7 +120,7 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob cancel: func() {}, onDebug: func(context.Context, *tailcfg.Debug) error { return nil }, onSelfNodeChanged: func(*netmap.NetworkMap) {}, - changeQueue: make(chan *tailcfg.MapResponse, 16), + changeQueue: make(chan *tailcfg.MapResponse, 32), } ms.sessionAliveCtx, ms.sessionAliveCtxClose = context.WithCancel(context.Background()) go ms.run() @@ -133,7 +133,16 @@ func (ms *mapSession) run() { case change := <-ms.changeQueue: ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, change) case <-ms.sessionAliveCtx.Done(): - return + // Drain any remaining items in the queue before exiting + for { + select { + case change := <-ms.changeQueue: + ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, change) + default: + // Queue is empty, exit + return + } + } } } } @@ -169,7 +178,7 @@ func (ms *mapSession) updateDiscoForNode(id tailcfg.NodeID, key key.DiscoPublic, DiscoKey: &key, } resp.PeersChangedPatch = append(resp.PeersChangedPatch, change) - ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, &resp) + ms.changeQueue <- &resp } func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error { @@ -322,23 +331,32 @@ func (ms *mapSession) removeUnwantedDiscoUpdates(resp *tailcfg.MapResponse) { filtered := resp.PeersChangedPatch[:0] for _, change := range resp.PeersChangedPatch { - keep := false - - if *change.Online { - keep = true - } else { - existingNode := existingMap.Peers[existingMap.PeerIndexByNodeID(change.NodeID)] - - if existingLastSeen, ok := existingNode.LastSeen().GetOk(); ok && - change.LastSeen.After(existingLastSeen) { - keep = true - } + // Accept if: + // - DiscoKey is nil and did not change. + // - Fields we rely on for rejection is missing. + if change.DiscoKey == nil || change.Online == nil || change.LastSeen == nil { + filtered = append(filtered, change) + continue } - if keep { + // Accept if: + // - Node is online. + if *change.Online { + filtered = append(filtered, change) + continue + } + + existingNode := existingMap.Peers[existingMap.PeerIndexByNodeID(change.NodeID)] + + // Accept if: + // - lastSeen moved forward in time. + if existingLastSeen, ok := existingNode.LastSeen().GetOk(); ok && + change.LastSeen.After(existingLastSeen) { filtered = append(filtered, change) } } + + resp.PeersChangedPatch = filtered } // updateStateFromResponse updates ms from res. It takes ownership of res. diff --git a/control/controlclient/map_test.go b/control/controlclient/map_test.go index 11d4593f0..95cac9693 100644 --- a/control/controlclient/map_test.go +++ b/control/controlclient/map_test.go @@ -12,6 +12,7 @@ "net/netip" "reflect" "strings" + "sync" "sync/atomic" "testing" "time" @@ -395,7 +396,13 @@ func formatNodes(nodes []*tailcfg.Node) string { func newTestMapSession(t testing.TB, nu NetmapUpdater) *mapSession { ms := newMapSession(key.NewNode(), nu, new(controlknobs.Knobs)) t.Cleanup(ms.Close) - ms.logf = t.Logf + var mu sync.Mutex + logger := func(format string, args ...any) { + mu.Lock() + defer mu.Unlock() + t.Logf(format, args...) + } + ms.logf = logger return ms } @@ -1099,6 +1106,8 @@ func BenchmarkMapSessionDelta(b *testing.B) { ctx := context.Background() nu := &countingNetmapUpdater{} ms := newTestMapSession(b, nu) + // We're benchmarking, and this is making the logger sad + ms.logf = func(string, ...any) {} res := &tailcfg.MapResponse{ Node: &tailcfg.Node{ ID: 1, diff --git a/tsnet/depaware.txt b/tsnet/depaware.txt index 5b08200c9..b58376544 100644 --- a/tsnet/depaware.txt +++ b/tsnet/depaware.txt @@ -303,6 +303,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware) tailscale.com/types/bools from tailscale.com/tsnet+ tailscale.com/types/dnstype from tailscale.com/client/local+ tailscale.com/types/empty from tailscale.com/ipn+ + tailscale.com/types/events from tailscale.com/control/controlclient+ tailscale.com/types/ipproto from tailscale.com/ipn+ tailscale.com/types/key from tailscale.com/client/local+ tailscale.com/types/lazy from tailscale.com/hostinfo+ diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index aad5793f7..156e58bca 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -361,9 +361,8 @@ func (de *endpoint) setProbeUDPLifetimeConfigLocked(desired *ProbeUDPLifetimeCon // endpointDisco is the current disco key and short string for an endpoint. This // structure is immutable. type endpointDisco struct { - key key.DiscoPublic // for discovery messages. - short string // ShortString of discoKey. - viaTSMP bool // the key was learned via TSMP + key key.DiscoPublic // for discovery messages. + short string // ShortString of discoKey. } type sentPing struct { @@ -1466,52 +1465,15 @@ func (de *endpoint) setLastPing(ipp netip.AddrPort, now mono.Time) { state.lastPing = now } -// updateDiscoKeyLocked takes in a new discoKey for the endpoint to be updated. -// It ensures that the new key is not a previously held key. This prevents -// control from overwriting a key set by TSMP in a case where the endpoint -// represented by de is unable to contact control and has shared its disco key -// via TSMP. If key is a previously held key, this method is a noop. de.mu must -// be held while calling. The return value is true if we stored a different key -// but false if the key is the same or a previously used key. -func (de *endpoint) updateDiscoKeyLocked(key *key.DiscoPublic, viaTSMP bool) bool { +func (de *endpoint) updateDiscoKey(key *key.DiscoPublic) { if key == nil { de.disco.Store((*endpointDisco)(nil)) - return true - } - - epDisco := de.disco.Load() - // Existing key is nil, set new key - if epDisco == nil { + } else { de.disco.Store(&endpointDisco{ - key: *key, - short: key.ShortString(), - viaTSMP: viaTSMP, + key: *key, + short: key.ShortString(), }) - return true } - - // Key is the same. If we had learned it via TSMP before and are now getting - // it via control, update the field, but do not change the key. - if epDisco.key.Compare(*key) == 0 { - if epDisco.viaTSMP && !viaTSMP { - epDisco.viaTSMP = false - de.disco.Store(epDisco) - } - return false - } - - // The new key is from control but the old one viaTSMP, do nothing. - if !viaTSMP && epDisco.viaTSMP { - return false - } - - // New key that needs to be stored. - de.disco.Store(&endpointDisco{ - key: *key, - short: key.ShortString(), - viaTSMP: viaTSMP, - }) - return true } // updateFromNode updates the endpoint based on a tailcfg.Node from a NetMap @@ -1540,14 +1502,11 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p if discoKey != n.DiscoKey() { de.c.logf("[v1] magicsock: disco: node %s changed from %s to %s", de.publicKey.ShortString(), discoKey, n.DiscoKey()) key := n.DiscoKey() - keyChanged := de.updateDiscoKeyLocked(&key, false) + de.updateDiscoKey(&key) de.debugUpdates.Add(EndpointChange{ When: time.Now(), What: "updateFromNode-resetLocked", }) - if keyChanged { - de.resetLocked() - } } if n.HomeDERP() == 0 { if de.derpAddr.IsValid() { diff --git a/wgengine/magicsock/endpoint_test.go b/wgengine/magicsock/endpoint_test.go index 1fc386d09..43ff012c7 100644 --- a/wgengine/magicsock/endpoint_test.go +++ b/wgengine/magicsock/endpoint_test.go @@ -453,57 +453,3 @@ func Test_endpoint_udpRelayEndpointReady(t *testing.T) { }) } } - -func TestUpdateDiscoKey(t *testing.T) { - t.Run("SetKeyNotTSMP", func(t *testing.T) { - de := &endpoint{} - newKey := key.NewDisco().Public() - de.updateDiscoKeyLocked(&newKey, false) - if newKey.Compare(de.disco.Load().key) != 0 { - t.Errorf("disco keys not equal, expected %v, got %v", newKey, de.disco.Load().key) - } - }) - - t.Run("SetKeyTSMP", func(t *testing.T) { - de := &endpoint{} - newKey := key.NewDisco().Public() - de.updateDiscoKeyLocked(&newKey, true) - if newKey.Compare(de.disco.Load().key) != 0 { - t.Errorf("disco keys not equal, expected %v, got %v", newKey, de.disco.Load().key) - } - }) - - t.Run("SetNilKey", func(t *testing.T) { - de := &endpoint{} - de.updateDiscoKeyLocked(nil, false) - if de.disco.Load() != nil { - t.Errorf("disco keys not equal, expected %v, got %v", nil, de.disco.Load().key) - } - }) - - t.Run("SetPreviouslySetKey", func(t *testing.T) { - de := &endpoint{} - oldKey := key.NewDisco().Public() - newKey := key.NewDisco().Public() - de.updateDiscoKeyLocked(&oldKey, false) - de.updateDiscoKeyLocked(&newKey, true) - de.updateDiscoKeyLocked(&oldKey, false) // <- Should not change the key - - if newKey.Compare(de.disco.Load().key) != 0 { - t.Errorf("disco keys not equal, expected %v, got %v", newKey, de.disco.Load().key) - } - - de.updateDiscoKeyLocked(&newKey, false) // <- Connected to control - if de.disco.Load().viaTSMP { - t.Errorf("disco keys is learned viaTSMP, expected false") - } - - newerKey := key.NewDisco().Public() - de.updateDiscoKeyLocked(&newerKey, true) // <- No longer connected to control - de.updateDiscoKeyLocked(&oldKey, false) - de.updateDiscoKeyLocked(&newKey, false) - if newerKey.Compare(de.disco.Load().key) != 0 { - t.Errorf("disco keys not equal, expected %v, got %v", newerKey, de.disco.Load().key) - } - }) -} diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 75cad4052..d7d4079e0 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -3180,14 +3180,12 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) { ep.nodeAddr = n.Addresses().At(0).Addr() } ep.initFakeUDPAddr() - ep.mu.Lock() if n.DiscoKey().IsZero() { - ep.updateDiscoKeyLocked(nil, false) + ep.updateDiscoKey(nil) } else { key := n.DiscoKey() - ep.updateDiscoKeyLocked(&key, false) + ep.updateDiscoKey(&key) } - ep.mu.Unlock() if debugPeerMap() { c.logEndpointCreated(n) @@ -4321,9 +4319,7 @@ func (c *Conn) HandleDiscoKeyAdvertisement(node tailcfg.NodeView, update packet. return } c.discoInfoForKnownPeerLocked(discoKey) - ep.mu.Lock() - ep.updateDiscoKeyLocked(&discoKey, true) - ep.mu.Unlock() + ep.updateDiscoKey(&discoKey) c.peerMap.upsertEndpoint(ep, oldDiscoKey) c.logf("magicsock: updated disco key for peer %v to %v", nodeKey.ShortString(), discoKey.ShortString()) metricTSMPDiscoKeyAdvertisementApplied.Add(1)