mirror of
https://github.com/tailscale/tailscale.git
synced 2026-02-19 07:04:43 -05:00
WIP: Simplify disco update logic
This commit is contained in:
@@ -906,6 +906,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
|
||||
tailscale.com/types/bools from tailscale.com/tsnet+
|
||||
tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+
|
||||
tailscale.com/types/empty from tailscale.com/ipn+
|
||||
tailscale.com/types/events from tailscale.com/control/controlclient+
|
||||
tailscale.com/types/ipproto from tailscale.com/net/flowtrack+
|
||||
tailscale.com/types/key from tailscale.com/client/local+
|
||||
tailscale.com/types/lazy from tailscale.com/ipn/ipnlocal+
|
||||
|
||||
@@ -126,6 +126,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
|
||||
tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal+
|
||||
tailscale.com/types/dnstype from tailscale.com/client/tailscale/apitype+
|
||||
tailscale.com/types/empty from tailscale.com/ipn+
|
||||
tailscale.com/types/events from tailscale.com/control/controlclient+
|
||||
tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled
|
||||
tailscale.com/types/ipproto from tailscale.com/ipn+
|
||||
tailscale.com/types/key from tailscale.com/control/controlbase+
|
||||
|
||||
@@ -151,6 +151,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
|
||||
tailscale.com/types/appctype from tailscale.com/ipn/ipnlocal+
|
||||
tailscale.com/types/dnstype from tailscale.com/client/tailscale/apitype+
|
||||
tailscale.com/types/empty from tailscale.com/ipn+
|
||||
tailscale.com/types/events from tailscale.com/control/controlclient+
|
||||
tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled
|
||||
tailscale.com/types/ipproto from tailscale.com/ipn+
|
||||
tailscale.com/types/key from tailscale.com/client/local+
|
||||
|
||||
@@ -399,6 +399,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
|
||||
tailscale.com/types/bools from tailscale.com/wgengine/netlog
|
||||
tailscale.com/types/dnstype from tailscale.com/ipn/ipnlocal+
|
||||
tailscale.com/types/empty from tailscale.com/ipn+
|
||||
tailscale.com/types/events from tailscale.com/control/controlclient+
|
||||
tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled
|
||||
tailscale.com/types/ipproto from tailscale.com/net/flowtrack+
|
||||
tailscale.com/types/key from tailscale.com/client/local+
|
||||
|
||||
@@ -308,6 +308,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar
|
||||
tailscale.com/types/bools from tailscale.com/tsnet+
|
||||
tailscale.com/types/dnstype from tailscale.com/client/local+
|
||||
tailscale.com/types/empty from tailscale.com/ipn+
|
||||
tailscale.com/types/events from tailscale.com/control/controlclient+
|
||||
tailscale.com/types/ipproto from tailscale.com/ipn+
|
||||
tailscale.com/types/key from tailscale.com/client/local+
|
||||
tailscale.com/types/lazy from tailscale.com/cmd/tsidp+
|
||||
|
||||
@@ -452,7 +452,9 @@ func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) {
|
||||
c.sendStatus("mapRoutine-got-netmap", nil, "", nm)
|
||||
}
|
||||
// Reset the backoff timer if we got a netmap.
|
||||
c.mu.Lock()
|
||||
mrs.bo.Reset()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (mrs mapRoutineState) UpdateNetmapDelta(muts []netmap.NodeMutation) bool {
|
||||
@@ -526,13 +528,18 @@ func (c *Auto) mapRoutine() {
|
||||
c.mu.Lock()
|
||||
c.inMapPoll = false
|
||||
paused := c.paused
|
||||
c.mu.Unlock()
|
||||
|
||||
if paused {
|
||||
mrs.bo.BackOff(ctx, nil)
|
||||
c.logf("mapRoutine: paused")
|
||||
} else {
|
||||
mrs.bo.BackOff(ctx, err)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// Now safe to call functions that might acquire the mutex
|
||||
if paused {
|
||||
c.logf("mapRoutine: paused")
|
||||
} else {
|
||||
report(err, "PollNetMap")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,11 +369,13 @@ func NewDirect(opts Options) (*Direct, error) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
c.logf("controlclient direct: updating discoKey for %v via mapSession", update.Src)
|
||||
c.streamingMapSession.updateDiscoForNode(
|
||||
peer.ID(), update.Key, time.Now(), false)
|
||||
} else {
|
||||
// We do not yet have a mapSession, perhaps because we don't have a
|
||||
// connection to control. Punt the handling down to userspace+magicsock.
|
||||
c.logf("controlclient direct: updating discoKey for %v via magicsock", update.Src)
|
||||
go discoKeyPub.Publish(update)
|
||||
}
|
||||
})
|
||||
@@ -852,19 +854,23 @@ func (c *Direct) PollNetMap(ctx context.Context, nu NetmapUpdater) error {
|
||||
|
||||
type rememberLastNetmapUpdater struct {
|
||||
last *netmap.NetworkMap
|
||||
done chan (any)
|
||||
}
|
||||
|
||||
func (nu *rememberLastNetmapUpdater) UpdateFullNetmap(nm *netmap.NetworkMap) {
|
||||
nu.last = nm
|
||||
nu.done <- nil
|
||||
}
|
||||
|
||||
// FetchNetMapForTest fetches the netmap once.
|
||||
func (c *Direct) FetchNetMapForTest(ctx context.Context) (*netmap.NetworkMap, error) {
|
||||
var nu rememberLastNetmapUpdater
|
||||
nu.done = make(chan any)
|
||||
err := c.sendMapRequest(ctx, false, &nu)
|
||||
if err == nil && nu.last == nil {
|
||||
return nil, errors.New("[unexpected] sendMapRequest success without callback")
|
||||
}
|
||||
<-nu.done
|
||||
return nu.last, err
|
||||
}
|
||||
|
||||
@@ -1110,7 +1116,11 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap
|
||||
sess := newMapSession(persist.PrivateNodeKey(), nu, c.controlKnobs)
|
||||
defer sess.Close()
|
||||
c.streamingMapSession = sess
|
||||
defer func() { c.streamingMapSession = nil }()
|
||||
defer func() {
|
||||
c.mu.Lock()
|
||||
c.streamingMapSession = nil
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
sess.cancel = cancel
|
||||
sess.logf = c.logf
|
||||
sess.vlogf = vlogf
|
||||
@@ -1264,7 +1274,7 @@ func NetmapFromMapResponseForDebug(ctx context.Context, pr persist.PersistView,
|
||||
return nil, errors.New("PersistView invalid")
|
||||
}
|
||||
|
||||
nu := &rememberLastNetmapUpdater{}
|
||||
nu := &rememberLastNetmapUpdater{done: make(chan any)}
|
||||
sess := newMapSession(pr.PrivateNodeKey(), nu, nil)
|
||||
defer sess.Close()
|
||||
|
||||
@@ -1272,6 +1282,7 @@ func NetmapFromMapResponseForDebug(ctx context.Context, pr persist.PersistView,
|
||||
return nil, fmt.Errorf("HandleNonKeepAliveMapResponse: %w", err)
|
||||
}
|
||||
|
||||
<-nu.done
|
||||
return sess.netmap(), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -120,7 +120,7 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob
|
||||
cancel: func() {},
|
||||
onDebug: func(context.Context, *tailcfg.Debug) error { return nil },
|
||||
onSelfNodeChanged: func(*netmap.NetworkMap) {},
|
||||
changeQueue: make(chan *tailcfg.MapResponse, 16),
|
||||
changeQueue: make(chan *tailcfg.MapResponse, 32),
|
||||
}
|
||||
ms.sessionAliveCtx, ms.sessionAliveCtxClose = context.WithCancel(context.Background())
|
||||
go ms.run()
|
||||
@@ -133,7 +133,16 @@ func (ms *mapSession) run() {
|
||||
case change := <-ms.changeQueue:
|
||||
ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, change)
|
||||
case <-ms.sessionAliveCtx.Done():
|
||||
return
|
||||
// Drain any remaining items in the queue before exiting
|
||||
for {
|
||||
select {
|
||||
case change := <-ms.changeQueue:
|
||||
ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, change)
|
||||
default:
|
||||
// Queue is empty, exit
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,7 +178,7 @@ func (ms *mapSession) updateDiscoForNode(id tailcfg.NodeID, key key.DiscoPublic,
|
||||
DiscoKey: &key,
|
||||
}
|
||||
resp.PeersChangedPatch = append(resp.PeersChangedPatch, change)
|
||||
ms.handleNonKeepAliveMapResponse(ms.sessionAliveCtx, &resp)
|
||||
ms.changeQueue <- &resp
|
||||
}
|
||||
|
||||
func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error {
|
||||
@@ -322,23 +331,32 @@ func (ms *mapSession) removeUnwantedDiscoUpdates(resp *tailcfg.MapResponse) {
|
||||
filtered := resp.PeersChangedPatch[:0]
|
||||
|
||||
for _, change := range resp.PeersChangedPatch {
|
||||
keep := false
|
||||
|
||||
if *change.Online {
|
||||
keep = true
|
||||
} else {
|
||||
existingNode := existingMap.Peers[existingMap.PeerIndexByNodeID(change.NodeID)]
|
||||
|
||||
if existingLastSeen, ok := existingNode.LastSeen().GetOk(); ok &&
|
||||
change.LastSeen.After(existingLastSeen) {
|
||||
keep = true
|
||||
}
|
||||
// Accept if:
|
||||
// - DiscoKey is nil and did not change.
|
||||
// - Fields we rely on for rejection is missing.
|
||||
if change.DiscoKey == nil || change.Online == nil || change.LastSeen == nil {
|
||||
filtered = append(filtered, change)
|
||||
continue
|
||||
}
|
||||
|
||||
if keep {
|
||||
// Accept if:
|
||||
// - Node is online.
|
||||
if *change.Online {
|
||||
filtered = append(filtered, change)
|
||||
continue
|
||||
}
|
||||
|
||||
existingNode := existingMap.Peers[existingMap.PeerIndexByNodeID(change.NodeID)]
|
||||
|
||||
// Accept if:
|
||||
// - lastSeen moved forward in time.
|
||||
if existingLastSeen, ok := existingNode.LastSeen().GetOk(); ok &&
|
||||
change.LastSeen.After(existingLastSeen) {
|
||||
filtered = append(filtered, change)
|
||||
}
|
||||
}
|
||||
|
||||
resp.PeersChangedPatch = filtered
|
||||
}
|
||||
|
||||
// updateStateFromResponse updates ms from res. It takes ownership of res.
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
"net/netip"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -395,7 +396,13 @@ func formatNodes(nodes []*tailcfg.Node) string {
|
||||
func newTestMapSession(t testing.TB, nu NetmapUpdater) *mapSession {
|
||||
ms := newMapSession(key.NewNode(), nu, new(controlknobs.Knobs))
|
||||
t.Cleanup(ms.Close)
|
||||
ms.logf = t.Logf
|
||||
var mu sync.Mutex
|
||||
logger := func(format string, args ...any) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
t.Logf(format, args...)
|
||||
}
|
||||
ms.logf = logger
|
||||
return ms
|
||||
}
|
||||
|
||||
@@ -1099,6 +1106,8 @@ func BenchmarkMapSessionDelta(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
nu := &countingNetmapUpdater{}
|
||||
ms := newTestMapSession(b, nu)
|
||||
// We're benchmarking, and this is making the logger sad
|
||||
ms.logf = func(string, ...any) {}
|
||||
res := &tailcfg.MapResponse{
|
||||
Node: &tailcfg.Node{
|
||||
ID: 1,
|
||||
|
||||
@@ -303,6 +303,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware)
|
||||
tailscale.com/types/bools from tailscale.com/tsnet+
|
||||
tailscale.com/types/dnstype from tailscale.com/client/local+
|
||||
tailscale.com/types/empty from tailscale.com/ipn+
|
||||
tailscale.com/types/events from tailscale.com/control/controlclient+
|
||||
tailscale.com/types/ipproto from tailscale.com/ipn+
|
||||
tailscale.com/types/key from tailscale.com/client/local+
|
||||
tailscale.com/types/lazy from tailscale.com/hostinfo+
|
||||
|
||||
@@ -361,9 +361,8 @@ func (de *endpoint) setProbeUDPLifetimeConfigLocked(desired *ProbeUDPLifetimeCon
|
||||
// endpointDisco is the current disco key and short string for an endpoint. This
|
||||
// structure is immutable.
|
||||
type endpointDisco struct {
|
||||
key key.DiscoPublic // for discovery messages.
|
||||
short string // ShortString of discoKey.
|
||||
viaTSMP bool // the key was learned via TSMP
|
||||
key key.DiscoPublic // for discovery messages.
|
||||
short string // ShortString of discoKey.
|
||||
}
|
||||
|
||||
type sentPing struct {
|
||||
@@ -1466,52 +1465,15 @@ func (de *endpoint) setLastPing(ipp netip.AddrPort, now mono.Time) {
|
||||
state.lastPing = now
|
||||
}
|
||||
|
||||
// updateDiscoKeyLocked takes in a new discoKey for the endpoint to be updated.
|
||||
// It ensures that the new key is not a previously held key. This prevents
|
||||
// control from overwriting a key set by TSMP in a case where the endpoint
|
||||
// represented by de is unable to contact control and has shared its disco key
|
||||
// via TSMP. If key is a previously held key, this method is a noop. de.mu must
|
||||
// be held while calling. The return value is true if we stored a different key
|
||||
// but false if the key is the same or a previously used key.
|
||||
func (de *endpoint) updateDiscoKeyLocked(key *key.DiscoPublic, viaTSMP bool) bool {
|
||||
func (de *endpoint) updateDiscoKey(key *key.DiscoPublic) {
|
||||
if key == nil {
|
||||
de.disco.Store((*endpointDisco)(nil))
|
||||
return true
|
||||
}
|
||||
|
||||
epDisco := de.disco.Load()
|
||||
// Existing key is nil, set new key
|
||||
if epDisco == nil {
|
||||
} else {
|
||||
de.disco.Store(&endpointDisco{
|
||||
key: *key,
|
||||
short: key.ShortString(),
|
||||
viaTSMP: viaTSMP,
|
||||
key: *key,
|
||||
short: key.ShortString(),
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
// Key is the same. If we had learned it via TSMP before and are now getting
|
||||
// it via control, update the field, but do not change the key.
|
||||
if epDisco.key.Compare(*key) == 0 {
|
||||
if epDisco.viaTSMP && !viaTSMP {
|
||||
epDisco.viaTSMP = false
|
||||
de.disco.Store(epDisco)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// The new key is from control but the old one viaTSMP, do nothing.
|
||||
if !viaTSMP && epDisco.viaTSMP {
|
||||
return false
|
||||
}
|
||||
|
||||
// New key that needs to be stored.
|
||||
de.disco.Store(&endpointDisco{
|
||||
key: *key,
|
||||
short: key.ShortString(),
|
||||
viaTSMP: viaTSMP,
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
// updateFromNode updates the endpoint based on a tailcfg.Node from a NetMap
|
||||
@@ -1540,14 +1502,11 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p
|
||||
if discoKey != n.DiscoKey() {
|
||||
de.c.logf("[v1] magicsock: disco: node %s changed from %s to %s", de.publicKey.ShortString(), discoKey, n.DiscoKey())
|
||||
key := n.DiscoKey()
|
||||
keyChanged := de.updateDiscoKeyLocked(&key, false)
|
||||
de.updateDiscoKey(&key)
|
||||
de.debugUpdates.Add(EndpointChange{
|
||||
When: time.Now(),
|
||||
What: "updateFromNode-resetLocked",
|
||||
})
|
||||
if keyChanged {
|
||||
de.resetLocked()
|
||||
}
|
||||
}
|
||||
if n.HomeDERP() == 0 {
|
||||
if de.derpAddr.IsValid() {
|
||||
|
||||
@@ -453,57 +453,3 @@ func Test_endpoint_udpRelayEndpointReady(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateDiscoKey(t *testing.T) {
|
||||
t.Run("SetKeyNotTSMP", func(t *testing.T) {
|
||||
de := &endpoint{}
|
||||
newKey := key.NewDisco().Public()
|
||||
de.updateDiscoKeyLocked(&newKey, false)
|
||||
if newKey.Compare(de.disco.Load().key) != 0 {
|
||||
t.Errorf("disco keys not equal, expected %v, got %v", newKey, de.disco.Load().key)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("SetKeyTSMP", func(t *testing.T) {
|
||||
de := &endpoint{}
|
||||
newKey := key.NewDisco().Public()
|
||||
de.updateDiscoKeyLocked(&newKey, true)
|
||||
if newKey.Compare(de.disco.Load().key) != 0 {
|
||||
t.Errorf("disco keys not equal, expected %v, got %v", newKey, de.disco.Load().key)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("SetNilKey", func(t *testing.T) {
|
||||
de := &endpoint{}
|
||||
de.updateDiscoKeyLocked(nil, false)
|
||||
if de.disco.Load() != nil {
|
||||
t.Errorf("disco keys not equal, expected %v, got %v", nil, de.disco.Load().key)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("SetPreviouslySetKey", func(t *testing.T) {
|
||||
de := &endpoint{}
|
||||
oldKey := key.NewDisco().Public()
|
||||
newKey := key.NewDisco().Public()
|
||||
de.updateDiscoKeyLocked(&oldKey, false)
|
||||
de.updateDiscoKeyLocked(&newKey, true)
|
||||
de.updateDiscoKeyLocked(&oldKey, false) // <- Should not change the key
|
||||
|
||||
if newKey.Compare(de.disco.Load().key) != 0 {
|
||||
t.Errorf("disco keys not equal, expected %v, got %v", newKey, de.disco.Load().key)
|
||||
}
|
||||
|
||||
de.updateDiscoKeyLocked(&newKey, false) // <- Connected to control
|
||||
if de.disco.Load().viaTSMP {
|
||||
t.Errorf("disco keys is learned viaTSMP, expected false")
|
||||
}
|
||||
|
||||
newerKey := key.NewDisco().Public()
|
||||
de.updateDiscoKeyLocked(&newerKey, true) // <- No longer connected to control
|
||||
de.updateDiscoKeyLocked(&oldKey, false)
|
||||
de.updateDiscoKeyLocked(&newKey, false)
|
||||
if newerKey.Compare(de.disco.Load().key) != 0 {
|
||||
t.Errorf("disco keys not equal, expected %v, got %v", newerKey, de.disco.Load().key)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3180,14 +3180,12 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
|
||||
ep.nodeAddr = n.Addresses().At(0).Addr()
|
||||
}
|
||||
ep.initFakeUDPAddr()
|
||||
ep.mu.Lock()
|
||||
if n.DiscoKey().IsZero() {
|
||||
ep.updateDiscoKeyLocked(nil, false)
|
||||
ep.updateDiscoKey(nil)
|
||||
} else {
|
||||
key := n.DiscoKey()
|
||||
ep.updateDiscoKeyLocked(&key, false)
|
||||
ep.updateDiscoKey(&key)
|
||||
}
|
||||
ep.mu.Unlock()
|
||||
|
||||
if debugPeerMap() {
|
||||
c.logEndpointCreated(n)
|
||||
@@ -4321,9 +4319,7 @@ func (c *Conn) HandleDiscoKeyAdvertisement(node tailcfg.NodeView, update packet.
|
||||
return
|
||||
}
|
||||
c.discoInfoForKnownPeerLocked(discoKey)
|
||||
ep.mu.Lock()
|
||||
ep.updateDiscoKeyLocked(&discoKey, true)
|
||||
ep.mu.Unlock()
|
||||
ep.updateDiscoKey(&discoKey)
|
||||
c.peerMap.upsertEndpoint(ep, oldDiscoKey)
|
||||
c.logf("magicsock: updated disco key for peer %v to %v", nodeKey.ShortString(), discoKey.ShortString())
|
||||
metricTSMPDiscoKeyAdvertisementApplied.Add(1)
|
||||
|
||||
Reference in New Issue
Block a user