diff --git a/feature/conn25/conn25.go b/feature/conn25/conn25.go index b30fbb8f5..784b8a0c5 100644 --- a/feature/conn25/conn25.go +++ b/feature/conn25/conn25.go @@ -134,6 +134,7 @@ func (e *extension) Init(host ipnext.Host) error { ctx, cancel := context.WithCancelCause(context.Background()) e.ctxCancel = cancel go e.sendLoop(ctx) + dph.StartFlowExpirySweepers(ctx) return nil } diff --git a/feature/conn25/datapath.go b/feature/conn25/datapath.go index ca78a4adc..43daa89db 100644 --- a/feature/conn25/datapath.go +++ b/feature/conn25/datapath.go @@ -4,6 +4,7 @@ package conn25 import ( + "context" "errors" "net/netip" @@ -83,19 +84,29 @@ type datapathHandler struct { debugLogging bool } +const ( + maxClientFlows = 10_000 + maxConnectorFlows = 100_000 +) + func newDatapathHandler(ipMapper IPMapper, logf logger.Logf) *datapathHandler { return &datapathHandler{ - ipMapper: ipMapper, - - // TODO(mzb): Figure out sensible default max size for flow tables. - // Don't do any LRU eviction until we figure out deletion and expiration. - clientFlowTable: NewFlowTable(0), - connectorFlowTable: NewFlowTable(0), + ipMapper: ipMapper, + clientFlowTable: NewFlowTable(maxClientFlows), + connectorFlowTable: NewFlowTable(maxConnectorFlows), logf: logf, debugLogging: envknob.Bool("TS_CONN25_DATAPATH_DEBUG"), } } +// StartFlowExpirySweepers starts the sweepers that remove expired flows +// for both the client and connector flow tables. Each sweeper runs in +// its own new goroutine. +func (dh *datapathHandler) StartFlowExpirySweepers(ctx context.Context) { + go dh.clientFlowTable.StartExpiredSweeper(ctx) + go dh.connectorFlowTable.StartExpiredSweeper(ctx) +} + // HandlePacketFromWireGuard inspects packets coming from WireGuard, and performs // appropriate DNAT or SNAT actions for Connectors 2025. Returning [filter.Accept] signals // that the packet should pass through subsequent stages of the datapath pipeline. diff --git a/feature/conn25/flowtable.go b/feature/conn25/flowtable.go index bf07064f2..22d5654bb 100644 --- a/feature/conn25/flowtable.go +++ b/feature/conn25/flowtable.go @@ -5,10 +5,13 @@ import ( "container/list" + "context" "sync" + "time" "tailscale.com/net/flowtrack" "tailscale.com/net/packet" + "tailscale.com/tstime/mono" ) // PacketAction may modify the packet. @@ -48,7 +51,7 @@ type FlowData struct { type cachedFlow struct { data FlowData // user-defined tuples and actions for both directions - // lastSeen time.Time // tracks when the flow was last hit for expiration management + lastSeen mono.Time // tracks when the flow was last hit for expiration management // onRemove func() // fires on removal/expiration (e.g. update watchers, send RST to client) } @@ -59,22 +62,81 @@ type cachedFlow struct { // of entries is specified in calls to [NewFlowTable]. FlowTable has // its own mutex and is safe for concurrent use. type FlowTable struct { + maxEntries int + idleTimeout time.Duration + sweepInterval time.Duration + maxRemovedPerSweep int + mu sync.Mutex fromTunCache map[flowtrack.Tuple]*list.Element fromWGCache map[flowtrack.Tuple]*list.Element lru *list.List - maxEntries int +} + +const ( + // DefaultFlowIdleTimeout is the default idle timeout for a flow. + // See also [WithFlowIdleTimeout]. + DefaultFlowIdleTimeout = 5 * time.Minute + // DefaultFlowSweepInterval is the default sweep interval for + // automatically removing expired flows. See also [WithFlowSweepInterval]. + DefaultFlowSweepInterval = 3 * time.Minute + // DefaultMaxRemovedFlowsPerSweep is the default maximum number of + // flows removed per sweep. It can be used to tune how long the table + // mutex is held during sweeps. See also [WithMaxRemovedFlowsPerSweep]. + DefaultMaxRemovedFlowsPerSweep = 1000 +) + +// FlowTableOption configures options for use with [NewFlowTable]. +type FlowTableOption func(ft *FlowTable) + +// WithFlowIdleTimeout sets the threshold duration for flow idle time +// before it is eligible for removal. A flow is considered idle for the +// time that elapses since its creation or last lookup. A duration of +// 0 means that expiration is disabled. If WithFlowIdleTimeout is not +// passed to [NewFlowTable], then [DefaultFlowIdleTimeout] is used. +func WithFlowIdleTimeout(timeout time.Duration) FlowTableOption { + return func(ft *FlowTable) { + ft.idleTimeout = timeout + } +} + +// WithFlowSweepInterval sets the interval to automatically +// remove idle flows that exceed the idle timeout. A value of 0 +// disables automatic sweeping. If WithFlowSweepInterval is not +// passed to [NewFlowTable], then [DefaultFlowSweepInterval] is used. +func WithFlowSweepInterval(ival time.Duration) FlowTableOption { + return func(ft *FlowTable) { + ft.sweepInterval = ival + } +} + +// WithMaxRemovedFlowsPerSweep sets maximum number of expired +// flows that can be removed per sweep. A value of 0 means no +// maximum. If WithMaxRemovedFlowsPerSweep is not passed to +// [NewFlowTable], then [DefaultMaxRemovedFlowsPerSweep] is used. +func WithMaxRemovedFlowsPerSweep(maxPer int) FlowTableOption { + return func(ft *FlowTable) { + ft.maxRemovedPerSweep = maxPer + } } // NewFlowTable returns a [FlowTable] with maxEntries maximum entries. // A maxEntries of 0 indicates no maximum. See also [FlowTable]. -func NewFlowTable(maxEntries int) *FlowTable { - return &FlowTable{ - fromTunCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), - fromWGCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), - lru: list.New(), - maxEntries: maxEntries, +func NewFlowTable(maxEntries int, opts ...FlowTableOption) *FlowTable { + ft := &FlowTable{ + maxEntries: maxEntries, + idleTimeout: DefaultFlowIdleTimeout, + sweepInterval: DefaultFlowSweepInterval, + maxRemovedPerSweep: DefaultMaxRemovedFlowsPerSweep, + fromTunCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), + fromWGCache: make(map[flowtrack.Tuple]*list.Element, maxEntries), + lru: list.New(), } + + for _, o := range opts { + o(ft) + } + return ft } // LookupFromTunDevice looks up a [PacketAction] that is valid to run on packets @@ -123,7 +185,7 @@ func (t *FlowTable) lookup(k flowtrack.Tuple, dir Origin) (PacketAction, bool) { // Support LRU. t.lru.MoveToFront(ele) - // TODO(mzb): Update flow.lastSeen. + flow.lastSeen = mono.Now() return action, true } @@ -143,8 +205,8 @@ func (t *FlowTable) NewFlow(data FlowData) error { t.removeFlowLocked(t.fromWGCache[data.FromWG.Tuple]) flow := &cachedFlow{ - data: data, - // Populate lastSeen + data: data, + lastSeen: mono.Now(), // Populate onRemove() } @@ -159,6 +221,51 @@ func (t *FlowTable) NewFlow(data FlowData) error { return nil } +// StartExpiredSweeper starts a sweeper that removes idle flows that have +// not been created or looked up for a duration greater than the configured +// idle timeout. See [WithFlowIdleTimeout]. The sweep runs at the configured +// sweep interval. See [WithFlowSweepInterval]. The sweeper stops when ctx +// is canceled. +func (t *FlowTable) StartExpiredSweeper(ctx context.Context) { + if t.sweepInterval == 0 || t.idleTimeout == 0 { + return + } + + ticker := time.NewTicker(t.sweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.removeIdle(mono.Now()) + } + } +} + +func (t *FlowTable) removeIdle(now mono.Time) int { + if t.idleTimeout == 0 { + return 0 + } + + t.mu.Lock() + defer t.mu.Unlock() + + removed := 0 + for ele := t.lru.Back(); ele != nil; ele = t.lru.Back() { + if t.maxRemovedPerSweep > 0 && removed >= t.maxRemovedPerSweep { + break + } + flow := ele.Value.(*cachedFlow) + if now.Sub(flow.lastSeen) <= t.idleTimeout { + break + } + t.removeFlowLocked(ele) + removed++ + } + return removed +} + func (t *FlowTable) removeFlowLocked(ele *list.Element) { if ele == nil { return diff --git a/feature/conn25/flowtable_test.go b/feature/conn25/flowtable_test.go index ae3d27144..f8a8f0f0c 100644 --- a/feature/conn25/flowtable_test.go +++ b/feature/conn25/flowtable_test.go @@ -4,11 +4,15 @@ package conn25 import ( + "fmt" "net/netip" "testing" + "testing/synctest" + "time" "tailscale.com/net/flowtrack" "tailscale.com/net/packet" + "tailscale.com/tstime/mono" "tailscale.com/types/ipproto" ) @@ -44,6 +48,17 @@ func mkFlow(fromTun, fromWG flowtrack.Tuple) FlowData { } } +func mkFlows(n int) []FlowData { + flows := make([]FlowData, n) + for i := range n { + flows[i] = mkFlow( + mkTuple(fmt.Sprintf("1.0.%d.%d:1000", (i>>8)&0xff, i&0xff), "2.0.0.1:80"), + mkTuple(fmt.Sprintf("3.0.%d.%d:1000", (i>>8)&0xff, i&0xff), "4.0.0.1:80"), + ) + } + return flows +} + func mustInstallFlow(t *testing.T, ft *FlowTable, flow FlowData) { t.Helper() if err := ft.NewFlow(flow); err != nil { @@ -229,3 +244,138 @@ func TestFlowTable_Eviction(t *testing.T) { assertFlowHit(t, ft, FromTun, dTun) assertFlowHit(t, ft, FromWireGuard, dWG) } + +func syncSubtest(t *testing.T, name string, f func(*testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + synctest.Test(t, f) + }) +} + +func TestFlowTable_removeIdle(t *testing.T) { + type flowSpec struct { + installAt time.Duration // wall-clock time of install, from test start + wantRemoved bool // do we expect this flow to be removed by the sweep + } + + tests := []struct { + name string + idleTimeout time.Duration + maxRemovedPerSweep int + flowSpecs []flowSpec + removalAt time.Duration // wall-clock time of sweep, from test start + }{ + { + name: "one-expired-flow", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 120s + }, + removalAt: 2 * time.Minute, + }, + { + name: "one-not-expired-flow", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: false}, // age at sweep = 30s + }, + removalAt: 30 * time.Second, + }, + { + name: "two-flows-one-expired", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 75s + {installAt: 30 * time.Second, wantRemoved: false}, // age at sweep = 45s + }, + removalAt: 75 * time.Second, + }, + { + name: "two-flows-both-expired", + idleTimeout: 1 * time.Minute, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 120s + {installAt: 30 * time.Second, wantRemoved: true}, // age at sweep = 90s + }, + removalAt: 2 * time.Minute, + }, + { + // Both flows are time-expired, but maxRemovedPerSweep=1 caps removal at 1. + // Flow 0 is at the back of the LRU (installed first) and is removed; flow 1 stays. + name: "two-flows-both-expired-but-max-count-equal-one", + idleTimeout: 1 * time.Minute, + maxRemovedPerSweep: 1, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: true}, // age at sweep = 120s, removed (under cap) + {installAt: 30 * time.Second, wantRemoved: false}, // age at sweep = 90s, kept (cap reached) + }, + removalAt: 2 * time.Minute, + }, + { + name: "zero-idle-timeout-means-no-expiration", + idleTimeout: 0, + flowSpecs: []flowSpec{ + {installAt: 0, wantRemoved: false}, + {installAt: 30 * time.Second, wantRemoved: false}, + }, + removalAt: 2 * time.Minute, + }, + } + + for _, tt := range tests { + syncSubtest(t, tt.name, func(t *testing.T) { + ft := NewFlowTable( + 0, // turn off LRU for these tests + WithFlowIdleTimeout(tt.idleTimeout), + WithMaxRemovedFlowsPerSweep(tt.maxRemovedPerSweep), + ) + + start := time.Now() + flows := mkFlows(len(tt.flowSpecs)) + for i, spec := range tt.flowSpecs { + time.Sleep(time.Until(start.Add(spec.installAt))) + mustInstallFlow(t, ft, flows[i]) + } + + var wantRemovedCount int + for _, spec := range tt.flowSpecs { + if spec.wantRemoved { + wantRemovedCount++ + } + } + + time.Sleep(time.Until(start.Add(tt.removalAt))) + + gotRemovedCount := ft.removeIdle(mono.Now()) + if wantRemovedCount != gotRemovedCount { + t.Errorf("unexpected remove idle count: want %d, got %d", wantRemovedCount, gotRemovedCount) + } + + for i, spec := range tt.flowSpecs { + if spec.wantRemoved { + assertFlowMiss(t, ft, FromTun, flows[i].FromTun.Tuple) + } else { + assertFlowHit(t, ft, FromTun, flows[i].FromTun.Tuple) + } + } + }) + } + + syncSubtest(t, "lookup-resets-lastseen", func(t *testing.T) { + ft := NewFlowTable(0, WithFlowIdleTimeout(time.Minute)) + flows := mkFlows(2) + + mustInstallFlow(t, ft, flows[0]) // t=0 (flow 0 install) + time.Sleep(30 * time.Second) // + mustInstallFlow(t, ft, flows[1]) // t=30s (flow 1 install) + time.Sleep(60 * time.Second) // + assertFlowHit(t, ft, FromTun, flows[0].FromTun.Tuple) // t=90s (flow 0 looked up, lastSeen bumped) + time.Sleep(15 * time.Second) // + + if got := ft.removeIdle(mono.Now()); got != 1 { + t.Errorf("removeIdle returned %d, want 1", got) + } + assertFlowHit(t, ft, FromTun, flows[0].FromTun.Tuple) + assertFlowMiss(t, ft, FromTun, flows[1].FromTun.Tuple) + }) +}