From 1deb6a844937bb801629bc0425f441276d80ddca Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 9 Jun 2026 19:05:40 +0000 Subject: [PATCH] ipn: add no-disconnect in-process bus subscribers Add NotifyInProcessNoDisconnect for in-process IPN bus subscribers that must apply every bus update. When such a subscriber falls behind, block Notify production instead of sending the terminal fell-behind message and closing the watch. This is intentionally not available over LocalAPI, where a slow or stuck out-of-process client should still be disconnected rather than allowed to stall tailscaled. In-process callers that use the bit must keep their callbacks fast and must not call back into LocalBackend from the callback. Updates #20062 Change-Id: I730ad61a07475243bb226fba2262c1a3ded211ae Signed-off-by: Brad Fitzpatrick --- ipn/backend.go | 15 +++++++ ipn/backend_test.go | 4 ++ ipn/ipnlocal/local.go | 23 +++++++++-- ipn/ipnlocal/local_test.go | 75 +++++++++++++++++++++++++++++++++++ ipn/localapi/localapi.go | 4 ++ ipn/localapi/localapi_test.go | 6 +++ 6 files changed, 124 insertions(+), 3 deletions(-) diff --git a/ipn/backend.go b/ipn/backend.go index f6d229a9a..e387e6d96 100644 --- a/ipn/backend.go +++ b/ipn/backend.go @@ -156,6 +156,21 @@ type EngineStatus struct { // promotes any patch into a full-Node entry in [Notify.PeersChanged] // for this session, at the cost of bandwidth. NotifyPeerPatches NotifyWatchOpt = 1 << 15 + + // NotifyInProcessNoDisconnect, if set, marks this watcher as an + // in-process subscriber that must not be disconnected for falling behind + // on its notification queue. Instead, if its queue fills, Notify + // production blocks until the watcher catches up. + // + // Callers using this bit must receive and process notifications promptly. 
+ // Their callbacks must not call back into LocalBackend or wait on work that + // might call back into LocalBackend, because the producer might be holding + // LocalBackend's mutex while waiting for the watcher to catch up. + // + // This bit is only valid for in-process callers of + // LocalBackend.WatchNotificationsAs. LocalAPI WatchIPNBus clients must + // not request it. + NotifyInProcessNoDisconnect NotifyWatchOpt = 1 << 16 ) // NotifyRateLimitIncompatibleBits is the set of new-style IPN bus diff --git a/ipn/backend_test.go b/ipn/backend_test.go index 38159c821..1957e21e6 100644 --- a/ipn/backend_test.go +++ b/ipn/backend_test.go @@ -55,6 +55,10 @@ func TestValidateNotifyWatchOpt(t *testing.T) { name: "peer-changes-without-rate-limit", mask: NotifyPeerChanges | NotifyPeerPatches | NotifyNoNetMap | NotifyInitialStatus, }, + { + name: "in-process-no-disconnect", + mask: NotifyInProcessNoDisconnect | NotifyPeerChanges, + }, { name: "rate-limit-with-peer-changes", mask: NotifyRateLimit | NotifyPeerChanges, diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 773656e6b..a49608eef 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -167,6 +167,7 @@ func (b *LocalBackend) ListenSSH(ln net.Listener, logf logger.Logf) (net.Listene // and sessionID as required to close targeted buses. type watchSession struct { ch chan *ipn.Notify + ctx context.Context owner ipnauth.Actor // or nil sessionID string cancel context.CancelFunc // to shut down the session @@ -3588,7 +3589,10 @@ func applyConfigToHostinfo(hi *tailcfg.Hostinfo, c *conffile.Config) { // fn returns false, the watch also stops. // // Failure to consume many notifications in a row will result in one final -// notification with ErrMessage set, followed by the watch closing. +// notification with ErrMessage set, followed by the watch closing, unless mask +// includes [ipn.NotifyInProcessNoDisconnect]. 
Watchers using +// NotifyInProcessNoDisconnect must not call back into LocalBackend from fn or +// wait on work that might call back into LocalBackend. func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, onWatchAdded func(), fn func(roNotify *ipn.Notify) (keepGoing bool)) { b.WatchNotificationsAs(ctx, nil, mask, onWatchAdded, fn) } @@ -3684,6 +3688,7 @@ func (b *LocalBackend) WatchNotificationsAs(ctx context.Context, actor ipnauth.A session := &watchSession{ ch: ch, + ctx: ctx, owner: actor, sessionID: sessionID, cancel: cancel, @@ -3822,8 +3827,10 @@ func (b *LocalBackend) DebugForcePreferDERP(n int) { // send delivers n to the connected frontend and any API watchers from // LocalBackend.WatchNotifications (via the LocalAPI). // -// If no frontend is connected or API watchers are backed up, the notification -// is dropped without being delivered. +// If no frontend is connected, the notification is dropped without being +// delivered. If a watcher without [ipn.NotifyInProcessNoDisconnect] is backed +// up, it is disconnected. If a watcher with that bit is backed up, sending +// blocks until the watcher catches up or its context is done. // // If n contains Prefs, those will be sanitized before being delivered. // @@ -3903,6 +3910,9 @@ func (b *LocalBackend) sendTo(n ipn.Notify, recipient notificationTarget) { } // sendToLocked is like [LocalBackend.sendTo], but assumes b.mu is already held. +// If a [ipn.NotifyInProcessNoDisconnect] watcher falls behind, sendToLocked +// blocks until that watcher catches up or closes. Such watchers must not call +// back into LocalBackend from their notification callback. 
func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget) { if n.Prefs != nil { n.Prefs = new(stripKeysFromPrefs(*n.Prefs)) @@ -3923,6 +3933,13 @@ func (b *LocalBackend) sendToLocked(n ipn.Notify, recipient notificationTarget) select { case sess.ch <- nForSess: default: + if sess.mask&ipn.NotifyInProcessNoDisconnect != 0 { + select { + case sess.ch <- nForSess: + case <-sess.ctx.Done(): + } + continue + } b.closeLaggingWatchSessionLocked(sess) } } diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 8aaa20691..2c5a39dd3 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -2176,6 +2176,81 @@ func TestWatchNotificationsClosesSlowConsumer(t *testing.T) { } } +func TestWatchNotificationsInProcessNoDisconnectBlocksSender(t *testing.T) { + b := newTestLocalBackend(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + watchAdded := make(chan struct{}) + firstNotify := make(chan struct{}, 1) + releaseFirstNotify := make(chan struct{}) + terminalMessage := make(chan string, 1) + done := make(chan struct{}) + + go func() { + defer close(done) + b.WatchNotificationsAs(ctx, nil, ipn.NotifyInProcessNoDisconnect, func() { close(watchAdded) }, func(n *ipn.Notify) bool { + if n.ErrMessage != nil { + terminalMessage <- *n.ErrMessage + return true + } + select { + case firstNotify <- struct{}{}: + <-releaseFirstNotify + default: + } + return true + }) + }() + <-watchAdded + + state := ipn.Running + b.send(ipn.Notify{State: &state}) + select { + case <-firstNotify: + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for first notification") + } + + // Fill the 128-slot queue. The next send should block until the + // subscriber catches up, not disconnect the subscriber. 
+ for range 128 { + b.send(ipn.Notify{State: &state}) + } + + sendDone := make(chan struct{}) + go func() { + b.send(ipn.Notify{State: &state}) + close(sendDone) + }() + select { + case <-sendDone: + t.Fatal("send returned before the subscriber caught up") + case <-time.After(100 * time.Millisecond): + } + + select { + case got := <-terminalMessage: + close(releaseFirstNotify) + t.Fatalf("got terminal message %q; want none", got) + default: + } + + close(releaseFirstNotify) + select { + case <-sendDone: + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for blocked send to complete") + } + + cancel() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for watcher to close") + } +} + // TestNotifyForSessionPeerVisibility verifies the per-session masking // logic in [LocalBackend.notifyForSessionLocked] for the // NotifyPeerChanges / NotifyPeerPatches flag pair: diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 8fb83a594..7b4f5aa52 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -902,6 +902,10 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) { } mask = ipn.NotifyWatchOpt(v) } + if mask&ipn.NotifyInProcessNoDisconnect != 0 { + http.Error(w, "NotifyInProcessNoDisconnect is only valid for in-process IPN bus subscribers", http.StatusBadRequest) + return + } if err := ipn.ValidateNotifyWatchOpt(mask); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/ipn/localapi/localapi_test.go b/ipn/localapi/localapi_test.go index cd51ab620..cc5a6b6a7 100644 --- a/ipn/localapi/localapi_test.go +++ b/ipn/localapi/localapi_test.go @@ -661,6 +661,12 @@ func TestServeWatchIPNBus(t *testing.T) { mask: ipn.NotifyRateLimit | ipn.NotifyPeerChanges, wantStatus: http.StatusBadRequest, }, + { + desc: "in-process-no-disconnect-forbidden", + permitRead: true, + mask: ipn.NotifyInProcessNoDisconnect, + wantStatus: http.StatusBadRequest, + 
}, } for _, tt := range tests {