feature/conn25: expire idle flows from FlowTable

Track lastSeen on each cached flow and add a sweeper goroutine
that periodically removes flows idle past the idle timeout.

Introduce tunables for idle timeout, maximum flows removed per sweep (to
limit mutex hold time), and the sweeper interval.

Also cap the previously-unlimited tables: 10k client flows, 100k
connector flows.

Updates tailscale/corp#38630

Signed-off-by: Michael Ben-Ami <mzb@tailscale.com>
This commit is contained in:
Michael Ben-Ami
2026-05-22 12:01:11 -04:00
committed by mzbenami
parent 65a117184b
commit 618b606b46
4 changed files with 286 additions and 17 deletions

View File

@@ -134,6 +134,7 @@ func (e *extension) Init(host ipnext.Host) error {
ctx, cancel := context.WithCancelCause(context.Background())
e.ctxCancel = cancel
go e.sendLoop(ctx)
dph.StartFlowExpirySweepers(ctx)
return nil
}

View File

@@ -4,6 +4,7 @@
package conn25
import (
"context"
"errors"
"net/netip"
@@ -83,19 +84,29 @@ type datapathHandler struct {
debugLogging bool
}
const (
maxClientFlows = 10_000
maxConnectorFlows = 100_000
)
func newDatapathHandler(ipMapper IPMapper, logf logger.Logf) *datapathHandler {
return &datapathHandler{
ipMapper: ipMapper,
// TODO(mzb): Figure out sensible default max size for flow tables.
// Don't do any LRU eviction until we figure out deletion and expiration.
clientFlowTable: NewFlowTable(0),
connectorFlowTable: NewFlowTable(0),
ipMapper: ipMapper,
clientFlowTable: NewFlowTable(maxClientFlows),
connectorFlowTable: NewFlowTable(maxConnectorFlows),
logf: logf,
debugLogging: envknob.Bool("TS_CONN25_DATAPATH_DEBUG"),
}
}
// StartFlowExpirySweepers starts the sweepers that remove expired flows
// for both the client and connector flow tables. Each sweeper runs in
// its own new goroutine.
func (dh *datapathHandler) StartFlowExpirySweepers(ctx context.Context) {
go dh.clientFlowTable.StartExpiredSweeper(ctx)
go dh.connectorFlowTable.StartExpiredSweeper(ctx)
}
// HandlePacketFromWireGuard inspects packets coming from WireGuard, and performs
// appropriate DNAT or SNAT actions for Connectors 2025. Returning [filter.Accept] signals
// that the packet should pass through subsequent stages of the datapath pipeline.

View File

@@ -5,10 +5,13 @@
import (
"container/list"
"context"
"sync"
"time"
"tailscale.com/net/flowtrack"
"tailscale.com/net/packet"
"tailscale.com/tstime/mono"
)
// PacketAction may modify the packet.
@@ -48,7 +51,7 @@ type FlowData struct {
type cachedFlow struct {
data FlowData // user-defined tuples and actions for both directions
// lastSeen time.Time // tracks when the flow was last hit for expiration management
lastSeen mono.Time // tracks when the flow was last hit for expiration management
// onRemove func() // fires on removal/expiration (e.g. update watchers, send RST to client)
}
@@ -59,22 +62,81 @@ type cachedFlow struct {
// of entries is specified in calls to [NewFlowTable]. FlowTable has
// its own mutex and is safe for concurrent use.
type FlowTable struct {
maxEntries int
idleTimeout time.Duration
sweepInterval time.Duration
maxRemovedPerSweep int
mu sync.Mutex
fromTunCache map[flowtrack.Tuple]*list.Element
fromWGCache map[flowtrack.Tuple]*list.Element
lru *list.List
maxEntries int
}
const (
// DefaultFlowIdleTimeout is the default idle timeout for a flow.
// See also [WithFlowIdleTimeout].
DefaultFlowIdleTimeout = 5 * time.Minute
// DefaultFlowSweepInterval is the default sweep interval for
// automatically removing expired flows. See also [WithFlowSweepInterval].
DefaultFlowSweepInterval = 3 * time.Minute
// DefaultMaxRemovedFlowsPerSweep is the default maximum number of
// flows removed per sweep. It can be used to tune how long the table
// mutex is held during sweeps. See also [WithMaxRemovedFlowsPerSweep].
DefaultMaxRemovedFlowsPerSweep = 1000
)
// FlowTableOption configures options for use with [NewFlowTable].
type FlowTableOption func(ft *FlowTable)
// WithFlowIdleTimeout sets the threshold duration for flow idle time
// before it is eligible for removal. A flow is considered idle for the
// time that elapses since its creation or last lookup. A duration of
// 0 means that expiration is disabled. If WithFlowIdleTimeout is not
// passed to [NewFlowTable], then [DefaultFlowIdleTimeout] is used.
func WithFlowIdleTimeout(timeout time.Duration) FlowTableOption {
return func(ft *FlowTable) {
ft.idleTimeout = timeout
}
}
// WithFlowSweepInterval sets the interval to automatically
// remove idle flows that exceed the idle timeout. A value of 0
// disables automatic sweeping. If WithFlowSweepInterval is not
// passed to [NewFlowTable], then [DefaultFlowSweepInterval] is used.
func WithFlowSweepInterval(ival time.Duration) FlowTableOption {
return func(ft *FlowTable) {
ft.sweepInterval = ival
}
}
// WithMaxRemovedFlowsPerSweep sets maximum number of expired
// flows that can be removed per sweep. A value of 0 means no
// maximum. If WithMaxRemovedFlowsPerSweep is not passed to
// [NewFlowTable], then [DefaultMaxRemovedFlowsPerSweep] is used.
func WithMaxRemovedFlowsPerSweep(maxPer int) FlowTableOption {
return func(ft *FlowTable) {
ft.maxRemovedPerSweep = maxPer
}
}
// NewFlowTable returns a [FlowTable] with maxEntries maximum entries.
// A maxEntries of 0 indicates no maximum. See also [FlowTable].
func NewFlowTable(maxEntries int) *FlowTable {
return &FlowTable{
fromTunCache: make(map[flowtrack.Tuple]*list.Element, maxEntries),
fromWGCache: make(map[flowtrack.Tuple]*list.Element, maxEntries),
lru: list.New(),
maxEntries: maxEntries,
func NewFlowTable(maxEntries int, opts ...FlowTableOption) *FlowTable {
ft := &FlowTable{
maxEntries: maxEntries,
idleTimeout: DefaultFlowIdleTimeout,
sweepInterval: DefaultFlowSweepInterval,
maxRemovedPerSweep: DefaultMaxRemovedFlowsPerSweep,
fromTunCache: make(map[flowtrack.Tuple]*list.Element, maxEntries),
fromWGCache: make(map[flowtrack.Tuple]*list.Element, maxEntries),
lru: list.New(),
}
for _, o := range opts {
o(ft)
}
return ft
}
// LookupFromTunDevice looks up a [PacketAction] that is valid to run on packets
@@ -123,7 +185,7 @@ func (t *FlowTable) lookup(k flowtrack.Tuple, dir Origin) (PacketAction, bool) {
// Support LRU.
t.lru.MoveToFront(ele)
// TODO(mzb): Update flow.lastSeen.
flow.lastSeen = mono.Now()
return action, true
}
@@ -143,8 +205,8 @@ func (t *FlowTable) NewFlow(data FlowData) error {
t.removeFlowLocked(t.fromWGCache[data.FromWG.Tuple])
flow := &cachedFlow{
data: data,
// Populate lastSeen
data: data,
lastSeen: mono.Now(),
// Populate onRemove()
}
@@ -159,6 +221,51 @@ func (t *FlowTable) NewFlow(data FlowData) error {
return nil
}
// StartExpiredSweeper starts a sweeper that removes idle flows that have
// not been created or looked up for a duration greater than the configured
// idle timeout. See [WithFlowIdleTimeout]. The sweep runs at the configured
// sweep interval. See [WithFlowSweepInterval]. The sweeper stops when ctx
// is canceled.
func (t *FlowTable) StartExpiredSweeper(ctx context.Context) {
if t.sweepInterval == 0 || t.idleTimeout == 0 {
return
}
ticker := time.NewTicker(t.sweepInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
t.removeIdle(mono.Now())
}
}
}
func (t *FlowTable) removeIdle(now mono.Time) int {
if t.idleTimeout == 0 {
return 0
}
t.mu.Lock()
defer t.mu.Unlock()
removed := 0
for ele := t.lru.Back(); ele != nil; ele = t.lru.Back() {
if t.maxRemovedPerSweep > 0 && removed >= t.maxRemovedPerSweep {
break
}
flow := ele.Value.(*cachedFlow)
if now.Sub(flow.lastSeen) <= t.idleTimeout {
break
}
t.removeFlowLocked(ele)
removed++
}
return removed
}
func (t *FlowTable) removeFlowLocked(ele *list.Element) {
if ele == nil {
return

View File

@@ -4,11 +4,15 @@
package conn25
import (
"fmt"
"net/netip"
"testing"
"testing/synctest"
"time"
"tailscale.com/net/flowtrack"
"tailscale.com/net/packet"
"tailscale.com/tstime/mono"
"tailscale.com/types/ipproto"
)
@@ -44,6 +48,17 @@ func mkFlow(fromTun, fromWG flowtrack.Tuple) FlowData {
}
}
func mkFlows(n int) []FlowData {
flows := make([]FlowData, n)
for i := range n {
flows[i] = mkFlow(
mkTuple(fmt.Sprintf("1.0.%d.%d:1000", (i>>8)&0xff, i&0xff), "2.0.0.1:80"),
mkTuple(fmt.Sprintf("3.0.%d.%d:1000", (i>>8)&0xff, i&0xff), "4.0.0.1:80"),
)
}
return flows
}
func mustInstallFlow(t *testing.T, ft *FlowTable, flow FlowData) {
t.Helper()
if err := ft.NewFlow(flow); err != nil {
@@ -229,3 +244,138 @@ func TestFlowTable_Eviction(t *testing.T) {
assertFlowHit(t, ft, FromTun, dTun)
assertFlowHit(t, ft, FromWireGuard, dWG)
}
func syncSubtest(t *testing.T, name string, f func(*testing.T)) {
t.Helper()
t.Run(name, func(t *testing.T) {
synctest.Test(t, f)
})
}
func TestFlowTable_removeIdle(t *testing.T) {
type flowSpec struct {
installAt time.Duration // wall-clock time of install, from test start
wantRemoved bool // do we expect this flow to be removed by the sweep
}
tests := []struct {
name string
idleTimeout time.Duration
maxRemovedPerSweep int
flowSpecs []flowSpec
removalAt time.Duration // wall-clock time of sweep, from test start
}{
{
name: "one-expired-flow",
idleTimeout: 1 * time.Minute,
flowSpecs: []flowSpec{
{installAt: 0, wantRemoved: true}, // age at sweep = 120s
},
removalAt: 2 * time.Minute,
},
{
name: "one-not-expired-flow",
idleTimeout: 1 * time.Minute,
flowSpecs: []flowSpec{
{installAt: 0, wantRemoved: false}, // age at sweep = 30s
},
removalAt: 30 * time.Second,
},
{
name: "two-flows-one-expired",
idleTimeout: 1 * time.Minute,
flowSpecs: []flowSpec{
{installAt: 0, wantRemoved: true}, // age at sweep = 75s
{installAt: 30 * time.Second, wantRemoved: false}, // age at sweep = 45s
},
removalAt: 75 * time.Second,
},
{
name: "two-flows-both-expired",
idleTimeout: 1 * time.Minute,
flowSpecs: []flowSpec{
{installAt: 0, wantRemoved: true}, // age at sweep = 120s
{installAt: 30 * time.Second, wantRemoved: true}, // age at sweep = 90s
},
removalAt: 2 * time.Minute,
},
{
// Both flows are time-expired, but maxRemovedPerSweep=1 caps removal at 1.
// Flow 0 is at the back of the LRU (installed first) and is removed; flow 1 stays.
name: "two-flows-both-expired-but-max-count-equal-one",
idleTimeout: 1 * time.Minute,
maxRemovedPerSweep: 1,
flowSpecs: []flowSpec{
{installAt: 0, wantRemoved: true}, // age at sweep = 120s, removed (under cap)
{installAt: 30 * time.Second, wantRemoved: false}, // age at sweep = 90s, kept (cap reached)
},
removalAt: 2 * time.Minute,
},
{
name: "zero-idle-timeout-means-no-expiration",
idleTimeout: 0,
flowSpecs: []flowSpec{
{installAt: 0, wantRemoved: false},
{installAt: 30 * time.Second, wantRemoved: false},
},
removalAt: 2 * time.Minute,
},
}
for _, tt := range tests {
syncSubtest(t, tt.name, func(t *testing.T) {
ft := NewFlowTable(
0, // turn off LRU for these tests
WithFlowIdleTimeout(tt.idleTimeout),
WithMaxRemovedFlowsPerSweep(tt.maxRemovedPerSweep),
)
start := time.Now()
flows := mkFlows(len(tt.flowSpecs))
for i, spec := range tt.flowSpecs {
time.Sleep(time.Until(start.Add(spec.installAt)))
mustInstallFlow(t, ft, flows[i])
}
var wantRemovedCount int
for _, spec := range tt.flowSpecs {
if spec.wantRemoved {
wantRemovedCount++
}
}
time.Sleep(time.Until(start.Add(tt.removalAt)))
gotRemovedCount := ft.removeIdle(mono.Now())
if wantRemovedCount != gotRemovedCount {
t.Errorf("unexpected remove idle count: want %d, got %d", wantRemovedCount, gotRemovedCount)
}
for i, spec := range tt.flowSpecs {
if spec.wantRemoved {
assertFlowMiss(t, ft, FromTun, flows[i].FromTun.Tuple)
} else {
assertFlowHit(t, ft, FromTun, flows[i].FromTun.Tuple)
}
}
})
}
syncSubtest(t, "lookup-resets-lastseen", func(t *testing.T) {
ft := NewFlowTable(0, WithFlowIdleTimeout(time.Minute))
flows := mkFlows(2)
mustInstallFlow(t, ft, flows[0]) // t=0 (flow 0 install)
time.Sleep(30 * time.Second) //
mustInstallFlow(t, ft, flows[1]) // t=30s (flow 1 install)
time.Sleep(60 * time.Second) //
assertFlowHit(t, ft, FromTun, flows[0].FromTun.Tuple) // t=90s (flow 0 looked up, lastSeen bumped)
time.Sleep(15 * time.Second) //
if got := ft.removeIdle(mono.Now()); got != 1 {
t.Errorf("removeIdle returned %d, want 1", got)
}
assertFlowHit(t, ft, FromTun, flows[0].FromTun.Tuple)
assertFlowMiss(t, ft, FromTun, flows[1].FromTun.Tuple)
})
}