ipn: add no-disconnect in-process bus subscribers

Add NotifyInProcessNoDisconnect for in-process IPN bus subscribers that
must apply every bus update. When such a subscriber falls behind, block
Notify production instead of sending the terminal fell-behind message and
closing the watch.

This is intentionally not available over LocalAPI, where a slow or stuck
out-of-process client should still be disconnected rather than allowed to
stall tailscaled. In-process callers that use the bit must keep their
callbacks fast and must not call back into LocalBackend from the callback.

Updates #20062

Change-Id: I730ad61a07475243bb226fba2262c1a3ded211ae
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick
2026-06-09 19:05:40 +00:00
committed by Brad Fitzpatrick
parent 913df7e6ea
commit 1deb6a8449
6 changed files with 124 additions and 3 deletions

View File

@@ -156,6 +156,21 @@ type EngineStatus struct {
// promotes any patch into a full-Node entry in [Notify.PeersChanged]
// for this session, at the cost of bandwidth.
NotifyPeerPatches NotifyWatchOpt = 1 << 15
// NotifyInProcessNoDisconnect, if set, marks this watcher as an
// in-process subscriber that must not be disconnected for falling behind
// on its notification queue. Instead, if its queue fills, Notify
// production blocks until the watcher catches up.
//
// Callers using this bit must receive and process notifications promptly.
// Their callbacks must not call back into LocalBackend or wait on work that
// might call back into LocalBackend, because the producer might be holding
// LocalBackend's mutex while waiting for the watcher to catch up.
//
// This bit is only valid for in-process callers of
// LocalBackend.WatchNotificationsAs. LocalAPI WatchIPNBus clients must
// not request it.
NotifyInProcessNoDisconnect NotifyWatchOpt = 1 << 16
)
// NotifyRateLimitIncompatibleBits is the set of new-style IPN bus

View File

@@ -55,6 +55,10 @@ func TestValidateNotifyWatchOpt(t *testing.T) {
name: "peer-changes-without-rate-limit",
mask: NotifyPeerChanges | NotifyPeerPatches | NotifyNoNetMap | NotifyInitialStatus,
},
{
name: "in-process-no-disconnect",
mask: NotifyInProcessNoDisconnect | NotifyPeerChanges,
},
{
name: "rate-limit-with-peer-changes",
mask: NotifyRateLimit | NotifyPeerChanges,

View File

@@ -167,6 +167,7 @@ func (b *LocalBackend) ListenSSH(ln net.Listener, logf logger.Logf) (net.Listene
// and sessionID as required to close targeted buses.
type watchSession struct {
ch chan *ipn.Notify
ctx context.Context
owner ipnauth.Actor // or nil
sessionID string
cancel context.CancelFunc // to shut down the session
@@ -3588,7 +3589,10 @@ func applyConfigToHostinfo(hi *tailcfg.Hostinfo, c *conffile.Config) {
// fn returns false, the watch also stops.
//
// Failure to consume many notifications in a row will result in one final
// notification with ErrMessage set, followed by the watch closing.
// notification with ErrMessage set, followed by the watch closing, unless mask
// includes [ipn.NotifyInProcessNoDisconnect]. Watchers using
// NotifyInProcessNoDisconnect must not call back into LocalBackend from fn or
// wait on work that might call back into LocalBackend.
func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, onWatchAdded func(), fn func(roNotify *ipn.Notify) (keepGoing bool)) {
b.WatchNotificationsAs(ctx, nil, mask, onWatchAdded, fn)
}
@@ -3684,6 +3688,7 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A
session := &watchSession{
ch: ch,
ctx: ctx,
owner: actor,
sessionID: sessionID,
cancel: cancel,
@@ -3822,8 +3827,10 @@ func (b *LocalBackend) DebugForcePreferDERP(n int) {
// send delivers n to the connected frontend and any API watchers from
// LocalBackend.WatchNotifications (via the LocalAPI).
//
// If no frontend is connected or API watchers are backed up, the notification
// is dropped without being delivered.
// If no frontend is connected, the notification is dropped without being
// delivered. If an out-of-process watcher is backed up, it is disconnected. If
// an in-process no-disconnect watcher is backed up, sending blocks until the
// watcher catches up.
//
// If n contains Prefs, those will be sanitized before being delivered.
//
@@ -3903,6 +3910,9 @@ func (b *LocalBackend) sendTo(n ipn.Notify, recipient notificationTarget) {
}
// sendToLocked is like [LocalBackend.sendTo], but assumes b.mu is already held.
// If a [ipn.NotifyInProcessNoDisconnect] watcher falls behind, sendToLocked
// blocks until that watcher catches up or closes. Such watchers must not call
// back into LocalBackend from their notification callback.
func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget) {
if n.Prefs != nil {
n.Prefs = new(stripKeysFromPrefs(*n.Prefs))
@@ -3923,6 +3933,13 @@ func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget)
select {
case sess.ch <- nForSess:
default:
if sess.mask&ipn.NotifyInProcessNoDisconnect != 0 {
select {
case sess.ch <- nForSess:
case <-sess.ctx.Done():
}
continue
}
b.closeLaggingWatchSessionLocked(sess)
}
}

View File

@@ -2176,6 +2176,81 @@ func TestWatchNotificationsClosesSlowConsumer(t *testing.T) {
}
}
func TestWatchNotificationsInProcessNoDisconnectBlocksSender(t *testing.T) {
b := newTestLocalBackend(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watchAdded := make(chan struct{})
firstNotify := make(chan struct{}, 1)
releaseFirstNotify := make(chan struct{})
terminalMessage := make(chan string, 1)
done := make(chan struct{})
go func() {
defer close(done)
b.WatchNotificationsAs(ctx, nil, ipn.NotifyInProcessNoDisconnect, func() { close(watchAdded) }, func(n *ipn.Notify) bool {
if n.ErrMessage != nil {
terminalMessage <- *n.ErrMessage
return true
}
select {
case firstNotify <- struct{}{}:
<-releaseFirstNotify
default:
}
return true
})
}()
<-watchAdded
state := ipn.Running
b.send(ipn.Notify{State: &state})
select {
case <-firstNotify:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for first notification")
}
// Fill the 128-slot queue. The next send should block until the
// subscriber catches up, not disconnect the subscriber.
for range 128 {
b.send(ipn.Notify{State: &state})
}
sendDone := make(chan struct{})
go func() {
b.send(ipn.Notify{State: &state})
close(sendDone)
}()
select {
case <-sendDone:
t.Fatal("send returned before the subscriber caught up")
case <-time.After(100 * time.Millisecond):
}
select {
case got := <-terminalMessage:
close(releaseFirstNotify)
t.Fatalf("got terminal message %q; want none", got)
default:
}
close(releaseFirstNotify)
select {
case <-sendDone:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for blocked send to complete")
}
cancel()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for watcher to close")
}
}
// TestNotifyForSessionPeerVisibility verifies the per-session masking
// logic in [LocalBackend.notifyForSessionLocked] for the
// NotifyPeerChanges / NotifyPeerPatches flag pair:

View File

@@ -902,6 +902,10 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) {
}
mask = ipn.NotifyWatchOpt(v)
}
if mask&ipn.NotifyInProcessNoDisconnect != 0 {
http.Error(w, "NotifyInProcessNoDisconnect is only valid for in-process IPN bus subscribers", http.StatusBadRequest)
return
}
if err := ipn.ValidateNotifyWatchOpt(mask); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return

View File

@@ -661,6 +661,12 @@ func TestServeWatchIPNBus(t *testing.T) {
mask: ipn.NotifyRateLimit | ipn.NotifyPeerChanges,
wantStatus: http.StatusBadRequest,
},
{
desc: "in-process-no-disconnect-forbidden",
permitRead: true,
mask: ipn.NotifyInProcessNoDisconnect,
wantStatus: http.StatusBadRequest,
},
}
for _, tt := range tests {