Revert "Add watchdog to Network thread for abandoned & stuck workers"

This reverts commit 8da7657d4c.
This commit is contained in:
Karl Seguin
2026-05-22 07:09:57 +08:00
parent 8da7657d4c
commit 3fc75dfe85
2 changed files with 2 additions and 69 deletions

View File

@@ -168,13 +168,6 @@ pub fn onLinkDisconnect(self: *CDP, err: ?anyerror) void {
self.browser.http_client.inbox.push(arena, .{ .disconnect = err });
}
// Called by Network to try to force the Worker to shutdown. This is only called
// after a close/disconnect message was sent and the worker lingered. Presumably
// it's stuck in JS code.
pub fn terminateFromNetwork(self: *CDP) void {
self.browser.env.terminate();
}
// Called in the Worker to dispatch a single CDP message bubbled up by
// HttpClient.drainInbox. The Network thread already parsed the JSON
// when it pushed the message to the inbox, so we skip straight to

View File

@@ -22,7 +22,6 @@ const builtin = @import("builtin");
const App = @import("../App.zig");
const Config = @import("../Config.zig");
const datetime = @import("../datetime.zig");
const CDP = @import("../cdp/CDP.zig");
const libcurl = @import("../sys/libcurl.zig");
@@ -66,23 +65,11 @@ pub const CdpLink = struct {
handles: http.Handles,
node: DoublyLinkedList.Node = .{},
// Network thread will call cdp.terminateFromNetwork() when this is reached.
// The timing guarantee is poor, but it will be at some point AFTER this
// time and before an accept (ensure abandoned workers can't block new
// connections)
terminate_ms: u64 = std.math.maxInt(u64),
pub const State = enum {
live,
// Worker called unregisterCdp; Network will drop the link on
// its next loop iteration and signal cdp_unregister.
unregistering,
// Network dropped the socket and parked the link in
// closed_cdp_linkss. Worker's unregisterCdp OR the watchdog
// deadline transitions it to .removed. Must NOT be .removed
// while still in closed_cdp_linkss — worker would free out
// from under the next tick.
terminating,
// Network has dropped the link from its poll set. The worker
// can safely free anything the link's callbacks closed over.
removed,
@@ -139,13 +126,6 @@ callbacks_mutex: std.Thread.Mutex = .{},
cdp_links: DoublyLinkedList = .{},
cdp_mutex: std.Thread.Mutex = .{},
cdp_unregister: std.Thread.Condition = .{},
// After the CDP socket is closed, the CdpLink is moved from cdp_links to
// closed_cdp_links. At that point we act as a watchdog. If the worker doesn't
// unregister the CDP link within 5 seconds (e.g. stuck in a endless JS loop?)
// we can force a termination from the worker. (This is a bit like FetchTerminator
// in main)
closed_cdp_links: DoublyLinkedList = .{},
// Per-iteration snapshot of CdpLinks whose sockets are in pollfds.
// Sized at maxConnections at init time so we never allocate inside
// run(). Parallel to pollfds[cdp_start..cdp_start + cdp_poll_count].
@@ -539,14 +519,6 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void {
self.wakeupPoll();
}
if (link.state == .terminating) {
// Network parked us in closed_cdp_links. Unpark ourselves and
// skip the watchdog path.
self.closed_cdp_links.remove(&link.node);
link.state = .removed;
return;
}
while (link.state != .removed) {
// condition variable, waiting for a signal
self.cdp_unregister.wait(&self.cdp_mutex);
@@ -561,14 +533,9 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void {
// - The worker is woken via curl_multi_wakeup either way.
fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void {
self.cdp_links.remove(&link.node);
link.state = .removed;
self.cdp_dirty = true;
if (notify) {
// Worker hasn't been told yet — park the link with a watchdog
// deadline in case it's stuck in JS and never observes the inbox.
link.state = .terminating;
self.closed_cdp_links.append(&link.node);
link.terminate_ms = datetime.milliTimestamp(.monotonic) + 5000;
// notify=true means the worker hasn't been told yet — push the
// disconnect into the inbox and break it out of curl_multi_poll.
// notify=false paths have already woken the worker (close frame
@@ -578,8 +545,6 @@ fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void {
link.handles.wakeup() catch |e| {
lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e });
};
} else {
link.state = .removed;
}
}
@@ -780,13 +745,10 @@ pub fn run(self: *Network) void {
// check wakeup pipe
if (poll_fd.revents != 0) {
poll_fd.revents = 0;
while (true) {
while (true)
_ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break;
}
}
self.terminateAbandonedWorkers();
// accept new connections
if (listen_fd.revents != 0) {
listen_fd.revents = 0;
@@ -869,28 +831,6 @@ fn drainQueue(self: *Network) void {
}
}
fn terminateAbandonedWorkers(self: *Network) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
const now = datetime.milliTimestamp(.monotonic);
var node = self.closed_cdp_links.first;
while (node) |n| {
const link: *CdpLink = @fieldParentPtr("node", n);
if (link.terminate_ms > now) {
// added in order, if this is in the future, than all the ones
// after must be in the future too
break;
}
node = n.next;
log.warn(.cdp, "watchdog terminate", .{});
link.cdp.terminateFromNetwork();
self.closed_cdp_links.remove(n);
link.state = .removed;
}
}
pub fn stop(self: *Network) void {
self.shutdown.store(true, .release);
self.wakeupPoll();