mirror of
https://github.com/tailscale/tailscale.git
synced 2026-04-03 22:25:27 -04:00
derp: WIP notes on adding a flow type
very rough, uncompiled.
This commit is contained in:
@@ -490,6 +490,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
|
||||
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
|
||||
peerGone: make(chan key.Public),
|
||||
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
|
||||
flows: make(map[key.Public]*flow),
|
||||
}
|
||||
if c.canMesh {
|
||||
c.meshUpdate = make(chan struct{})
|
||||
@@ -536,6 +537,8 @@ func (c *sclient) run(ctx context.Context) error {
|
||||
}
|
||||
return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
|
||||
}
|
||||
// read c.crapIsDirty atomic bool, do some flow cleanup
|
||||
|
||||
switch ft {
|
||||
case frameNotePreferred:
|
||||
err = c.handleFrameNotePreferred(ft, fl)
|
||||
@@ -665,14 +668,41 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
}
|
||||
|
||||
var fwd PacketForwarder
|
||||
s.mu.Lock()
|
||||
dst := s.clients[dstKey]
|
||||
if dst == nil {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
} else {
|
||||
s.notePeerSendLocked(c.key, dst)
|
||||
var dst *sclient
|
||||
|
||||
var f *flow
|
||||
var ok bool
|
||||
if f, ok = c.flows[dstKey]; ok {
|
||||
if f.dst != nil {
|
||||
select {
|
||||
case <-f.dst.done:
|
||||
flow.close()
|
||||
default:
|
||||
dst = f.dst
|
||||
}
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if dst == nil {
|
||||
s.mu.Lock()
|
||||
dst = s.clients[dstKey]
|
||||
if dst == nil {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
} else {
|
||||
s.notePeerSendLocked(c.key, dst)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
if f == nil {
|
||||
f = &flow{
|
||||
c: c,
|
||||
dst: dst,
|
||||
}
|
||||
f.idleTimer = time.AfterFunc(f.onIdle, time.Minute)
|
||||
c.flows[dstKey] = f
|
||||
}
|
||||
f.pkts++
|
||||
f.bytes += int64(len(contents))
|
||||
f.markActive()
|
||||
|
||||
if dst == nil {
|
||||
if fwd != nil {
|
||||
@@ -932,6 +962,26 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
|
||||
return srcKey, dstKey, contents, nil
|
||||
}
|
||||
|
||||
type flow struct {
|
||||
c *sclient // source of flow
|
||||
dst *sclient // non-nil for local client; nil means use forwarder
|
||||
|
||||
// Various stats:
|
||||
pkts int64
|
||||
bytes int64
|
||||
|
||||
lastLog time.Time // etc
|
||||
idleTimer *time.Timer
|
||||
}
|
||||
|
||||
func (f *flow) markActive() {
|
||||
f.idleTimer.Reset(time.Minute)
|
||||
}
|
||||
|
||||
func (f *flow) onIdle() {
|
||||
panic("TODOXXX")
|
||||
}
|
||||
|
||||
// sclient is a client connection to the server.
|
||||
//
|
||||
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
|
||||
@@ -956,6 +1006,7 @@ type sclient struct {
|
||||
br *bufio.Reader
|
||||
connectedAt time.Time
|
||||
preferred bool
|
||||
flows map[key.Public]*flow
|
||||
|
||||
// Owned by sender, not thread-safe.
|
||||
bw *bufio.Writer
|
||||
|
||||
Reference in New Issue
Block a user