diff --git a/derp/derp_server.go b/derp/derp_server.go index 32b09fb7b..75a10ab1c 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -490,6 +490,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN discoSendQueue: make(chan pkt, perClientSendQueueDepth), peerGone: make(chan key.Public), canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, + flows: make(map[key.Public]*flow), } if c.canMesh { c.meshUpdate = make(chan struct{}) @@ -536,6 +537,8 @@ func (c *sclient) run(ctx context.Context) error { } return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err) } + // read c.crapIsDirty atomic bool, do some flow cleanup + switch ft { case frameNotePreferred: err = c.handleFrameNotePreferred(ft, fl) @@ -665,14 +668,41 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } var fwd PacketForwarder - s.mu.Lock() - dst := s.clients[dstKey] - if dst == nil { - fwd = s.clientsMesh[dstKey] - } else { - s.notePeerSendLocked(c.key, dst) + var dst *sclient + + var f *flow + var ok bool + if f, ok = c.flows[dstKey]; ok { + if f.dst != nil { + select { + case <-f.dst.done: + flow.close() + default: + dst = f.dst + } + } } - s.mu.Unlock() + if dst == nil { + s.mu.Lock() + dst = s.clients[dstKey] + if dst == nil { + fwd = s.clientsMesh[dstKey] + } else { + s.notePeerSendLocked(c.key, dst) + } + s.mu.Unlock() + } + if f == nil { + f = &flow{ + c: c, + dst: dst, + } + f.idleTimer = time.AfterFunc(f.onIdle, time.Minute) + c.flows[dstKey] = f + } + f.pkts++ + f.bytes += int64(len(contents)) + f.markActive() if dst == nil { if fwd != nil { @@ -932,6 +962,26 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d return srcKey, dstKey, contents, nil } +type flow struct { + c *sclient // source of flow + dst *sclient // non-nil for local client; nil means use forwarder + + // Various stats: + pkts int64 + bytes int64 + + lastLog time.Time // etc + idleTimer *time.Timer +} + +func (f *flow) markActive() { + f.idleTimer.Reset(time.Minute) +} + +func (f *flow) onIdle() { + panic("TODOXXX") +} + // sclient is a client connection to the server. // // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) @@ -956,6 +1006,7 @@ type sclient struct { br *bufio.Reader connectedAt time.Time preferred bool + flows map[key.Public]*flow // Owned by sender, not thread-safe. bw *bufio.Writer