Add watchdog to Network thread for abandoned & stuck workers

If a worker is in some heavy JS e.g. `for(;;)` it will be stuck forever, even
if the peer closes the CDP connection.

With CDP reads now owned by the network thread, we now correctly detect the
disconnect and simply need to force the worker to shutdown. To achieve this,
on socket close, the CdpLink held by the network is given a terminate_ms (five
seconds from now) and added to a linked list. On every wakeup, the network
thread can check the list + timestamp and, if necessary, call Isolate::Terminate
(which is safe to call on a different thread).
This commit is contained in:
Karl Seguin
2026-05-21 19:22:12 +08:00
parent 972be65db7
commit 8da7657d4c
2 changed files with 69 additions and 2 deletions

View File

@@ -168,6 +168,13 @@ pub fn onLinkDisconnect(self: *CDP, err: ?anyerror) void {
self.browser.http_client.inbox.push(arena, .{ .disconnect = err });
}
// Called by Network to try to force the Worker to shutdown. This is only called
// after a close/disconnect message was sent and the worker lingered. Presumably
// it's stuck in JS code.
pub fn terminateFromNetwork(self: *CDP) void {
self.browser.env.terminate();
}
// Called in the Worker to dispatch a single CDP message bubbled up by
// HttpClient.drainInbox. The Network thread already parsed the JSON
// when it pushed the message to the inbox, so we skip straight to

View File

@@ -22,6 +22,7 @@ const builtin = @import("builtin");
const App = @import("../App.zig");
const Config = @import("../Config.zig");
const datetime = @import("../datetime.zig");
const CDP = @import("../cdp/CDP.zig");
const libcurl = @import("../sys/libcurl.zig");
@@ -65,11 +66,23 @@ pub const CdpLink = struct {
handles: http.Handles,
node: DoublyLinkedList.Node = .{},
// Network thread will call cdp.terminateFromNetwork() when this is reached.
// The timing guarantee is poor, but it will be at some point AFTER this
// time and before an accept (ensure abandoned workers can't block new
// connections)
terminate_ms: u64 = std.math.maxInt(u64),
pub const State = enum {
live,
// Worker called unregisterCdp; Network will drop the link on
// its next loop iteration and signal cdp_unregister.
unregistering,
// Network dropped the socket and parked the link in
// closed_cdp_linkss. Worker's unregisterCdp OR the watchdog
// deadline transitions it to .removed. Must NOT be .removed
// while still in closed_cdp_linkss — worker would free out
// from under the next tick.
terminating,
// Network has dropped the link from its poll set. The worker
// can safely free anything the link's callbacks closed over.
removed,
@@ -126,6 +139,13 @@ callbacks_mutex: std.Thread.Mutex = .{},
cdp_links: DoublyLinkedList = .{},
cdp_mutex: std.Thread.Mutex = .{},
cdp_unregister: std.Thread.Condition = .{},
// After the CDP socket is closed, the CdpLink is moved from cdp_links to
// closed_cdp_links. At that point we act as a watchdog. If the worker doesn't
// unregister the CDP link within 5 seconds (e.g. stuck in a endless JS loop?)
// we can force a termination from the worker. (This is a bit like FetchTerminator
// in main)
closed_cdp_links: DoublyLinkedList = .{},
// Per-iteration snapshot of CdpLinks whose sockets are in pollfds.
// Sized at maxConnections at init time so we never allocate inside
// run(). Parallel to pollfds[cdp_start..cdp_start + cdp_poll_count].
@@ -519,6 +539,14 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void {
self.wakeupPoll();
}
if (link.state == .terminating) {
// Network parked us in closed_cdp_links. Unpark ourselves and
// skip the watchdog path.
self.closed_cdp_links.remove(&link.node);
link.state = .removed;
return;
}
while (link.state != .removed) {
// condition variable, waiting for a signal
self.cdp_unregister.wait(&self.cdp_mutex);
@@ -533,9 +561,14 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void {
// - The worker is woken via curl_multi_wakeup either way.
fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void {
self.cdp_links.remove(&link.node);
link.state = .removed;
self.cdp_dirty = true;
if (notify) {
// Worker hasn't been told yet — park the link with a watchdog
// deadline in case it's stuck in JS and never observes the inbox.
link.state = .terminating;
self.closed_cdp_links.append(&link.node);
link.terminate_ms = datetime.milliTimestamp(.monotonic) + 5000;
// notify=true means the worker hasn't been told yet — push the
// disconnect into the inbox and break it out of curl_multi_poll.
// notify=false paths have already woken the worker (close frame
@@ -545,6 +578,8 @@ fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void {
link.handles.wakeup() catch |e| {
lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e });
};
} else {
link.state = .removed;
}
}
@@ -745,10 +780,13 @@ pub fn run(self: *Network) void {
// check wakeup pipe
if (poll_fd.revents != 0) {
poll_fd.revents = 0;
while (true)
while (true) {
_ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break;
}
}
self.terminateAbandonedWorkers();
// accept new connections
if (listen_fd.revents != 0) {
listen_fd.revents = 0;
@@ -831,6 +869,28 @@ fn drainQueue(self: *Network) void {
}
}
fn terminateAbandonedWorkers(self: *Network) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
const now = datetime.milliTimestamp(.monotonic);
var node = self.closed_cdp_links.first;
while (node) |n| {
const link: *CdpLink = @fieldParentPtr("node", n);
if (link.terminate_ms > now) {
// added in order, if this is in the future, than all the ones
// after must be in the future too
break;
}
node = n.next;
log.warn(.cdp, "watchdog terminate", .{});
link.cdp.terminateFromNetwork();
self.closed_cdp_links.remove(n);
link.state = .removed;
}
}
pub fn stop(self: *Network) void {
self.shutdown.store(true, .release);
self.wakeupPoll();