From 152707960eaeed3afcca1aacee458eb0f4d44d8a Mon Sep 17 00:00:00 2001 From: julianknodt Date: Tue, 8 Jun 2021 12:57:33 -0700 Subject: [PATCH] derp/: Switch approach for metrics Instead of explicitly encoding 5 and 10 seconds and checking for then, use a timer to keep track of how much total time has elapsed since a flow started, where a flow is defined for any period of time where every contiguous 3 minute window has at least 1 packet. Signed-off-by: julianknodt --- .gitignore | 1 + derp/derp_server.go | 87 +++++++++++++++++++++++++++++++++------------ derp/derp_test.go | 18 ++++++---- 3 files changed, 78 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index 39a4e8702..2d067ea62 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.dll *.so *.dylib +*.swp cmd/tailscale/tailscale cmd/tailscaled/tailscaled diff --git a/derp/derp_server.go b/derp/derp_server.go index 64828d58f..5a80c3018 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -20,6 +20,7 @@ "io" "io/ioutil" "log" + "math" "math/big" "math/rand" "os" @@ -41,6 +42,9 @@ var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS")) +// How long a flow is considered to be active for. +var DerpFlowLogTime = 3 * time.Minute + // verboseDropKeys is the set of destination public keys that should // verbosely log whenever DERP drops a packet. var verboseDropKeys = map[key.Public]bool{} @@ -120,8 +124,10 @@ type Server struct { multiForwarderCreated expvar.Int multiForwarderDeleted expvar.Int removePktForwardOther expvar.Int - clientsInUse5Sec expvar.Int // Number of clients using Derp after 5 seconds. - clientsInUse10Sec expvar.Int + + flow_mu sync.Mutex + activeFlows map[*sclient]flow + flowLogs metrics.LabelMap mu sync.Mutex closed bool @@ -142,6 +148,17 @@ type Server struct { sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum } +// Flow retains metrics about packets sent in serial. +// +type flow struct { + packetKinds struct { + disco expvar.Int + other expvar.Int + } + createdAt time.Time + timer *time.Timer +} + // PacketForwarder is something that can forward packets. // // It's mostly an inteface for circular dependency reasons; the @@ -184,6 +201,9 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { memSys0: ms.Sys, watchers: map[*sclient]bool{}, sentTo: map[key.Public]map[key.Public]int64{}, + + activeFlows: map[*sclient]flow{}, + flowLogs: metrics.LabelMap{Label: "minutes"}, } s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") @@ -459,7 +479,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN done: ctx.Done(), remoteAddr: remoteAddr, connectedAt: time.Now(), - lastPktAt: time.Now(), sendQueue: make(chan pkt, perClientSendQueueDepth), peerGone: make(chan key.Public), canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, @@ -639,6 +658,8 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { if err != nil { return fmt.Errorf("client %x: recvPacket: %v", c.key, err) } + /// Do not need to block on updating metrics + go s.updateFlow(c, disco.LooksLikeDiscoWrapper(contents), nil) var fwd PacketForwarder s.mu.Lock() @@ -666,7 +687,6 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } return nil } - c.markLastPktAt() p := pkt{ bs: contents, @@ -858,6 +878,46 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Publi return dstKey, contents, nil } +// Updates an active flow for a given client given that we +// saw a packet from them +func (s *Server) updateFlow(c *sclient, isDiscoPacket bool, done chan<- bool) { + s.flow_mu.Lock() + defer s.flow_mu.Unlock() + + flow, exists := s.activeFlows[c] + if isDiscoPacket { + flow.packetKinds.disco.Add(1) + } else { + flow.packetKinds.other.Add(1) + } + + if !exists { + flow.createdAt = time.Now() + s.activeFlows[c] = flow + } + + if flow.timer == nil { + flow.timer = time.AfterFunc(DerpFlowLogTime, func() { + s.flow_mu.Lock() + defer s.flow_mu.Unlock() + running_time := time.Since(flow.createdAt) + // report how many flows were alive for how many minutes + s.flowLogs.Get(fmt.Sprint(math.Ceil(running_time.Minutes()))).Add(1) + delete(s.activeFlows, c) + if done != nil { + done <- true + } + }) + } else { + if !flow.timer.Reset(DerpFlowLogTime) { + // If the previous timer already ran, just stop timer and exit since it either removed it + // from the map or is waiting on the lock. + flow.timer.Stop() + } + } + +} + // zpub is the key.Public zero value. var zpub key.Public @@ -905,7 +965,6 @@ type sclient struct { // Owned by run, not thread-safe. br *bufio.Reader connectedAt time.Time - lastPktAt time.Time preferred bool // Owned by sender, not thread-safe. @@ -1067,21 +1126,6 @@ func (c *sclient) sendPeerPresent(peer key.Public) error { return err } -func (c *sclient) markLastPktAt() { - old := c.lastPktAt - curr := time.Now() - c.lastPktAt = curr - // If we've been connected for over 5 seconds and haven't previously - since_old := old.Sub(c.connectedAt) - since_now := curr.Sub(c.connectedAt) - if since_old <= 5*time.Second && since_now > 5*time.Second { - c.s.clientsInUse5Sec.Add(1) - } - if since_old <= 10*time.Second && since_now > 10*time.Second { - c.s.clientsInUse10Sec.Add(1) - } -} - // sendMeshUpdates drains as many mesh peerStateChange entries as // possible into the write buffer WITHOUT flushing or otherwise // blocking (as it holds c.s.mu while working). If it can't drain them @@ -1310,8 +1354,7 @@ func (s *Server) ExpVar() expvar.Var { m.Set("multiforwarder_created", &s.multiForwarderCreated) m.Set("multiforwarder_deleted", &s.multiForwarderDeleted) m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther) - m.Set("clients_inuse_after_5_sec", &s.clientsInUse5Sec) - m.Set("clients_inuse_after_10_sec", &s.clientsInUse10Sec) + m.Set("live_flow_durations", &s.flowLogs) var expvarVersion expvar.String expvarVersion.Set(version.Long) m.Set("version", &expvarVersion) diff --git a/derp/derp_test.go b/derp/derp_test.go index 1c4d186ff..a429186d3 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -28,6 +28,10 @@ "tailscale.com/types/logger" ) +func init() { + DerpFlowLogTime = time.Nanosecond +} + func newPrivateKey(tb testing.TB) (k key.Private) { tb.Helper() if _, err := crand.Read(k[:]); err != nil { @@ -774,7 +778,7 @@ func TestForwarderRegistration(t *testing.T) { }) } -func TestLastAliveCounter(t *testing.T) { +func TestDerpFlowLogging(t *testing.T) { ts := newTestServer(t) defer ts.close(t) wantCounter := func(c *expvar.Int, want int) { @@ -783,14 +787,16 @@ func TestLastAliveCounter(t *testing.T) { t.Errorf("counter = %v; want %v", got, want) } } - wantCounter(&ts.s.clientsInUse5Sec, 0) + wantCounter(ts.s.flowLogs.Get("1"), 0) tc0 := newRegularClient(t, ts, "c0") - time.Sleep(6 * time.Second) + defer tc0.close(t) + time.Sleep(10 * time.Microsecond) for _, sc := range ts.s.clients { - sc.markLastPktAt() + done := make(chan bool, 1) + ts.s.updateFlow(sc, false, done) + <-done } - wantCounter(&ts.s.clientsInUse5Sec, 1) - tc0.close(t) + wantCounter(ts.s.flowLogs.Get("1"), 1) } func TestMetaCert(t *testing.T) {