wgengine/magicsock: add peer connection path metrics

Add tailscaled_peers_connected gauge metric tracking peers by path type
(direct_ipv4, direct_ipv6, derp, peer_relay_ipv4, peer_relay_ipv6).

Fixes #18534

Signed-off-by: Raj Singh <raj@tailscale.com>
This commit is contained in:
Raj Singh
2026-01-30 12:59:42 -06:00
parent 6de5b01e04
commit 62110c4ebd
3 changed files with 313 additions and 1 deletions

View File

@@ -99,6 +99,13 @@ type endpoint struct {
expired bool // whether the node has expired
isWireguardOnly bool // whether the endpoint is WireGuard only
relayCapable bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server]
// pathMetricExpiryTimer fires after inactivity to update peersConnected.
pathMetricExpiryTimer *time.Timer
// pathMetricExpiryAt is the scheduled expiry time for pathMetricExpiryTimer.
// It guards against timer reset races by ensuring callbacks only act when due.
pathMetricExpiryAt mono.Time
lastReportedPath Path // last path type reported in metrics; used for accurate decrement
}
// udpRelayEndpointReady determines whether the given relay [addrQuality] should
@@ -127,6 +134,7 @@ func (de *endpoint) udpRelayEndpointReady(maybeBest addrQuality) {
de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v", de.publicKey.ShortString(), de.discoShort(), maybeBest.epAddr, maybeBest.wireMTU)
de.setBestAddrLocked(maybeBest)
de.trustBestAddrUntil = now.Add(trustUDPAddrDuration)
de.updatePathMetricLocked(now)
}
}
@@ -486,6 +494,7 @@ func (de *endpoint) deleteEndpointLocked(why string, ep netip.AddrPort) {
From: de.bestAddr,
})
de.setBestAddrLocked(addrQuality{})
de.updatePathMetricLocked(mono.Now())
}
}
@@ -509,6 +518,7 @@ func (de *endpoint) noteRecvActivity(src epAddr, now mono.Time) bool {
de.bestAddr.ap = src.ap
de.bestAddrAt = now
de.trustBestAddrUntil = now.Add(5 * time.Second)
de.updatePathMetricLocked(now)
de.mu.Unlock()
} else {
// TODO(jwhited): subject to change as part of silent disco effort.
@@ -518,6 +528,7 @@ func (de *endpoint) noteRecvActivity(src epAddr, now mono.Time) bool {
de.mu.Lock()
if de.heartbeatDisabled && de.bestAddr.epAddr == src {
de.trustBestAddrUntil = now.Add(trustUDPAddrDuration)
de.updatePathMetricLocked(now)
}
de.mu.Unlock()
}
@@ -525,6 +536,9 @@ func (de *endpoint) noteRecvActivity(src epAddr, now mono.Time) bool {
elapsed := now.Sub(de.lastRecvWG.LoadAtomic())
if elapsed > 10*time.Second {
de.lastRecvWG.StoreAtomic(now)
de.mu.Lock()
de.updatePathMetricLocked(now)
de.mu.Unlock()
if de.c.noteRecvActivity == nil {
return false
@@ -582,6 +596,9 @@ func (de *endpoint) addrForSendLocked(now mono.Time) (udpAddr epAddr, derpAddr n
// We had a bestAddr but it expired so send both to it
// and DERP.
if now.After(de.trustBestAddrUntil) && de.lastReportedPath != PathDERP && de.lastReportedPath != PathNone {
de.updatePathMetricLocked(now)
}
return udpAddr, de.derpAddr, false
}
@@ -639,6 +656,7 @@ func (de *endpoint) addrForWireGuardSendLocked(now mono.Time) (udpAddr epAddr, s
// less than one second in good cases, in which case this will be then
// extended to 15 seconds.
de.trustBestAddrUntil = now.Add(time.Second)
de.updatePathMetricLocked(now)
return udpAddr, needPing
}
@@ -949,7 +967,11 @@ func (de *endpoint) wantFullPingLocked(now mono.Time) bool {
}
func (de *endpoint) noteTxActivityExtTriggerLocked(now mono.Time) {
prev := de.lastSendExt
de.lastSendExt = now
if prev.IsZero() || now.Sub(prev) > sessionActiveTimeout {
de.updatePathMetricLocked(now)
}
if de.heartBeatTimer == nil && !de.heartbeatDisabled {
de.heartBeatTimer = time.AfterFunc(heartbeatInterval, de.heartbeat)
}
@@ -1181,6 +1203,7 @@ func (de *endpoint) discoPingTimeout(txid stun.TxID) {
if sp.to == de.bestAddr.epAddr && sp.to.vni.IsSet() && bestUntrusted {
// TODO(jwhited): consider applying this to direct UDP paths as well
de.clearBestAddrLocked()
de.updatePathMetricLocked(mono.Now())
}
if debugDisco() || !de.bestAddr.ap.IsValid() || bestUntrusted {
de.c.dlogf("[v1] magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], sp.to, de.publicKey.ShortString(), de.discoShort())
@@ -1525,6 +1548,7 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p
de.setEndpointsLocked(n.Endpoints())
de.relayCapable = capVerIsRelayCapable(n.Cap())
de.updatePathMetricLocked(mono.Now())
}
func (de *endpoint) setEndpointsLocked(eps interface {
@@ -1636,6 +1660,7 @@ func (de *endpoint) noteBadEndpoint(udpAddr epAddr) {
st.clear()
}
}
de.updatePathMetricLocked(mono.Now())
}
// noteConnectivityChange is called when connectivity changes enough
@@ -1650,6 +1675,7 @@ func (de *endpoint) noteConnectivityChange() {
for k := range de.endpointState {
de.endpointState[k].clear()
}
de.updatePathMetricLocked(mono.Now())
}
// pingSizeToPktLen calculates the minimum path MTU that would permit
@@ -1789,6 +1815,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src epAdd
de.bestAddrAt = now
de.trustBestAddrUntil = now.Add(trustUDPAddrDuration)
}
de.updatePathMetricLocked(now)
}
return
}
@@ -2024,6 +2051,149 @@ func (de *endpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
}
}
// currentPathTypeLocked classifies how this endpoint is currently reached.
//
// The peer counts as active only if its most recent send (lastSendExt) or
// receive (lastRecvWG) happened no more than sessionActiveTimeout before now;
// otherwise PathNone is returned. For an active peer, a valid bestAddr whose
// trust window (trustBestAddrUntil) has not lapsed yields a direct or
// peer-relay path (peer relay when the address has a VNI set), split by
// address family. Failing that, a valid derpAddr yields PathDERP, and with
// neither available the result is PathNone.
//
// de.mu must be held.
func (de *endpoint) currentPathTypeLocked(now mono.Time) Path {
	// The newer of the last send and the last receive decides liveness.
	latest := de.lastSendExt
	if rx := de.lastRecvWG.LoadAtomic(); rx.After(latest) {
		latest = rx
	}
	if latest.IsZero() || now.Sub(latest) > sessionActiveTimeout {
		return PathNone
	}

	udpUsable := de.bestAddr.ap.IsValid() && !now.After(de.trustBestAddrUntil)
	if !udpUsable {
		// No trusted UDP path: fall back to DERP if we know the peer's home.
		if de.derpAddr.IsValid() {
			return PathDERP
		}
		return PathNone
	}

	viaRelay := de.bestAddr.vni.IsSet()
	isV6 := de.bestAddr.ap.Addr().Is6()
	switch {
	case viaRelay && isV6:
		return PathPeerRelayIPv6
	case viaRelay:
		return PathPeerRelayIPv4
	case isV6:
		return PathDirectIPv6
	default:
		return PathDirectIPv4
	}
}
// updatePathMetricLocked re-evaluates this endpoint's connection path at now
// and, if it differs from lastReportedPath, moves the endpoint's count from
// the old path's peersConnected gauge to the new one. It then (re)schedules
// the inactivity expiry timer. It is a no-op when the endpoint has no Conn or
// the Conn has no metrics.
//
// de.mu must be held.
func (de *endpoint) updatePathMetricLocked(now mono.Time) {
	if de.c == nil || de.c.metrics == nil {
		return
	}
	if prev, next := de.lastReportedPath, de.currentPathTypeLocked(now); next != prev {
		de.addPathMetricLocked(prev, -1)
		de.addPathMetricLocked(next, 1)
		de.lastReportedPath = next
	}
	de.schedulePathMetricExpiryLocked(now)
}
// addPathMetricLocked adds delta to the peersConnected gauge that corresponds
// to path. PathNone, a zero delta, a missing Conn or missing metrics, and any
// path without a gauge are all silently ignored.
//
// de.mu must be held.
func (de *endpoint) addPathMetricLocked(path Path, delta int64) {
	if path == PathNone || delta == 0 {
		return
	}
	if de.c == nil || de.c.metrics == nil {
		return
	}
	m := de.c.metrics
	switch path {
	case PathDirectIPv4:
		m.peersConnectedDirectIPv4.Add(delta)
	case PathDirectIPv6:
		m.peersConnectedDirectIPv6.Add(delta)
	case PathDERP:
		m.peersConnectedDERP.Add(delta)
	case PathPeerRelayIPv4:
		m.peersConnectedPeerRelayIPv4.Add(delta)
	case PathPeerRelayIPv6:
		m.peersConnectedPeerRelayIPv6.Add(delta)
	}
}
// schedulePathMetricExpiryLocked arms (or re-arms) pathMetricExpiryTimer so
// that the path metric is re-evaluated once the peer has been idle for
// sessionActiveTimeout, measured from the newer of lastSendExt and lastRecvWG.
//
// The timer is stopped instead when nothing is currently counted
// (lastReportedPath is PathNone) or when no send/recv activity has been
// recorded at all.
//
// de.mu must be held.
func (de *endpoint) schedulePathMetricExpiryLocked(now mono.Time) {
	// recheckDelay is used when the computed expiry is already due but the
	// peer is still counted in a gauge.
	const recheckDelay = time.Millisecond

	if de.lastReportedPath == PathNone {
		de.stopPathMetricExpiryTimerLocked()
		return
	}
	lastActivity := de.lastSendExt
	if recv := de.lastRecvWG.LoadAtomic(); recv.After(lastActivity) {
		lastActivity = recv
	}
	if lastActivity.IsZero() {
		de.stopPathMetricExpiryTimerLocked()
		return
	}
	expiry := lastActivity.Add(sessionActiveTimeout)
	delay := expiry.Sub(now)
	if delay <= 0 {
		// lastReportedPath is not PathNone here, so the peer is still
		// counted. currentPathTypeLocked only reports PathNone once the idle
		// time strictly exceeds sessionActiveTimeout, so at the exact
		// boundary (delay == 0) the peer is still "active". Dropping the
		// timer now would leave the gauge incremented until some unrelated
		// event happened to re-evaluate it. Re-check shortly instead; by then
		// the idle time exceeds the timeout and the count is released.
		delay = recheckDelay
	}
	de.pathMetricExpiryAt = expiry
	if de.pathMetricExpiryTimer != nil {
		de.pathMetricExpiryTimer.Reset(delay)
		return
	}
	de.pathMetricExpiryTimer = time.AfterFunc(delay, de.pathMetricExpiryTimerFired)
}
// stopPathMetricExpiryTimerLocked cancels any pending path metric expiry:
// pathMetricExpiryAt is zeroed and, if a timer exists, it is stopped and
// forgotten.
//
// de.mu must be held.
func (de *endpoint) stopPathMetricExpiryTimerLocked() {
	de.pathMetricExpiryAt = 0
	if de.pathMetricExpiryTimer == nil {
		return
	}
	de.pathMetricExpiryTimer.Stop()
	de.pathMetricExpiryTimer = nil
}
// pathMetricExpiryTimerFired is the callback run by pathMetricExpiryTimer.
//
// It acquires de.mu itself. If the timer has been cleared in the meantime it
// does nothing. If the recorded deadline (pathMetricExpiryAt) is still in the
// future, it re-arms the timer for the remaining time rather than acting
// early. Otherwise it forgets the timer and deadline and re-evaluates the
// path metric, which may schedule a fresh timer.
func (de *endpoint) pathMetricExpiryTimerFired() {
	de.mu.Lock()
	defer de.mu.Unlock()

	timer, dueAt := de.pathMetricExpiryTimer, de.pathMetricExpiryAt
	if timer == nil || dueAt.IsZero() {
		return
	}

	now := mono.Now()
	if dueAt.After(now) {
		// Not due yet: the deadline moved after this firing was queued.
		timer.Reset(dueAt.Sub(now))
		return
	}

	de.pathMetricExpiryAt = 0
	de.pathMetricExpiryTimer = nil
	de.updatePathMetricLocked(now)
}
// clearPathMetricLocked removes this endpoint from whichever peersConnected
// gauge it is currently counted in, resets lastReportedPath to PathNone, and
// stops the expiry timer. It does nothing when the endpoint has no Conn or
// the Conn has no metrics. Used during shutdown.
//
// de.mu must be held.
func (de *endpoint) clearPathMetricLocked() {
	if de.c == nil || de.c.metrics == nil {
		return
	}
	if prev := de.lastReportedPath; prev != PathNone {
		de.lastReportedPath = PathNone
		de.addPathMetricLocked(prev, -1)
	}
	de.stopPathMetricExpiryTimerLocked()
}
// stopAndReset stops timers associated with de and resets its state back to zero.
// It's called when a discovery endpoint is no longer present in the
// NetworkMap, or when magicsock is transitioning from running to
@@ -2033,6 +2203,9 @@ func (de *endpoint) stopAndReset() {
de.mu.Lock()
defer de.mu.Unlock()
// Decrement metric for this endpoint's current path before cleanup.
de.clearPathMetricLocked()
if closing := de.c.closing.Load(); !closing {
if de.isWireguardOnly {
de.c.logf("[v1] magicsock: doing cleanup for wireguard key %s", de.publicKey.ShortString())
@@ -2058,6 +2231,7 @@ func (de *endpoint) stopAndReset() {
func (de *endpoint) resetLocked() {
de.lastSendExt = 0
de.lastFullPing = 0
de.lastRecvWG.StoreAtomic(0)
de.clearBestAddrLocked()
for _, es := range de.endpointState {
es.lastPing = 0
@@ -2083,6 +2257,7 @@ func (de *endpoint) setDERPHome(regionID uint16) {
de.mu.Lock()
defer de.mu.Unlock()
de.derpAddr = netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID))
de.updatePathMetricLocked(mono.Now())
if de.c.hasPeerRelayServers.Load() {
de.c.relayManager.handleDERPHomeChange(de.publicKey, regionID)
}

View File

@@ -453,3 +453,119 @@ func Test_endpoint_udpRelayEndpointReady(t *testing.T) {
})
}
}
// Test_endpoint_currentPathTypeLocked checks the path classification returned
// by currentPathTypeLocked for combinations of activity recency, bestAddr
// validity and trust, relay VNI presence, address family, and DERP address.
func Test_endpoint_currentPathTypeLocked(t *testing.T) {
	now := mono.Now()
	active := now.Add(-time.Second)
	inactive := now.Add(-sessionActiveTimeout - time.Second)
	trusted := now.Add(time.Minute)
	expired := now.Add(-time.Minute)

	relayVNI := packet.VirtualNetworkID{}
	relayVNI.Set(7)

	tests := []struct {
		name string
		ep   endpoint
		at   mono.Time
		want Path
		// setRecv stores a recent timestamp in ep.lastRecvWG before the
		// check; it is applied in the subtest because it is stored via a
		// method call rather than a struct literal field.
		setRecv bool
	}{
		{
			name: "inactive no activity",
			ep:   endpoint{},
			at:   now,
			want: PathNone,
		},
		{
			name: "inactive stale activity",
			ep: endpoint{
				lastSendExt:        inactive,
				bestAddr:           addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.1:123")}},
				trustBestAddrUntil: trusted,
			},
			at:   now,
			want: PathNone,
		},
		{
			// Inactivity takes precedence over an otherwise usable DERP path.
			name: "inactive stale activity with derp",
			ep: endpoint{
				lastSendExt: inactive,
				derpAddr:    netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, 1),
			},
			at:   now,
			want: PathNone,
		},
		{
			name: "direct ipv4 by send",
			ep: endpoint{
				lastSendExt:        active,
				bestAddr:           addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.2:123")}},
				trustBestAddrUntil: trusted,
			},
			at:   now,
			want: PathDirectIPv4,
		},
		{
			name: "direct ipv6 by recv",
			ep: endpoint{
				bestAddr:           addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("[2001:db8::1]:123")}},
				trustBestAddrUntil: trusted,
			},
			at:      now,
			want:    PathDirectIPv6,
			setRecv: true,
		},
		{
			// A recent receive must win over a stale send timestamp.
			name: "direct ipv4 by recv with stale send",
			ep: endpoint{
				lastSendExt:        inactive,
				bestAddr:           addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.4:123")}},
				trustBestAddrUntil: trusted,
			},
			at:      now,
			want:    PathDirectIPv4,
			setRecv: true,
		},
		{
			name: "peer relay ipv4",
			ep: endpoint{
				lastSendExt: active,
				bestAddr: addrQuality{epAddr: epAddr{
					ap:  netip.MustParseAddrPort("192.0.2.9:123"),
					vni: relayVNI,
				}},
				trustBestAddrUntil: trusted,
			},
			at:   now,
			want: PathPeerRelayIPv4,
		},
		{
			name: "peer relay ipv6",
			ep: endpoint{
				lastSendExt: active,
				bestAddr: addrQuality{epAddr: epAddr{
					ap:  netip.MustParseAddrPort("[2001:db8::9]:123"),
					vni: relayVNI,
				}},
				trustBestAddrUntil: trusted,
			},
			at:   now,
			want: PathPeerRelayIPv6,
		},
		{
			name: "derp fallback when best addr expired",
			ep: endpoint{
				lastSendExt:        active,
				bestAddr:           addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.3:123")}},
				trustBestAddrUntil: expired,
				derpAddr:           netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, 1),
			},
			at:   now,
			want: PathDERP,
		},
		{
			// A peer that has never had a UDP path is reached via DERP only.
			name: "derp with no best addr",
			ep: endpoint{
				lastSendExt: active,
				derpAddr:    netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, 1),
			},
			at:   now,
			want: PathDERP,
		},
		{
			name: "active but no path",
			ep: endpoint{
				lastSendExt: active,
			},
			at:   now,
			want: PathNone,
		},
	}

	for i := range tests {
		// Take a pointer into the slice: endpoint holds a mutex and must not
		// be copied.
		tt := &tests[i]
		t.Run(tt.name, func(t *testing.T) {
			if tt.setRecv {
				tt.ep.lastRecvWG.StoreAtomic(active)
			}
			tt.ep.mu.Lock()
			got := tt.ep.currentPathTypeLocked(tt.at)
			tt.ep.mu.Unlock()
			if got != tt.want {
				t.Fatalf("currentPathTypeLocked = %v, want %v", got, tt.want)
			}
		})
	}
}

View File

@@ -91,6 +91,7 @@
type Path string
const (
PathNone Path = "" // no active connection path
PathDirectIPv4 Path = "direct_ipv4"
PathDirectIPv6 Path = "direct_ipv6"
PathDERP Path = "derp"
@@ -99,7 +100,7 @@
)
type pathLabel struct {
// Path indicates the path that the packet took:
// Path indicates the path classification used for packet/connection metrics:
// - direct_ipv4
// - direct_ipv6
// - derp
@@ -149,6 +150,14 @@ type metrics struct {
// outboundPacketsDroppedErrors is the total number of outbound packets
// dropped due to errors.
outboundPacketsDroppedErrors expvar.Int
// The peersConnected* gauges count active peers (recent send/recv within
// sessionActiveTimeout), one gauge per current connection path.
peersConnectedDirectIPv4 expvar.Int
peersConnectedDirectIPv6 expvar.Int
peersConnectedDERP expvar.Int
peersConnectedPeerRelayIPv4 expvar.Int
peersConnectedPeerRelayIPv6 expvar.Int
}
// A Conn routes UDP packets and actively manages a list of its endpoints.
@@ -804,6 +813,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
"counter",
"Counts the number of bytes sent to other peers",
)
peersConnectedTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel](
reg,
"tailscaled_peers_connected",
"gauge",
"Counts the number of active peers by connection path",
)
outboundPacketsDroppedErrors := reg.DroppedPacketsOutbound()
m := new(metrics)
@@ -861,6 +876,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
outboundPacketsDroppedErrors.Set(usermetric.DropLabels{Reason: usermetric.ReasonError}, &m.outboundPacketsDroppedErrors)
peersConnectedTotal.Set(pathDirectV4, &m.peersConnectedDirectIPv4)
peersConnectedTotal.Set(pathDirectV6, &m.peersConnectedDirectIPv6)
peersConnectedTotal.Set(pathDERP, &m.peersConnectedDERP)
peersConnectedTotal.Set(pathPeerRelayV4, &m.peersConnectedPeerRelayIPv4)
peersConnectedTotal.Set(pathPeerRelayV6, &m.peersConnectedPeerRelayIPv6)
return m
}