net/portmapper: put mappings and releases into an execqueue

We had an issue where releasing mappings would cause a deadlock when the
UPnP gateway did not respond. Instead of working on mappings sync, put
it into an exec queue and add a timeout on releasing the mappings.

Updates tailscale/corp#33888
Updates tailscale/corp#33619

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl
2026-01-15 09:04:20 -05:00
parent 8c17d871b3
commit da28a92b22
12 changed files with 299 additions and 53 deletions

View File

@@ -69,7 +69,7 @@ func runNetcheck(ctx context.Context, args []string) error {
if buildfeatures.HasPortMapper {
// Ensure that we close the portmapper after running a netcheck; this
// will release any port mappings created.
pm = portmappertype.HookNewPortMapper.Get()(logf, bus, netMon, nil, nil)
pm = portmappertype.HookNewPortMapper.Get()(ctx, logf, bus, netMon, nil, nil)
defer pm.Close()
}

View File

@@ -181,6 +181,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics
tailscale.com/util/dnsname from tailscale.com/cmd/tailscale/cli+
tailscale.com/util/eventbus from tailscale.com/client/local+
tailscale.com/util/execqueue from tailscale.com/net/portmapper
tailscale.com/util/groupmember from tailscale.com/client/web
💣 tailscale.com/util/hashx from tailscale.com/util/deephash
tailscale.com/util/httpm from tailscale.com/client/tailscale+

View File

@@ -109,10 +109,11 @@ func serveDebugPortmap(h *localapi.Handler, w http.ResponseWriter, r *http.Reque
var c *portmapper.Client
c = portmapper.NewClient(portmapper.Config{
Logf: logger.WithPrefix(logf, "portmapper: "),
NetMon: h.LocalBackend().NetMon(),
DebugKnobs: debugKnobs,
EventBus: h.LocalBackend().EventBus(),
ShutdownCtx: ctx,
Logf: logger.WithPrefix(logf, "portmapper: "),
NetMon: h.LocalBackend().NetMon(),
DebugKnobs: debugKnobs,
EventBus: h.LocalBackend().EventBus(),
OnChange: func() {
logf("portmapping changed.")
logf("have mapping: %v", c.HaveMapping())

View File

@@ -6,6 +6,8 @@
package portmapper
import (
"context"
"tailscale.com/feature"
"tailscale.com/net/netmon"
"tailscale.com/net/portmapper"
@@ -20,16 +22,18 @@ func init() {
}
func newPortMapper(
ctx context.Context,
logf logger.Logf,
bus *eventbus.Bus,
netMon *netmon.Monitor,
disableUPnPOrNil func() bool,
onlyTCP443OrNil func() bool) portmappertype.Client {
onlyTCP443OrNil func() bool,
) portmappertype.Client {
pm := portmapper.NewClient(portmapper.Config{
EventBus: bus,
Logf: logf,
NetMon: netMon,
ShutdownCtx: ctx,
EventBus: bus,
Logf: logf,
NetMon: netMon,
DebugKnobs: &portmapper.DebugKnobs{
DisableAll: onlyTCP443OrNil,
DisableUPnPFunc: disableUPnPOrNil,

View File

@@ -574,7 +574,6 @@ func nodeMight6(n *tailcfg.DERPNode) bool {
}
ip, _ := netip.ParseAddr(n.IPv6)
return ip.Is6()
}
// nodeMight4 reports whether n might reply to STUN over IPv4 based on
@@ -722,14 +721,14 @@ func (rs *reportState) setOptBool(b *opt.Bool, v bool) {
b.Set(v)
}
func (rs *reportState) probePortMapServices() {
func (rs *reportState) probePortMapServices(ctx context.Context) {
defer rs.waitPortMap.Done()
rs.setOptBool(&rs.report.UPnP, false)
rs.setOptBool(&rs.report.PMP, false)
rs.setOptBool(&rs.report.PCP, false)
res, err := rs.c.PortMapper.Probe(context.Background())
res, err := rs.c.PortMapper.Probe(ctx)
if err != nil {
if !errors.Is(err, portmappertype.ErrGatewayRange) {
// "skipping portmap; gateway range likely lacks support"
@@ -900,7 +899,7 @@ func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap, opts *GetRe
if !c.SkipExternalNetwork && c.PortMapper != nil {
rs.waitPortMap.Add(1)
go rs.probePortMapServices()
go rs.probePortMapServices(ctx)
}
var plan probePlan

View File

@@ -20,6 +20,7 @@
"tailscale.com/tstest"
"tailscale.com/types/logger"
"tailscale.com/util/eventbus"
"tailscale.com/util/eventbus/eventbustest"
"tailscale.com/util/testenv"
)
@@ -213,7 +214,6 @@ func (d *TestIGD) handlePMPQuery(pkt []byte, src netip.AddrPort) {
return
}
d.inc(&d.counters.numPMPPublicAddrRecv)
}
// TODO
}
@@ -266,15 +266,15 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) {
// A cleanup for the resulting client is also added to t.
func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client {
if bus == nil {
bus = eventbus.New()
bus = eventbustest.NewBus(t)
t.Log("Created empty event bus for test client")
t.Cleanup(bus.Close)
}
var c *Client
c = NewClient(Config{
Logf: tstest.WhileTestRunningLogger(t),
NetMon: netmon.NewStatic(),
EventBus: bus,
ShutdownCtx: t.Context(),
Logf: tstest.WhileTestRunningLogger(t),
NetMon: netmon.NewStatic(),
EventBus: bus,
OnChange: func() { // TODO(creachadair): Remove.
t.Logf("port map changed")
t.Logf("have mapping: %v", c.HaveMapping())

View File

@@ -31,6 +31,7 @@
"tailscale.com/types/nettype"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
"tailscale.com/util/execqueue"
)
var (
@@ -40,6 +41,8 @@
ErrPortMappingDisabled = portmappertype.ErrPortMappingDisabled
)
var releaseTimeout = 10 * time.Second
var disablePortMapperEnv = envknob.RegisterBool("TS_DISABLE_PORTMAPPER")
// DebugKnobs contains debug configuration that can be provided when creating a
@@ -121,6 +124,7 @@ type Client struct {
debug DebugKnobs
testPxPPort uint16 // if non-zero, pxpPort to use for tests
testUPnPPort uint16 // if non-zero, uPnPPort to use for tests
shutdownCtx context.Context
mu syncs.Mutex // guards following, and all fields thereof
@@ -151,6 +155,8 @@ type Client struct {
localPort uint16
mapping mapping // non-nil if we have a mapping
actionQueue execqueue.ExecQueue
}
var _ portmappertype.Client = (*Client)(nil)
@@ -250,6 +256,9 @@ type Config struct {
// OnChange is called to run in a new goroutine whenever the port mapping
// status has changed. If nil, no callback is issued.
OnChange func()
// Context, which must be non-nil holds a shutdown context to derive other contexts from.
ShutdownCtx context.Context
}
// NewClient constructs a new portmapping [Client] from c. It will panic if any
@@ -262,9 +271,10 @@ func NewClient(c Config) *Client {
panic("nil EventBus")
}
ret := &Client{
logf: c.Logf,
netMon: c.NetMon,
onChange: c.OnChange,
logf: c.Logf,
netMon: c.NetMon,
onChange: c.OnChange,
shutdownCtx: c.ShutdownCtx,
}
if buildfeatures.HasPortMapper {
// TODO(bradfitz): move this to method on netMon
@@ -391,13 +401,28 @@ func (c *Client) listenPacket(ctx context.Context, network, addr string) (nettyp
return pc.(*net.UDPConn), nil
}
func (c *Client) invalidateMappingsLocked(releaseOld bool) {
func (c *Client) releaseActiveMappingAsyncLocked(releaseOld bool) {
if c.mapping != nil {
if releaseOld {
c.mapping.Release(context.Background())
}
mapping := c.mapping
c.mapping = nil
if releaseOld {
c.actionQueue.Add(func() {
ctx, cancel := context.WithTimeout(c.shutdownCtx, releaseTimeout)
defer cancel()
mapping.Release(ctx)
c.updates.Publish(portmappertype.Mapping{
External: mapping.External(),
GoodUntil: mapping.GoodUntil(),
Type: mapping.MappingType(),
Status: portmappertype.StatusRemovedFromGateway,
})
})
}
}
}
func (c *Client) invalidateMappingsLocked(releaseOld bool) {
c.releaseActiveMappingAsyncLocked(releaseOld)
c.pmpPubIP = netip.Addr{}
c.pmpPubIPTime = time.Time{}
@@ -499,12 +524,12 @@ func (c *Client) GetCachedMappingOrStartCreatingOne() (external netip.AddrPort,
func (c *Client) maybeStartMappingLocked() {
if !c.runningCreate {
c.runningCreate = true
go c.createMapping()
c.actionQueue.Add(c.createMapping)
}
}
func (c *Client) createMapping() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(c.shutdownCtx, 5*time.Second)
defer cancel()
defer func() {
@@ -531,6 +556,7 @@ func (c *Client) createMapping() {
External: mapping.External(),
Type: mapping.MappingType(),
GoodUntil: mapping.GoodUntil(),
Status: portmappertype.StatusCreated,
})
// TODO(creachadair): Remove this entirely once there are no longer any
// places where the callback is set.
@@ -630,20 +656,28 @@ func (c *Client) createOrGetMapping(ctx context.Context) (mapping mapping, exter
prevPort = m.External().Port()
}
if c.debug.DisablePCP() && c.debug.DisablePMP() {
c.mu.Unlock()
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return nil, external, nil
}
c.vlogf("fallback to UPnP due to PCP and PMP being disabled failed")
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
pxpDisabled := c.debug.DisablePCP() && c.debug.DisablePMP()
// If we just did a Probe (e.g. via netchecker) but didn't
// find a PMP service, bail out early rather than probing
// again. Cuts down latency for most clients.
haveRecentPMP := c.sawPMPRecentlyLocked()
haveRecentPCP := c.sawPCPRecentlyLocked()
noRecentPXP := c.lastProbe.After(now.Add(-5*time.Second)) &&
!haveRecentPMP && !haveRecentPCP
if pxpDisabled || noRecentPXP {
c.mu.Unlock()
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return nil, external, nil
}
if pxpDisabled {
c.vlogf("fallback to UPnP due to PCP and PMP being disabled failed")
} else { // noRecentPXP
c.vlogf("fallback to UPnP due to no PCP and PMP failed")
}
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
// Since PMP mapping may require multiple calls, and it's not clear from the outset
// whether we're doing a PCP or PMP call, initialize the PMP mapping here,
@@ -659,15 +693,6 @@ func (c *Client) createOrGetMapping(ctx context.Context) (mapping mapping, exter
if haveRecentPMP {
m.external = netip.AddrPortFrom(c.pmpPubIP, m.external.Port())
}
if c.lastProbe.After(now.Add(-5*time.Second)) && !haveRecentPMP && !haveRecentPCP {
c.mu.Unlock()
// fallback to UPnP portmapping
if external, ok := c.getUPnPPortMapping(ctx, gw, internalAddr, prevPort); ok {
return nil, external, nil
}
c.vlogf("fallback to UPnP due to no PCP and PMP failed")
return nil, netip.AddrPort{}, NoMappingError{ErrNoPortMappingServices}
}
c.mu.Unlock()
uc, err := c.listenPacket(ctx, "udp4", ":0")
@@ -888,7 +913,7 @@ func (c *Client) Probe(ctx context.Context) (res portmappertype.ProbeResult, err
}
}()
uc, err := c.listenPacket(context.Background(), "udp4", ":0")
uc, err := c.listenPacket(ctx, "udp4", ":0")
if err != nil {
c.logf("ProbePCP: %v", err)
return res, err

View File

@@ -102,7 +102,7 @@ func TestPCPIntegration(t *testing.T) {
defer igd.Close()
c := newTestClient(t, igd, nil)
res, err := c.Probe(context.Background())
res, err := c.Probe(t.Context())
if err != nil {
t.Fatalf("probe failed: %v", err)
}
@@ -113,7 +113,7 @@ func TestPCPIntegration(t *testing.T) {
t.Fatalf("probe did not see pcp: %+v", res)
}
_, external, err := c.createOrGetMapping(context.Background())
_, external, err := c.createOrGetMapping(t.Context())
if err != nil {
t.Fatalf("failed to get mapping: %v", err)
}

View File

@@ -19,7 +19,8 @@
// HookNewPortMapper is a hook to install the portmapper creation function.
// It must be set by an init function when buildfeatures.HasPortmapper is true.
var HookNewPortMapper feature.Hook[func(logf logger.Logf,
var HookNewPortMapper feature.Hook[func(ctx context.Context,
logf logger.Logf,
bus *eventbus.Bus,
netMon *netmon.Monitor,
disableUPnPOrNil,
@@ -82,7 +83,13 @@ type Client interface {
type Mapping struct {
External netip.AddrPort
Type string
Status string
GoodUntil time.Time
// TODO(creachadair): Record whether we reused an existing mapping?
}
const (
StatusCreated = "StatusCreated"
StatusRemovedFromGateway = "StatusRemovedFromGateway"
)

View File

@@ -110,6 +110,7 @@ func (u *upnpMapping) MappingDebug() string {
u.renewAfter.Unix(), u.goodUntil.Unix(),
u.loc)
}
func (u *upnpMapping) Release(ctx context.Context) {
u.client.DeletePortMappingCtx(ctx, "", u.external.Port(), upnpProtocolUDP)
}

View File

@@ -17,9 +17,11 @@
"slices"
"sync/atomic"
"testing"
"time"
"tailscale.com/net/portmapper/portmappertype"
"tailscale.com/tstest"
"tailscale.com/util/eventbus/eventbustest"
)
// Google Wifi
@@ -628,6 +630,201 @@ func TestGetUPnPPortMapping(t *testing.T) {
}
}
func TestReleaseUPnPPortMapping(t *testing.T) {
t.Run("GoodRouterReponse", func(t *testing.T) {
igd, err := NewTestIGD(t, TestIGDOptions{UPnP: true})
if err != nil {
t.Fatal(err)
}
defer igd.Close()
// This is a very basic fake UPnP server handler.
var sawRequestWithLease atomic.Bool
handlers := map[string]any{
"AddPortMapping": func(body []byte) (int, string) {
// Decode a minimal body to determine whether we skip the request or not.
var req struct {
Protocol string `xml:"NewProtocol"`
InternalPort string `xml:"NewInternalPort"`
ExternalPort string `xml:"NewExternalPort"`
InternalClient string `xml:"NewInternalClient"`
LeaseDuration string `xml:"NewLeaseDuration"`
}
if err := xml.Unmarshal(body, &req); err != nil {
t.Errorf("bad request: %v", err)
return http.StatusBadRequest, "bad request"
}
if req.Protocol != "UDP" {
t.Errorf(`got Protocol=%q, want "UDP"`, req.Protocol)
}
if req.LeaseDuration != "0" {
// Return a fake error to ensure that we fall back to a permanent lease.
sawRequestWithLease.Store(true)
return http.StatusOK, testAddPortMappingPermanentLease
}
// Success!
return http.StatusOK, testAddPortMappingResponse
},
"GetExternalIPAddress": testGetExternalIPAddressResponse,
"GetStatusInfo": testGetStatusInfoResponse,
"DeletePortMapping": testDeletePortMappingResponse,
}
ctx := t.Context()
rootDesc := testRootDesc
igd.SetUPnPHandler(&upnpServer{
t: t,
Desc: rootDesc,
Control: map[string]map[string]any{
"/ctl/IPConn": handlers,
"/upnp/control/yomkmsnooi/wanipconn-1": handlers,
},
})
bus := eventbustest.NewBus(t)
c := newTestClient(t, igd, bus)
t.Logf("Listening on upnp=%v", c.testUPnPPort)
c.debug.VerboseLogs = true
var port uint16
sawRequestWithLease.Store(false)
mustProbeUPnP(t, ctx, c)
gw, myIP, ok := c.gatewayAndSelfIP()
if !ok {
t.Fatalf("could not get gateway and self IP")
}
t.Logf("gw=%v myIP=%v", gw, myIP)
ext, ok := c.getUPnPPortMapping(ctx, gw, netip.AddrPortFrom(myIP, 12345), port)
if !ok {
t.Fatal("could not get UPnP port mapping")
}
if got, want := ext.Addr(), netip.MustParseAddr("123.123.123.123"); got != want {
t.Errorf("bad external address; got %v want %v", got, want)
}
if !sawRequestWithLease.Load() {
t.Errorf("wanted request with lease, but didn't see one")
}
port = ext.Port()
t.Logf("external IP:port : %v:%v", ext, port)
tw := eventbustest.NewWatcher(t, bus)
c.mu.Lock()
c.invalidateMappingsLocked(true)
c.mu.Unlock()
err = eventbustest.Expect(tw, eventbustest.Type[portmappertype.Mapping]())
if err != nil {
t.Errorf("failed to release UPnP mapping: %v", err)
}
})
t.Run("NoRouterReponse", func(t *testing.T) {
igd, err := NewTestIGD(t, TestIGDOptions{UPnP: true})
if err != nil {
t.Fatal(err)
}
defer igd.Close()
deleteWaitDoneCh := make(chan any)
// This is a very basic fake UPnP server handler.
var sawRequestWithLease atomic.Bool
handlers := map[string]any{
"AddPortMapping": func(body []byte) (int, string) {
// Decode a minimal body to determine whether we skip the request or not.
var req struct {
Protocol string `xml:"NewProtocol"`
InternalPort string `xml:"NewInternalPort"`
ExternalPort string `xml:"NewExternalPort"`
InternalClient string `xml:"NewInternalClient"`
LeaseDuration string `xml:"NewLeaseDuration"`
}
if err := xml.Unmarshal(body, &req); err != nil {
t.Errorf("bad request: %v", err)
return http.StatusBadRequest, "bad request"
}
if req.Protocol != "UDP" {
t.Errorf(`got Protocol=%q, want "UDP"`, req.Protocol)
}
if req.LeaseDuration != "0" {
// Return a fake error to ensure that we fall back to a permanent lease.
sawRequestWithLease.Store(true)
return http.StatusOK, testAddPortMappingPermanentLease
}
// Success!
return http.StatusOK, testAddPortMappingResponse
},
"GetExternalIPAddress": testGetExternalIPAddressResponse,
"GetStatusInfo": testGetStatusInfoResponse,
"DeletePortMapping": func(body []byte) (int, string) {
<-deleteWaitDoneCh
// return will happen after the end of the test, so just return OK
return http.StatusOK, ""
},
}
ctx := t.Context()
rootDesc := testRootDesc
igd.SetUPnPHandler(&upnpServer{
t: t,
Desc: rootDesc,
Control: map[string]map[string]any{
"/ctl/IPConn": handlers,
"/upnp/control/yomkmsnooi/wanipconn-1": handlers,
},
})
bus := eventbustest.NewBus(t)
c := newTestClient(t, igd, bus)
releaseTimeout = 10 * time.Millisecond
t.Logf("Listening on upnp=%v", c.testUPnPPort)
c.debug.VerboseLogs = true
var port uint16
sawRequestWithLease.Store(false)
mustProbeUPnP(t, ctx, c)
gw, myIP, ok := c.gatewayAndSelfIP()
if !ok {
t.Fatalf("could not get gateway and self IP")
}
t.Logf("gw=%v myIP=%v", gw, myIP)
ext, ok := c.getUPnPPortMapping(ctx, gw, netip.AddrPortFrom(myIP, 12345), port)
if !ok {
t.Fatal("could not get UPnP port mapping")
}
if got, want := ext.Addr(), netip.MustParseAddr("123.123.123.123"); got != want {
t.Errorf("bad external address; got %v want %v", got, want)
}
if !sawRequestWithLease.Load() {
t.Errorf("wanted request with lease, but didn't see one")
}
port = ext.Port()
t.Logf("external IP:port : %v:%v", ext, port)
tw := eventbustest.NewWatcher(t, bus)
c.mu.Lock()
c.invalidateMappingsLocked(true)
c.mu.Unlock()
err = eventbustest.Expect(tw, eventbustest.Type[portmappertype.Mapping]())
deleteWaitDoneCh <- nil
if err != nil {
t.Errorf("failed to release UPnP mapping: %v", err)
}
})
}
func TestGetUPnPPortMapping_LeaseDuration(t *testing.T) {
testCases := []struct {
name string
@@ -639,7 +836,6 @@ func TestGetUPnPPortMapping_LeaseDuration(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// This is a very basic fake UPnP server handler.
var sawRequestWithLease atomic.Bool
handlers := map[string]any{
@@ -1177,6 +1373,14 @@ func mustProbeUPnP(tb testing.TB, ctx context.Context, c *Client) portmappertype
</s:Envelope>
`
const testDeletePortMappingResponse = `<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body><m:DeletePortMappingResponse xmlns:m="urn:schemas-upnp-org:service:WANIPConnection:1">
</m:DeletePortMappingResponse>
</s:Body>
</s:Envelope>
`
const testLegacyAddPortMappingResponse = `<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>

View File

@@ -720,7 +720,7 @@ func NewConn(opts Options) (*Conn, error) {
}
newPortMapper, ok := portmappertype.HookNewPortMapper.GetOk()
if ok {
c.portMapper = newPortMapper(portmapperLogf, opts.EventBus, opts.NetMon, disableUPnP, c.onlyTCP443.Load)
c.portMapper = newPortMapper(c.connCtx, portmapperLogf, opts.EventBus, opts.NetMon, disableUPnP, c.onlyTCP443.Load)
}
// If !ok, the HookNewPortMapper hook is not set (so feature/portmapper
// isn't linked), but the build tag to explicitly omit the portmapper
@@ -3453,7 +3453,11 @@ func (c *Conn) shouldDoPeriodicReSTUNLocked() bool {
return true
}
func (c *Conn) onPortMapChanged(portmappertype.Mapping) { c.ReSTUN("portmap-changed") }
func (c *Conn) onPortMapChanged(m portmappertype.Mapping) {
if m.Status != portmappertype.StatusRemovedFromGateway {
c.ReSTUN("portmap-changed")
}
}
// ReSTUN triggers an address discovery.
// The provided why string is for debug logging only.