From c40f3521032c7687e8b9c85020f7857e211ab693 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Thu, 18 Dec 2025 16:12:50 -0800 Subject: [PATCH] net/udprelay: expose peer relay metrics (#18218) Adding both user and client metrics for peer relay forwarded bytes and packets, and the total endpoints gauge. User metrics: tailscaled_peer_relay_forwarded_packets_total{transport_in, transport_out} tailscaled_peer_relay_forwarded_bytes_total{transport_in, transport_out} tailscaled_peer_relay_endpoints_total{} Each transport label is either "udp4" or "udp6". Client metrics: udprelay_forwarded_(packets|bytes)_udp(4|6)_udp(4|6) udprelay_endpoints RELNOTE: Expose tailscaled metrics for peer relay. Updates tailscale/corp#30820 Change-Id: I1a905d15bdc5ee84e28017e0b93210e2d9660259 Signed-off-by: Alex Valiushko --- feature/relayserver/relayserver.go | 2 +- net/udprelay/metrics.go | 153 +++++++++++++++++++++++++++++ net/udprelay/metrics_test.go | 63 ++++++++++++ net/udprelay/server.go | 58 +++++++++-- net/udprelay/server_test.go | 5 +- 5 files changed, 269 insertions(+), 12 deletions(-) create mode 100644 net/udprelay/metrics.go create mode 100644 net/udprelay/metrics_test.go diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go index 4f23ae18e..b29a6abed 100644 --- a/feature/relayserver/relayserver.go +++ b/feature/relayserver/relayserver.go @@ -70,7 +70,7 @@ func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r * func newExtension(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, error) { e := &extension{ newServerFn: func(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (relayServer, error) { - return udprelay.NewServer(logf, port, onlyStaticAddrPorts) + return udprelay.NewServer(logf, port, onlyStaticAddrPorts, sb.Sys().UserMetricsRegistry()) }, logf: logger.WithPrefix(logf, featureName+": "), } diff --git a/net/udprelay/metrics.go b/net/udprelay/metrics.go new file mode 100644 index 
000000000..45d3c9f34 --- /dev/null +++ b/net/udprelay/metrics.go @@ -0,0 +1,153 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package udprelay + +import ( + "expvar" + + "tailscale.com/util/clientmetric" + "tailscale.com/util/usermetric" +) + +var ( + // Although we only need one, [clientmetric.AggregateCounter] is the only + // way to embed [expvar.Int] into client metrics. + cMetricForwarded44Packets = clientmetric.NewAggregateCounter("udprelay_forwarded_packets_udp4_udp4") + cMetricForwarded46Packets = clientmetric.NewAggregateCounter("udprelay_forwarded_packets_udp4_udp6") + cMetricForwarded64Packets = clientmetric.NewAggregateCounter("udprelay_forwarded_packets_udp6_udp4") + cMetricForwarded66Packets = clientmetric.NewAggregateCounter("udprelay_forwarded_packets_udp6_udp6") + + cMetricForwarded44Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp4_udp4") + cMetricForwarded46Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp4_udp6") + cMetricForwarded64Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp4") + cMetricForwarded66Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp6") + + // [clientmetric.Gauge] does not let us embed existing counters, so + // [metrics.addEndpoints] records data into client and user gauges independently. 
+ cMetricEndpoints = clientmetric.NewGauge("udprelay_endpoints") +) + +type transport string + +const ( + transportUDP4 transport = "udp4" + transportUDP6 transport = "udp6" +) + +type forwardedLabel struct { + transportIn transport `prom:"transport_in"` + transportOut transport `prom:"transport_out"` +} + +type endpointLabel struct { +} + +type metrics struct { + forwarded44Packets expvar.Int + forwarded46Packets expvar.Int + forwarded64Packets expvar.Int + forwarded66Packets expvar.Int + + forwarded44Bytes expvar.Int + forwarded46Bytes expvar.Int + forwarded64Bytes expvar.Int + forwarded66Bytes expvar.Int + + endpoints expvar.Int +} + +// registerMetrics publishes user and client metric counters for peer relay server. +// +// It will panic if called twice with the same registry. +func registerMetrics(reg *usermetric.Registry) *metrics { + var ( + uMetricForwardedPackets = usermetric.NewMultiLabelMapWithRegistry[forwardedLabel]( + reg, + "tailscaled_peer_relay_forwarded_packets_total", + "counter", + "Number of packets forwarded via Peer Relay", + ) + uMetricForwardedBytes = usermetric.NewMultiLabelMapWithRegistry[forwardedLabel]( + reg, + "tailscaled_peer_relay_forwarded_bytes_total", + "counter", + "Number of bytes forwarded via Peer Relay", + ) + uMetricEndpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel]( + reg, + "tailscaled_peer_relay_endpoints_total", + "gauge", + "Number of allocated Peer Relay endpoints", + ) + forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4} + forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6} + forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4} + forwarded66 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP6} + m = new(metrics) + ) + + // Publish user metrics. 
+ uMetricForwardedPackets.Set(forwarded44, &m.forwarded44Packets) + uMetricForwardedPackets.Set(forwarded46, &m.forwarded46Packets) + uMetricForwardedPackets.Set(forwarded64, &m.forwarded64Packets) + uMetricForwardedPackets.Set(forwarded66, &m.forwarded66Packets) + + uMetricForwardedBytes.Set(forwarded44, &m.forwarded44Bytes) + uMetricForwardedBytes.Set(forwarded46, &m.forwarded46Bytes) + uMetricForwardedBytes.Set(forwarded64, &m.forwarded64Bytes) + uMetricForwardedBytes.Set(forwarded66, &m.forwarded66Bytes) + + uMetricEndpoints.Set(endpointLabel{}, &m.endpoints) + + // Publish client metrics. + cMetricForwarded44Packets.Register(&m.forwarded44Packets) + cMetricForwarded46Packets.Register(&m.forwarded46Packets) + cMetricForwarded64Packets.Register(&m.forwarded64Packets) + cMetricForwarded66Packets.Register(&m.forwarded66Packets) + cMetricForwarded44Bytes.Register(&m.forwarded44Bytes) + cMetricForwarded46Bytes.Register(&m.forwarded46Bytes) + cMetricForwarded64Bytes.Register(&m.forwarded64Bytes) + cMetricForwarded66Bytes.Register(&m.forwarded66Bytes) + + return m +} + +// addEndpoints updates the total endpoints gauge. Value can be negative. +// It records two gauges independently, see [cMetricEndpoints] doc. +func (m *metrics) addEndpoints(value int64) { + m.endpoints.Add(value) + cMetricEndpoints.Add(value) +} + +// countForwarded records user and client metrics according to the +// inbound and outbound address families. +func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) { + if in4 && out4 { + m.forwarded44Packets.Add(packets) + m.forwarded44Bytes.Add(bytes) + } else if in4 && !out4 { + m.forwarded46Packets.Add(packets) + m.forwarded46Bytes.Add(bytes) + } else if !in4 && out4 { + m.forwarded64Packets.Add(packets) + m.forwarded64Bytes.Add(bytes) + } else { + m.forwarded66Packets.Add(packets) + m.forwarded66Bytes.Add(bytes) + } +} + +// deregisterMetrics unregisters the underlying expvar counters +// from clientmetrics. 
+func deregisterMetrics() { + cMetricForwarded44Packets.UnregisterAll() + cMetricForwarded46Packets.UnregisterAll() + cMetricForwarded64Packets.UnregisterAll() + cMetricForwarded66Packets.UnregisterAll() + cMetricForwarded44Bytes.UnregisterAll() + cMetricForwarded46Bytes.UnregisterAll() + cMetricForwarded64Bytes.UnregisterAll() + cMetricForwarded66Bytes.UnregisterAll() + cMetricEndpoints.Set(0) +} diff --git a/net/udprelay/metrics_test.go b/net/udprelay/metrics_test.go new file mode 100644 index 000000000..25345dc6b --- /dev/null +++ b/net/udprelay/metrics_test.go @@ -0,0 +1,63 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package udprelay + +import ( + "slices" + "testing" + + qt "github.com/frankban/quicktest" + "tailscale.com/util/usermetric" +) + +func TestMetrics(t *testing.T) { + c := qt.New(t) + deregisterMetrics() + r := &usermetric.Registry{} + m := registerMetrics(r) + + // Expect certain prom names registered. + have := r.MetricNames() + want := []string{ + "tailscaled_peer_relay_forwarded_packets_total", + "tailscaled_peer_relay_forwarded_bytes_total", + "tailscaled_peer_relay_endpoints_total", + } + slices.Sort(have) + slices.Sort(want) + c.Assert(have, qt.CmpEquals(), want) + + // Validate addEndpoints. + m.addEndpoints(1) + c.Assert(m.endpoints.Value(), qt.Equals, int64(1)) + c.Assert(cMetricEndpoints.Value(), qt.Equals, int64(1)) + m.addEndpoints(-1) + c.Assert(m.endpoints.Value(), qt.Equals, int64(0)) + c.Assert(cMetricEndpoints.Value(), qt.Equals, int64(0)) + + // Validate countForwarded. 
+ m.countForwarded(true, true, 1, 1) + c.Assert(m.forwarded44Bytes.Value(), qt.Equals, int64(1)) + c.Assert(m.forwarded44Packets.Value(), qt.Equals, int64(1)) + c.Assert(cMetricForwarded44Bytes.Value(), qt.Equals, int64(1)) + c.Assert(cMetricForwarded44Packets.Value(), qt.Equals, int64(1)) + + m.countForwarded(true, false, 2, 2) + c.Assert(m.forwarded46Bytes.Value(), qt.Equals, int64(2)) + c.Assert(m.forwarded46Packets.Value(), qt.Equals, int64(2)) + c.Assert(cMetricForwarded46Bytes.Value(), qt.Equals, int64(2)) + c.Assert(cMetricForwarded46Packets.Value(), qt.Equals, int64(2)) + + m.countForwarded(false, true, 3, 3) + c.Assert(m.forwarded64Bytes.Value(), qt.Equals, int64(3)) + c.Assert(m.forwarded64Packets.Value(), qt.Equals, int64(3)) + c.Assert(cMetricForwarded64Bytes.Value(), qt.Equals, int64(3)) + c.Assert(cMetricForwarded64Packets.Value(), qt.Equals, int64(3)) + + m.countForwarded(false, false, 4, 4) + c.Assert(m.forwarded66Bytes.Value(), qt.Equals, int64(4)) + c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(4)) + c.Assert(cMetricForwarded66Bytes.Value(), qt.Equals, int64(4)) + c.Assert(cMetricForwarded66Packets.Value(), qt.Equals, int64(4)) +} diff --git a/net/udprelay/server.go b/net/udprelay/server.go index 45127dfae..e98fdf7bb 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -43,6 +43,7 @@ "tailscale.com/types/views" "tailscale.com/util/eventbus" "tailscale.com/util/set" + "tailscale.com/util/usermetric" ) const ( @@ -76,6 +77,7 @@ type Server struct { wg sync.WaitGroup closeCh chan struct{} netChecker *netcheck.Client + metrics *metrics mu sync.Mutex // guards the following fields macSecrets views.Slice[[blake2s.Size]byte] // [0] is most recent, max 2 elements @@ -320,8 +322,8 @@ func (e *serverEndpoint) isBoundLocked() bool { // port selection is left up to the host networking stack. 
If // onlyStaticAddrPorts is true, then dynamic addr:port discovery will be // disabled, and only addr:port's set via [Server.SetStaticAddrPorts] will be -// used. -func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Server, err error) { +// used. Metrics must be non-nil. +func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool, metrics *usermetric.Registry) (s *Server, err error) { s = &Server{ logf: logf, disco: key.NewDisco(), @@ -333,6 +335,7 @@ func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Serv nextVNI: minVNI, } s.discoPublic = s.disco.Public() + s.metrics = registerMetrics(metrics) // TODO(creachadair): Find a way to plumb this in during initialization. // As-written, messages published here will not be seen by other components @@ -670,6 +673,7 @@ func (s *Server) endpointGCLoop() { defer s.mu.Unlock() for k, v := range s.serverEndpointByDisco { if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) { + s.metrics.addEndpoints(-1) delete(s.serverEndpointByDisco, k) s.serverEndpointByVNI.Delete(v.vni) } @@ -686,36 +690,50 @@ func (s *Server) endpointGCLoop() { } } -func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort) { +// handlePacket unwraps headers and dispatches packet handling according to its +// type and destination. If the returned address is valid, write will contain data +// to transmit, and isDataPacket signals whether input was a data packet or OOB +// signaling. +// +// write, to, isDataPacket := s.handlePacket(from, buf) +// if to.IsValid() && isDataPacket { +// // ..handle data transmission +// } + +func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort, isDataPacket bool) { if stun.Is(b) && b[1] == 0x01 { // A b[1] value of 0x01 (STUN method binding) is sufficiently // non-overlapping with the Geneve header where the LSB is always 0 // (part of 6 "reserved" bits). 
s.netChecker.ReceiveSTUNPacket(b, from) - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } gh := packet.GeneveHeader{} err := gh.Decode(b) if err != nil { - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } e, ok := s.serverEndpointByVNI.Load(gh.VNI.Get()) if !ok { // unknown VNI - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } now := mono.Now() if gh.Control { if gh.Protocol != packet.GeneveProtocolDisco { // control packet, but not Disco - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } msg := b[packet.GeneveFixedHeaderLength:] secrets := s.getMACSecrets(now) - return e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now) + write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now) + isDataPacket = false + return } - return e.(*serverEndpoint).handleDataPacket(from, b, now) + write, to = e.(*serverEndpoint).handleDataPacket(from, b, now) + isDataPacket = true + return } func (s *Server) getMACSecrets(now mono.Time) views.Slice[[blake2s.Size]byte] { @@ -783,16 +801,32 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF return } + // Aggregate counts for the packet batch before writing metrics. 
+ forwardedByOutAF := struct { + bytes4 int64 + packets4 int64 + bytes6 int64 + packets6 int64 + }{} for _, msg := range msgs[:n] { if msg.N == 0 { continue } buf := msg.Buffers[0][:msg.N] from := msg.Addr.(*net.UDPAddr).AddrPort() - write, to := s.handlePacket(from, buf) + write, to, isDataPacket := s.handlePacket(from, buf) if !to.IsValid() { continue } + if isDataPacket { + if to.Addr().Is4() { + forwardedByOutAF.bytes4 += int64(len(write)) + forwardedByOutAF.packets4++ + } else { + forwardedByOutAF.bytes6 += int64(len(write)) + forwardedByOutAF.packets6++ + } + } if from.Addr().Is4() == to.Addr().Is4() || otherSocket != nil { buffs, ok := writeBuffsByDest[to] if !ok { @@ -823,6 +857,9 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF } delete(writeBuffsByDest, dest) } + + s.metrics.countForwarded(readFromSocketIsIPv4, true, forwardedByOutAF.bytes4, forwardedByOutAF.packets4) + s.metrics.countForwarded(readFromSocketIsIPv4, false, forwardedByOutAF.bytes6, forwardedByOutAF.packets6) } } @@ -932,6 +969,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv s.serverEndpointByVNI.Store(e.vni, e) s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString()) + s.metrics.addEndpoints(1) return endpoint.ServerEndpoint{ ServerDisco: s.discoPublic, ClientDisco: pair.Get(), diff --git a/net/udprelay/server_test.go b/net/udprelay/server_test.go index c4b365641..59917e1c6 100644 --- a/net/udprelay/server_test.go +++ b/net/udprelay/server_test.go @@ -21,6 +21,7 @@ "tailscale.com/tstime/mono" "tailscale.com/types/key" "tailscale.com/types/views" + "tailscale.com/util/usermetric" ) type testClient struct { @@ -209,7 +210,9 @@ func TestServer(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - server, err := NewServer(t.Logf, 0, true) + reg := new(usermetric.Registry) + deregisterMetrics() + server, 
err := NewServer(t.Logf, 0, true, reg) if err != nil { t.Fatal(err) }