From 8da7657d4c5fcb23be63d56545096ea064f88116 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 21 May 2026 19:22:12 +0800 Subject: [PATCH] Add watchdog to Network thread for abandoned & stuck workers If a worker is in some heavy JS e.g. `for(;;)` it will be stuck forever, even if the peer closes the CDP connection. With CDP reads now owned by the network thread, we now correctly detect the disconnect and simply need to force the worker to shut down. To achieve this, on socket close, the CdpLink held by the network is given a terminate_ms (five seconds from now) and added to a linked list. On every wakeup, the network thread can check the list + timestamp and, if necessary, call Isolate::Terminate (which is safe to call on a different thread). --- src/cdp/CDP.zig | 7 +++++ src/network/Network.zig | 64 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index d1a0fb91..6675e004 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -168,6 +168,13 @@ pub fn onLinkDisconnect(self: *CDP, err: ?anyerror) void { self.browser.http_client.inbox.push(arena, .{ .disconnect = err }); } +// Called by Network to try to force the Worker to shut down. This is only called +// after a close/disconnect message was sent and the worker lingered. Presumably +// it's stuck in JS code. +pub fn terminateFromNetwork(self: *CDP) void { + self.browser.env.terminate(); +} + // Called in the Worker to dispatch a single CDP message bubbled up by // HttpClient.drainInbox. 
The Network thread already parsed the JSON // when it pushed the message to the inbox, so we skip straight to diff --git a/src/network/Network.zig b/src/network/Network.zig index 1e51f042..f04b64c0 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -22,6 +22,7 @@ const builtin = @import("builtin"); const App = @import("../App.zig"); const Config = @import("../Config.zig"); +const datetime = @import("../datetime.zig"); const CDP = @import("../cdp/CDP.zig"); const libcurl = @import("../sys/libcurl.zig"); @@ -65,11 +66,23 @@ pub const CdpLink = struct { handles: http.Handles, node: DoublyLinkedList.Node = .{}, + // Network thread will call cdp.terminateFromNetwork() when this is reached. + // The timing guarantee is poor, but it will be at some point AFTER this + // time and before an accept (ensures abandoned workers can't block new + // connections) + terminate_ms: u64 = std.math.maxInt(u64), + pub const State = enum { live, // Worker called unregisterCdp; Network will drop the link on // its next loop iteration and signal cdp_unregister. unregistering, + // Network dropped the socket and parked the link in + // closed_cdp_links. Worker's unregisterCdp OR the watchdog + // deadline transitions it to .removed. Must NOT be .removed + // while still in closed_cdp_links — worker would free out + // from under the next tick. + terminating, // Network has dropped the link from its poll set. The worker // can safely free anything the link's callbacks closed over. removed, @@ -126,6 +139,13 @@ callbacks_mutex: std.Thread.Mutex = .{}, cdp_links: DoublyLinkedList = .{}, cdp_mutex: std.Thread.Mutex = .{}, cdp_unregister: std.Thread.Condition = .{}, +// After the CDP socket is closed, the CdpLink is moved from cdp_links to +// closed_cdp_links. At that point we act as a watchdog. If the worker doesn't +// unregister the CDP link within 5 seconds (e.g. stuck in an endless JS loop?) +// we can force a termination of the worker. 
(This is a bit like FetchTerminator +// in main) +closed_cdp_links: DoublyLinkedList = .{}, + // Per-iteration snapshot of CdpLinks whose sockets are in pollfds. // Sized at maxConnections at init time so we never allocate inside // run(). Parallel to pollfds[cdp_start..cdp_start + cdp_poll_count]. @@ -519,6 +539,14 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void { self.wakeupPoll(); } + if (link.state == .terminating) { + // Network parked us in closed_cdp_links. Unpark ourselves and + // skip the watchdog path. + self.closed_cdp_links.remove(&link.node); + link.state = .removed; + return; + } + while (link.state != .removed) { // condition variable, waiting for a signal self.cdp_unregister.wait(&self.cdp_mutex); @@ -533,9 +561,14 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void { // - The worker is woken via curl_multi_wakeup either way. fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void { self.cdp_links.remove(&link.node); - link.state = .removed; self.cdp_dirty = true; if (notify) { + // Worker hasn't been told yet — park the link with a watchdog + // deadline in case it's stuck in JS and never observes the inbox. + link.state = .terminating; + self.closed_cdp_links.append(&link.node); + link.terminate_ms = datetime.milliTimestamp(.monotonic) + 5000; + // notify=true means the worker hasn't been told yet — push the // disconnect into the inbox and break it out of curl_multi_poll. 
// notify=false paths have already woken the worker (close frame @@ -545,6 +578,8 @@ fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void { link.handles.wakeup() catch |e| { lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e }); }; + } else { + link.state = .removed; } } @@ -745,10 +780,13 @@ pub fn run(self: *Network) void { // check wakeup pipe if (poll_fd.revents != 0) { poll_fd.revents = 0; - while (true) + while (true) { _ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break; + } } + self.terminateAbandonedWorkers(); + // accept new connections if (listen_fd.revents != 0) { listen_fd.revents = 0; @@ -831,6 +869,28 @@ fn drainQueue(self: *Network) void { } } +fn terminateAbandonedWorkers(self: *Network) void { + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + + const now = datetime.milliTimestamp(.monotonic); + var node = self.closed_cdp_links.first; + while (node) |n| { + const link: *CdpLink = @fieldParentPtr("node", n); + if (link.terminate_ms > now) { + // added in order; if this is in the future, then all the ones + // after must be in the future too + break; + } + + node = n.next; + log.warn(.cdp, "watchdog terminate", .{}); + link.cdp.terminateFromNetwork(); + self.closed_cdp_links.remove(n); + link.state = .removed; + } +} + pub fn stop(self: *Network) void { self.shutdown.store(true, .release); self.wakeupPoll();