diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 1f99f57ec..bca59ec9a 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -99,6 +99,13 @@ type endpoint struct { expired bool // whether the node has expired isWireguardOnly bool // whether the endpoint is WireGuard only relayCapable bool // whether the node is capable of speaking via a [tailscale.com/net/udprelay.Server] + + // pathMetricExpiryTimer fires after inactivity to update peersConnected. + pathMetricExpiryTimer *time.Timer + // pathMetricExpiryAt is the scheduled expiry time for pathMetricExpiryTimer. + // It guards against timer reset races by ensuring callbacks only act when due. + pathMetricExpiryAt mono.Time + lastReportedPath Path // last path type reported in metrics; used for accurate decrement } // udpRelayEndpointReady determines whether the given relay [addrQuality] should @@ -127,6 +134,7 @@ func (de *endpoint) udpRelayEndpointReady(maybeBest addrQuality) { de.c.logf("magicsock: disco: node %v %v now using %v mtu=%v", de.publicKey.ShortString(), de.discoShort(), maybeBest.epAddr, maybeBest.wireMTU) de.setBestAddrLocked(maybeBest) de.trustBestAddrUntil = now.Add(trustUDPAddrDuration) + de.updatePathMetricLocked(now) } } @@ -486,6 +494,7 @@ func (de *endpoint) deleteEndpointLocked(why string, ep netip.AddrPort) { From: de.bestAddr, }) de.setBestAddrLocked(addrQuality{}) + de.updatePathMetricLocked(mono.Now()) } } @@ -509,6 +518,7 @@ func (de *endpoint) noteRecvActivity(src epAddr, now mono.Time) bool { de.bestAddr.ap = src.ap de.bestAddrAt = now de.trustBestAddrUntil = now.Add(5 * time.Second) + de.updatePathMetricLocked(now) de.mu.Unlock() } else { // TODO(jwhited): subject to change as part of silent disco effort. @@ -518,6 +528,7 @@ func (de *endpoint) noteRecvActivity(src epAddr, now mono.Time) bool { de.mu.Lock() if de.heartbeatDisabled && de.bestAddr.epAddr == src { de.trustBestAddrUntil = now.Add(trustUDPAddrDuration) + de.updatePathMetricLocked(now) } de.mu.Unlock() } @@ -525,6 +536,9 @@ func (de *endpoint) noteRecvActivity(src epAddr, now mono.Time) bool { elapsed := now.Sub(de.lastRecvWG.LoadAtomic()) if elapsed > 10*time.Second { de.lastRecvWG.StoreAtomic(now) + de.mu.Lock() + de.updatePathMetricLocked(now) + de.mu.Unlock() if de.c.noteRecvActivity == nil { return false @@ -582,6 +596,9 @@ func (de *endpoint) addrForSendLocked(now mono.Time) (udpAddr epAddr, derpAddr n // We had a bestAddr but it expired so send both to it // and DERP. + if now.After(de.trustBestAddrUntil) && de.lastReportedPath != PathDERP && de.lastReportedPath != PathNone { + de.updatePathMetricLocked(now) + } return udpAddr, de.derpAddr, false } @@ -639,6 +656,7 @@ func (de *endpoint) addrForWireGuardSendLocked(now mono.Time) (udpAddr epAddr, s // less than one second in good cases, in which case this will be then // extended to 15 seconds. de.trustBestAddrUntil = now.Add(time.Second) + de.updatePathMetricLocked(now) return udpAddr, needPing } @@ -949,7 +967,11 @@ func (de *endpoint) wantFullPingLocked(now mono.Time) bool { } func (de *endpoint) noteTxActivityExtTriggerLocked(now mono.Time) { + prev := de.lastSendExt de.lastSendExt = now + if prev.IsZero() || now.Sub(prev) > sessionActiveTimeout { + de.updatePathMetricLocked(now) + } if de.heartBeatTimer == nil && !de.heartbeatDisabled { de.heartBeatTimer = time.AfterFunc(heartbeatInterval, de.heartbeat) } @@ -1181,6 +1203,7 @@ func (de *endpoint) discoPingTimeout(txid stun.TxID) { if sp.to == de.bestAddr.epAddr && sp.to.vni.IsSet() && bestUntrusted { // TODO(jwhited): consider applying this to direct UDP paths as well de.clearBestAddrLocked() + de.updatePathMetricLocked(mono.Now()) } if debugDisco() || !de.bestAddr.ap.IsValid() || bestUntrusted { de.c.dlogf("[v1] magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], sp.to, de.publicKey.ShortString(), de.discoShort()) @@ -1525,6 +1548,7 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, p de.setEndpointsLocked(n.Endpoints()) de.relayCapable = capVerIsRelayCapable(n.Cap()) + de.updatePathMetricLocked(mono.Now()) } func (de *endpoint) setEndpointsLocked(eps interface { @@ -1636,6 +1660,7 @@ func (de *endpoint) noteBadEndpoint(udpAddr epAddr) { st.clear() } } + de.updatePathMetricLocked(mono.Now()) } // noteConnectivityChange is called when connectivity changes enough @@ -1650,6 +1675,7 @@ func (de *endpoint) noteConnectivityChange() { for k := range de.endpointState { de.endpointState[k].clear() } + de.updatePathMetricLocked(mono.Now()) } // pingSizeToPktLen calculates the minimum path MTU that would permit @@ -1789,6 +1815,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src epAdd de.bestAddrAt = now de.trustBestAddrUntil = now.Add(trustUDPAddrDuration) } + de.updatePathMetricLocked(now) } return } @@ -2024,6 +2051,149 @@ func (de *endpoint) populatePeerStatus(ps *ipnstate.PeerStatus) { } } +// currentPathTypeLocked returns the current connection path type for this endpoint. +// Returns PathNone if the peer is inactive (no send/recv within sessionActiveTimeout) +// or no path is available. +// +// de.mu must be held. +func (de *endpoint) currentPathTypeLocked(now mono.Time) Path { + lastActivity := de.lastSendExt + if recv := de.lastRecvWG.LoadAtomic(); recv.After(lastActivity) { + lastActivity = recv + } + if lastActivity.IsZero() || now.Sub(lastActivity) > sessionActiveTimeout { + return PathNone + } + + if de.bestAddr.ap.IsValid() && !now.After(de.trustBestAddrUntil) { + if de.bestAddr.vni.IsSet() { + if de.bestAddr.ap.Addr().Is6() { + return PathPeerRelayIPv6 + } + return PathPeerRelayIPv4 + } + if de.bestAddr.ap.Addr().Is6() { + return PathDirectIPv6 + } + return PathDirectIPv4 + } + + if de.derpAddr.IsValid() { + return PathDERP + } + + return PathNone +} + +// updatePathMetricLocked updates the peersConnected gauge to reflect the current path. +// +// de.mu must be held. +func (de *endpoint) updatePathMetricLocked(now mono.Time) { + if de.c == nil || de.c.metrics == nil { + return + } + current := de.currentPathTypeLocked(now) + if current != de.lastReportedPath { + de.addPathMetricLocked(de.lastReportedPath, -1) + de.addPathMetricLocked(current, 1) + de.lastReportedPath = current + } + de.schedulePathMetricExpiryLocked(now) +} + +// addPathMetricLocked adjusts the peersConnected gauge for the provided path. +// +// de.mu must be held. +func (de *endpoint) addPathMetricLocked(path Path, delta int64) { + if path == PathNone || delta == 0 || de.c == nil || de.c.metrics == nil { + return + } + switch path { + case PathDirectIPv4: + de.c.metrics.peersConnectedDirectIPv4.Add(delta) + case PathDirectIPv6: + de.c.metrics.peersConnectedDirectIPv6.Add(delta) + case PathDERP: + de.c.metrics.peersConnectedDERP.Add(delta) + case PathPeerRelayIPv4: + de.c.metrics.peersConnectedPeerRelayIPv4.Add(delta) + case PathPeerRelayIPv6: + de.c.metrics.peersConnectedPeerRelayIPv6.Add(delta) + } +} + +// schedulePathMetricExpiryLocked schedules a timer to re-evaluate the path metric +// after inactivity. +// +// de.mu must be held. +func (de *endpoint) schedulePathMetricExpiryLocked(now mono.Time) { + if de.lastReportedPath == PathNone { + de.stopPathMetricExpiryTimerLocked() + return + } + lastActivity := de.lastSendExt + if recv := de.lastRecvWG.LoadAtomic(); recv.After(lastActivity) { + lastActivity = recv + } + if lastActivity.IsZero() { + de.stopPathMetricExpiryTimerLocked() + return + } + expiry := lastActivity.Add(sessionActiveTimeout) + delay := expiry.Sub(now) + if delay <= 0 { + de.stopPathMetricExpiryTimerLocked() + return + } + de.pathMetricExpiryAt = expiry + if de.pathMetricExpiryTimer != nil { + de.pathMetricExpiryTimer.Reset(delay) + return + } + de.pathMetricExpiryTimer = time.AfterFunc(delay, de.pathMetricExpiryTimerFired) +} + +// stopPathMetricExpiryTimerLocked stops and clears the path metric timer. +// +// de.mu must be held. +func (de *endpoint) stopPathMetricExpiryTimerLocked() { + if t := de.pathMetricExpiryTimer; t != nil { + t.Stop() + de.pathMetricExpiryTimer = nil + } + de.pathMetricExpiryAt = 0 +} + +func (de *endpoint) pathMetricExpiryTimerFired() { + de.mu.Lock() + defer de.mu.Unlock() + if de.pathMetricExpiryTimer == nil || de.pathMetricExpiryAt.IsZero() { + return + } + now := mono.Now() + if de.pathMetricExpiryAt.After(now) { + de.pathMetricExpiryTimer.Reset(de.pathMetricExpiryAt.Sub(now)) + return + } + de.pathMetricExpiryTimer = nil + de.pathMetricExpiryAt = 0 + de.updatePathMetricLocked(now) +} + +// clearPathMetricLocked clears this endpoint's path metric. Used during shutdown. +// +// de.mu must be held. +func (de *endpoint) clearPathMetricLocked() { + if de.c == nil || de.c.metrics == nil { + return + } + if de.lastReportedPath != PathNone { + de.addPathMetricLocked(de.lastReportedPath, -1) + de.lastReportedPath = PathNone + } + de.stopPathMetricExpiryTimerLocked() +} + // stopAndReset stops timers associated with de and resets its state back to zero. // It's called when a discovery endpoint is no longer present in the // NetworkMap, or when magicsock is transitioning from running to @@ -2033,6 +2203,9 @@ func (de *endpoint) stopAndReset() { de.mu.Lock() defer de.mu.Unlock() + // Decrement metric for this endpoint's current path before cleanup. + de.clearPathMetricLocked() + if closing := de.c.closing.Load(); !closing { if de.isWireguardOnly { de.c.logf("[v1] magicsock: doing cleanup for wireguard key %s", de.publicKey.ShortString()) @@ -2058,6 +2231,7 @@ func (de *endpoint) stopAndReset() { func (de *endpoint) resetLocked() { de.lastSendExt = 0 de.lastFullPing = 0 + de.lastRecvWG.StoreAtomic(0) de.clearBestAddrLocked() for _, es := range de.endpointState { es.lastPing = 0 @@ -2083,6 +2257,7 @@ func (de *endpoint) setDERPHome(regionID uint16) { de.mu.Lock() defer de.mu.Unlock() de.derpAddr = netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID)) + de.updatePathMetricLocked(mono.Now()) if de.c.hasPeerRelayServers.Load() { de.c.relayManager.handleDERPHomeChange(de.publicKey, regionID) } diff --git a/wgengine/magicsock/endpoint_test.go b/wgengine/magicsock/endpoint_test.go index 43ff012c7..156e5d6e8 100644 --- a/wgengine/magicsock/endpoint_test.go +++ b/wgengine/magicsock/endpoint_test.go @@ -453,3 +453,119 @@ func Test_endpoint_udpRelayEndpointReady(t *testing.T) { }) } } + +func Test_endpoint_currentPathTypeLocked(t *testing.T) { + now := mono.Now() + active := now.Add(-time.Second) + inactive := now.Add(-sessionActiveTimeout - time.Second) + trusted := now.Add(time.Minute) + expired := now.Add(-time.Minute) + + relayVNI := packet.VirtualNetworkID{} + relayVNI.Set(7) + + tests := []struct { + name string + ep endpoint + at mono.Time + want Path + setRecv bool + }{ + { + name: "inactive no activity", + ep: endpoint{}, + at: now, + want: PathNone, + }, + { + name: "inactive stale activity", + ep: endpoint{ + lastSendExt: inactive, + bestAddr: addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.1:123")}}, + trustBestAddrUntil: trusted, + }, + at: now, + want: PathNone, + }, + { + name: "direct ipv4 by send", + ep: endpoint{ + lastSendExt: active, + bestAddr: addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.2:123")}}, + trustBestAddrUntil: trusted, + }, + at: now, + want: PathDirectIPv4, + }, + { + name: "direct ipv6 by recv", + ep: endpoint{ + bestAddr: addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("[2001:db8::1]:123")}}, + trustBestAddrUntil: trusted, + }, + at: now, + want: PathDirectIPv6, + setRecv: true, + }, + { + name: "peer relay ipv4", + ep: endpoint{ + lastSendExt: active, + bestAddr: addrQuality{epAddr: epAddr{ + ap: netip.MustParseAddrPort("192.0.2.9:123"), + vni: relayVNI, + }}, + trustBestAddrUntil: trusted, + }, + at: now, + want: PathPeerRelayIPv4, + }, + { + name: "peer relay ipv6", + ep: endpoint{ + lastSendExt: active, + bestAddr: addrQuality{epAddr: epAddr{ + ap: netip.MustParseAddrPort("[2001:db8::9]:123"), + vni: relayVNI, + }}, + trustBestAddrUntil: trusted, + }, + at: now, + want: PathPeerRelayIPv6, + }, + { + name: "derp fallback when best addr expired", + ep: endpoint{ + lastSendExt: active, + bestAddr: addrQuality{epAddr: epAddr{ap: netip.MustParseAddrPort("192.0.2.3:123")}}, + trustBestAddrUntil: expired, + derpAddr: netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, 1), + }, + at: now, + want: PathDERP, + }, + { + name: "active but no path", + ep: endpoint{ + lastSendExt: active, + }, + at: now, + want: PathNone, + }, + } + + for i := range tests { + tt := &tests[i] + t.Run(tt.name, func(t *testing.T) { + if tt.setRecv { + tt.ep.lastRecvWG.StoreAtomic(active) + } + tt.ep.mu.Lock() + got := tt.ep.currentPathTypeLocked(tt.at) + tt.ep.mu.Unlock() + if got != tt.want { + t.Fatalf("currentPathTypeLocked = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 7c5442d0b..21ec98a0c 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -91,6 +91,7 @@ type Path string const ( + PathNone Path = "" // no active connection path PathDirectIPv4 Path = "direct_ipv4" PathDirectIPv6 Path = "direct_ipv6" PathDERP Path = "derp" @@ -99,7 +100,7 @@ ) type pathLabel struct { - // Path indicates the path that the packet took: + // Path indicates the path classification used for packet/connection metrics: // - direct_ipv4 // - direct_ipv6 // - derp @@ -149,6 +150,14 @@ type metrics struct { // outboundPacketsDroppedErrors is the total number of outbound packets // dropped due to errors. outboundPacketsDroppedErrors expvar.Int + + // peersConnectedTotal is the number of active peer connections (recent + // send/recv within sessionActiveTimeout), labeled by current path. + peersConnectedDirectIPv4 expvar.Int + peersConnectedDirectIPv6 expvar.Int + peersConnectedDERP expvar.Int + peersConnectedPeerRelayIPv4 expvar.Int + peersConnectedPeerRelayIPv6 expvar.Int } // A Conn routes UDP packets and actively manages a list of its endpoints. @@ -804,6 +813,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics { "counter", "Counts the number of bytes sent to other peers", ) + peersConnectedTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + reg, + "tailscaled_peers_connected", + "gauge", + "Counts the number of active peers by connection path", + ) outboundPacketsDroppedErrors := reg.DroppedPacketsOutbound() m := new(metrics) @@ -861,6 +876,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics { outboundPacketsDroppedErrors.Set(usermetric.DropLabels{Reason: usermetric.ReasonError}, &m.outboundPacketsDroppedErrors) + peersConnectedTotal.Set(pathDirectV4, &m.peersConnectedDirectIPv4) + peersConnectedTotal.Set(pathDirectV6, &m.peersConnectedDirectIPv6) + peersConnectedTotal.Set(pathDERP, &m.peersConnectedDERP) + peersConnectedTotal.Set(pathPeerRelayV4, &m.peersConnectedPeerRelayIPv4) + peersConnectedTotal.Set(pathPeerRelayV6, &m.peersConnectedPeerRelayIPv6) + return m }