From 618b606b4632c433defa799afb53801b56f268cb Mon Sep 17 00:00:00 2001 From: Michael Ben-Ami Date: Fri, 22 May 2026 12:01:11 -0400 Subject: [PATCH] feature/conn25: expire idle flows from FlowTable Track lastSeen on each cached flow and add a sweeper goroutine that periodically removes flows idle past the idle timeout. Introduce tunables for idle timeout, maximum flows removed per sweep (to limit mutex hold time), and the sweeper interval. Also cap the previously-unlimited tables: 10k client flows, 100k connector flows. Updates tailscale/corp#38630 Signed-off-by: Michael Ben-Ami --- feature/conn25/conn25.go | 1 + feature/conn25/datapath.go | 23 +++-- feature/conn25/flowtable.go | 129 +++++++++++++++++++++++--- feature/conn25/flowtable_test.go | 150 +++++++++++++++++++++++++++++++ 4 files changed, 286 insertions(+), 17 deletions(-) diff --git a/feature/conn25/conn25.go b/feature/conn25/conn25.go index b30fbb8f5..784b8a0c5 100644 --- a/feature/conn25/conn25.go +++ b/feature/conn25/conn25.go @@ -134,6 +134,7 @@ func (e *extension) Init(host ipnext.Host) error { ctx, cancel := context.WithCancelCause(context.Background()) e.ctxCancel = cancel go e.sendLoop(ctx) + dph.StartFlowExpirySweepers(ctx) return nil } diff --git a/feature/conn25/datapath.go b/feature/conn25/datapath.go index ca78a4adc..43daa89db 100644 --- a/feature/conn25/datapath.go +++ b/feature/conn25/datapath.go @@ -4,6 +4,7 @@ package conn25 import ( + "context" "errors" "net/netip" @@ -83,19 +84,29 @@ type datapathHandler struct { debugLogging bool } +const ( + maxClientFlows = 10_000 + maxConnectorFlows = 100_000 +) + func newDatapathHandler(ipMapper IPMapper, logf logger.Logf) *datapathHandler { return &datapathHandler{ - ipMapper: ipMapper, - - // TODO(mzb): Figure out sensible default max size for flow tables. - // Don't do any LRU eviction until we figure out deletion and expiration. 
- clientFlowTable: NewFlowTable(0), - connectorFlowTable: NewFlowTable(0), + ipMapper: ipMapper, + clientFlowTable: NewFlowTable(maxClientFlows), + connectorFlowTable: NewFlowTable(maxConnectorFlows), logf: logf, debugLogging: envknob.Bool("TS_CONN25_DATAPATH_DEBUG"), } } +// StartFlowExpirySweepers starts the sweepers that remove expired flows +// for both the client and connector flow tables. Each sweeper runs in +// its own new goroutine. +func (dh *datapathHandler) StartFlowExpirySweepers(ctx context.Context) { + go dh.clientFlowTable.StartExpiredSweeper(ctx) + go dh.connectorFlowTable.StartExpiredSweeper(ctx) +} + // HandlePacketFromWireGuard inspects packets coming from WireGuard, and performs // appropriate DNAT or SNAT actions for Connectors 2025. Returning [filter.Accept] signals // that the packet should pass through subsequent stages of the datapath pipeline. diff --git a/feature/conn25/flowtable.go b/feature/conn25/flowtable.go index bf07064f2..22d5654bb 100644 --- a/feature/conn25/flowtable.go +++ b/feature/conn25/flowtable.go @@ -5,10 +5,13 @@ import ( "container/list" + "context" "sync" + "time" "tailscale.com/net/flowtrack" "tailscale.com/net/packet" + "tailscale.com/tstime/mono" ) // PacketAction may modify the packet. @@ -48,7 +51,7 @@ type FlowData struct { type cachedFlow struct { data FlowData // user-defined tuples and actions for both directions - // lastSeen time.Time // tracks when the flow was last hit for expiration management + lastSeen mono.Time // tracks when the flow was last hit for expiration management // onRemove func() // fires on removal/expiration (e.g. update watchers, send RST to client) } @@ -59,22 +62,81 @@ type cachedFlow struct { // of entries is specified in calls to [NewFlowTable]. FlowTable has // its own mutex and is safe for concurrent use. 
type FlowTable struct { + maxEntries int + idleTimeout time.Duration + sweepInterval time.Duration + maxRemovedPerSweep int + mu sync.Mutex fromTunCache map[flowtrack.Tuple]*list.Element fromWGCache map[flowtrack.Tuple]*list.Element lru *list.List - maxEntries int +} + +const ( + // DefaultFlowIdleTimeout is the default idle timeout for a flow. + // See also [WithFlowIdleTimeout]. + DefaultFlowIdleTimeout = 5 * time.Minute + // DefaultFlowSweepInterval is the default sweep interval for + // automatically removing expired flows. See also [WithFlowSweepInterval]. + DefaultFlowSweepInterval = 3 * time.Minute + // DefaultMaxRemovedFlowsPerSweep is the default maximum number of + // flows removed per sweep. It can be used to tune how long the table + // mutex is held during sweeps. See also [WithMaxRemovedFlowsPerSweep]. + DefaultMaxRemovedFlowsPerSweep = 1000 +) + +// FlowTableOption configures options for use with [NewFlowTable]. +type FlowTableOption func(ft *FlowTable) + +// WithFlowIdleTimeout sets how long a flow may stay idle +// before it is eligible for removal. A flow's idle time is the +// time elapsed since its creation or last lookup. A duration of +// 0 means that expiration is disabled. If WithFlowIdleTimeout is not +// passed to [NewFlowTable], then [DefaultFlowIdleTimeout] is used. +func WithFlowIdleTimeout(timeout time.Duration) FlowTableOption { + return func(ft *FlowTable) { + ft.idleTimeout = timeout + } +} + +// WithFlowSweepInterval sets the interval at which idle flows +// that exceed the idle timeout are automatically removed. A value of 0 +// disables automatic sweeping. If WithFlowSweepInterval is not +// passed to [NewFlowTable], then [DefaultFlowSweepInterval] is used. +func WithFlowSweepInterval(ival time.Duration) FlowTableOption { + return func(ft *FlowTable) { + ft.sweepInterval = ival + } +} + +// WithMaxRemovedFlowsPerSweep sets the maximum number of expired +// flows that can be removed per sweep. 
A value of 0 means no +// maximum. If WithMaxRemovedFlowsPerSweep is not passed to +// [NewFlowTable], then [DefaultMaxRemovedFlowsPerSweep] is used. +func WithMaxRemovedFlowsPerSweep(maxPer int) FlowTableOption { + return func(ft *FlowTable) { + ft.maxRemovedPerSweep = maxPer + } } // NewFlowTable returns a [FlowTable] with maxEntries maximum entries. // A maxEntries of 0 indicates no maximum. See also [FlowTable]. -func NewFlowTable(maxEntries int) *FlowTable { - return &FlowTable{ - fromTunCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), - fromWGCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), - lru: list.New(), - maxEntries: maxEntries, +func NewFlowTable(maxEntries int, opts ...FlowTableOption) *FlowTable { + ft := &FlowTable{ + maxEntries: maxEntries, + idleTimeout: DefaultFlowIdleTimeout, + sweepInterval: DefaultFlowSweepInterval, + maxRemovedPerSweep: DefaultMaxRemovedFlowsPerSweep, + fromTunCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), + fromWGCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), + lru: list.New(), } + + for _, o := range opts { + o(ft) + } + return ft } // LookupFromTunDevice looks up a [PacketAction] that is valid to run on packets @@ -123,7 +185,7 @@ func (t *FlowTable) lookup(k flowtrack.Tuple, dir Origin) (PacketAction, bool) { // Support LRU. t.lru.MoveToFront(ele) - // TODO(mzb): Update flow.lastSeen. + flow.lastSeen = mono.Now() return action, true } @@ -143,8 +205,8 @@ func (t *FlowTable) NewFlow(data FlowData) error { t.removeFlowLocked(t.fromWGCache[data.FromWG.Tuple]) flow := &cachedFlow{ - data: data, - // Populate lastSeen + data: data, + lastSeen: mono.Now(), // Populate onRemove() } @@ -159,6 +221,51 @@ func (t *FlowTable) NewFlow(data FlowData) error { return nil } +// StartExpiredSweeper starts a sweeper that removes idle flows that have +// not been created or looked up for a duration greater than the configured +// idle timeout. See [WithFlowIdleTimeout]. 
The sweep runs at the configured +// sweep interval. See [WithFlowSweepInterval]. The sweeper stops when ctx +// is canceled. This call blocks while sweeping, so run it in a goroutine. +func (t *FlowTable) StartExpiredSweeper(ctx context.Context) { + if t.sweepInterval == 0 || t.idleTimeout == 0 { + return + } + + ticker := time.NewTicker(t.sweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.removeIdle(mono.Now()) + } + } +} + +func (t *FlowTable) removeIdle(now mono.Time) int { + if t.idleTimeout == 0 { + return 0 + } + + t.mu.Lock() + defer t.mu.Unlock() + + removed := 0 + for ele := t.lru.Back(); ele != nil; ele = t.lru.Back() { + if t.maxRemovedPerSweep > 0 && removed >= t.maxRemovedPerSweep { + break + } + flow := ele.Value.(*cachedFlow) + if now.Sub(flow.lastSeen) <= t.idleTimeout { + break + } + t.removeFlowLocked(ele) + removed++ + } + return removed +} + func (t *FlowTable) removeFlowLocked(ele *list.Element) { if ele == nil { return diff --git a/feature/conn25/flowtable_test.go b/feature/conn25/flowtable_test.go index ae3d27144..f8a8f0f0c 100644 --- a/feature/conn25/flowtable_test.go +++ b/feature/conn25/flowtable_test.go @@ -4,11 +4,15 @@ package conn25 import ( + "fmt" "net/netip" "testing" + "testing/synctest" + "time" "tailscale.com/net/flowtrack" "tailscale.com/net/packet" + "tailscale.com/tstime/mono" "tailscale.com/types/ipproto" ) @@ -44,6 +48,17 @@ func mkFlow(fromTun, fromWG flowtrack.Tuple) FlowData { } } +func mkFlows(n int) []FlowData { + flows := make([]FlowData, n) + for i := range n { + flows[i] = mkFlow( + mkTuple(fmt.Sprintf("1.0.%d.%d:1000", (i>>8)&0xff, i&0xff), "2.0.0.1:80"), + mkTuple(fmt.Sprintf("3.0.%d.%d:1000", (i>>8)&0xff, i&0xff), "4.0.0.1:80"), + ) + } + return flows +} + func mustInstallFlow(t *testing.T, ft *FlowTable, flow FlowData) { t.Helper() if err := ft.NewFlow(flow); err != nil { @@ -229,3 +244,138 @@ func TestFlowTable_Eviction(t *testing.T) { assertFlowHit(t, ft, FromTun, dTun) assertFlowHit(t, ft, FromWireGuard, dWG) 
} + +func syncSubtest(t *testing.T, name string, f func(*testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + synctest.Test(t, f) + }) +} + +func TestFlowTable_removeIdle(t *testing.T) { + type flowSpec struct { + installAt time.Duration // wall-clock time of install, from test start + wantRemoved bool // do we expect this flow to be removed by the sweep + } + + tests := []struct { + name string + idleTimeout time.Duration + maxRemovedPerSweep int + flowSpecs []flowSpec + removalAt time.Duration // wall-clock time of sweep, from test start + }{ + { + name: "one-expired-flow", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 120s + }, + removalAt: 2 * time.Minute, + }, + { + name: "one-not-expired-flow", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: false}, // age at sweep = 30s + }, + removalAt: 30 * time.Second, + }, + { + name: "two-flows-one-expired", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 75s + {installAt: 30 * time.Second, wantRemoved: false}, // age at sweep = 45s + }, + removalAt: 75 * time.Second, + }, + { + name: "two-flows-both-expired", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 120s + {installAt: 30 * time.Second, wantRemoved: true}, // age at sweep = 90s + }, + removalAt: 2 * time.Minute, + }, + { + // Both flows are time-expired, but maxRemovedPerSweep=1 caps removal at 1. + // Flow 0 is at the back of the LRU (installed first) and is removed; flow 1 stays. 
+ name: "two-flows-both-expired-but-max-count-equal-one", + idleTimeout: 1 * time.Minute, + maxRemovedPerSweep: 1, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 120s, removed (under cap) + {installAt: 30 * time.Second, wantRemoved: false}, // age at sweep = 90s, kept (cap reached) + }, + removalAt: 2 * time.Minute, + }, + { + name: "zero-idle-timeout-means-no-expiration", + idleTimeout: 0, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: false}, + {installAt: 30 * time.Second, wantRemoved: false}, + }, + removalAt: 2 * time.Minute, + }, + } + + for _, tt := range tests { + syncSubtest(t, tt.name, func(t *testing.T) { + ft := NewFlowTable( + 0, // turn off LRU for these tests + WithFlowIdleTimeout(tt.idleTimeout), + WithMaxRemovedFlowsPerSweep(tt.maxRemovedPerSweep), + ) + + start := time.Now() + flows := mkFlows(len(tt.flowSpecs)) + for i, spec := range tt.flowSpecs { + time.Sleep(time.Until(start.Add(spec.installAt))) + mustInstallFlow(t, ft, flows[i]) + } + + var wantRemovedCount int + for _, spec := range tt.flowSpecs { + if spec.wantRemoved { + wantRemovedCount++ + } + } + + time.Sleep(time.Until(start.Add(tt.removalAt))) + + gotRemovedCount := ft.removeIdle(mono.Now()) + if wantRemovedCount != gotRemovedCount { + t.Errorf("unexpected remove idle count: want %d, got %d", wantRemovedCount, gotRemovedCount) + } + + for i, spec := range tt.flowSpecs { + if spec.wantRemoved { + assertFlowMiss(t, ft, FromTun, flows[i].FromTun.Tuple) + } else { + assertFlowHit(t, ft, FromTun, flows[i].FromTun.Tuple) + } + } + }) + } + + syncSubtest(t, "lookup-resets-lastseen", func(t *testing.T) { + ft := NewFlowTable(0, WithFlowIdleTimeout(time.Minute)) + flows := mkFlows(2) + + mustInstallFlow(t, ft, flows[0]) // t=0 (flow 0 install) + time.Sleep(30 * time.Second) // + mustInstallFlow(t, ft, flows[1]) // t=30s (flow 1 install) + time.Sleep(60 * time.Second) // + assertFlowHit(t, ft, FromTun, flows[0].FromTun.Tuple) // t=90s (flow 0 looked up, 
lastSeen bumped) + time.Sleep(15 * time.Second) // + + if got := ft.removeIdle(mono.Now()); got != 1 { + t.Errorf("removeIdle returned %d, want 1", got) + } + assertFlowHit(t, ft, FromTun, flows[0].FromTun.Tuple) + assertFlowMiss(t, ft, FromTun, flows[1].FromTun.Tuple) + }) +}