diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index d1a0fb91..6675e004 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -168,6 +168,13 @@ pub fn onLinkDisconnect(self: *CDP, err: ?anyerror) void { self.browser.http_client.inbox.push(arena, .{ .disconnect = err }); } +// Called by Network to try to force the Worker to shutdown. This is only called +// after a close/disconnect message was sent and the worker lingered. Presumably +// it's stuck in JS code. +pub fn terminateFromNetwork(self: *CDP) void { + self.browser.env.terminate(); +} + // Called in the Worker to dispatch a single CDP message bubbled up by // HttpClient.drainInbox. The Network thread already parsed the JSON // when it pushed the message to the inbox, so we skip straight to diff --git a/src/network/Network.zig b/src/network/Network.zig index 1e51f042..f04b64c0 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -22,6 +22,7 @@ const builtin = @import("builtin"); const App = @import("../App.zig"); const Config = @import("../Config.zig"); +const datetime = @import("../datetime.zig"); const CDP = @import("../cdp/CDP.zig"); const libcurl = @import("../sys/libcurl.zig"); @@ -65,11 +66,23 @@ pub const CdpLink = struct { handles: http.Handles, node: DoublyLinkedList.Node = .{}, + // Network thread will call cdp.terminateFromNetwork() when this is reached. + // The timing guarantee is poor, but it will be at some point AFTER this + // time and before an accept (ensure abandoned workers can't block new + // connections) + terminate_ms: u64 = std.math.maxInt(u64), + pub const State = enum { live, // Worker called unregisterCdp; Network will drop the link on // its next loop iteration and signal cdp_unregister. unregistering, + // Network dropped the socket and parked the link in + // closed_cdp_linkss. Worker's unregisterCdp OR the watchdog + // deadline transitions it to .removed. Must NOT be .removed + // while still in closed_cdp_linkss — worker would free out + // from under the next tick. + terminating, // Network has dropped the link from its poll set. The worker // can safely free anything the link's callbacks closed over. removed, @@ -126,6 +139,13 @@ callbacks_mutex: std.Thread.Mutex = .{}, cdp_links: DoublyLinkedList = .{}, cdp_mutex: std.Thread.Mutex = .{}, cdp_unregister: std.Thread.Condition = .{}, +// After the CDP socket is closed, the CdpLink is moved from cdp_links to +// closed_cdp_links. At that point we act as a watchdog. If the worker doesn't +// unregister the CDP link within 5 seconds (e.g. stuck in a endless JS loop?) +// we can force a termination from the worker. (This is a bit like FetchTerminator +// in main) +closed_cdp_links: DoublyLinkedList = .{}, + // Per-iteration snapshot of CdpLinks whose sockets are in pollfds. // Sized at maxConnections at init time so we never allocate inside // run(). Parallel to pollfds[cdp_start..cdp_start + cdp_poll_count]. @@ -519,6 +539,14 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void { self.wakeupPoll(); } + if (link.state == .terminating) { + // Network parked us in closed_cdp_links. Unpark ourselves and + // skip the watchdog path. + self.closed_cdp_links.remove(&link.node); + link.state = .removed; + return; + } + while (link.state != .removed) { // condition variable, waiting for a signal self.cdp_unregister.wait(&self.cdp_mutex); @@ -533,9 +561,14 @@ pub fn unregisterCdp(self: *Network, link: *CdpLink) void { // - The worker is woken via curl_multi_wakeup either way. fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void { self.cdp_links.remove(&link.node); - link.state = .removed; self.cdp_dirty = true; if (notify) { + // Worker hasn't been told yet — park the link with a watchdog + // deadline in case it's stuck in JS and never observes the inbox. + link.state = .terminating; + self.closed_cdp_links.append(&link.node); + link.terminate_ms = datetime.milliTimestamp(.monotonic) + 5000; + // notify=true means the worker hasn't been told yet — push the // disconnect into the inbox and break it out of curl_multi_poll. // notify=false paths have already woken the worker (close frame @@ -545,6 +578,8 @@ fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void { link.handles.wakeup() catch |e| { lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e }); }; + } else { + link.state = .removed; } } @@ -745,10 +780,13 @@ pub fn run(self: *Network) void { // check wakeup pipe if (poll_fd.revents != 0) { poll_fd.revents = 0; - while (true) + while (true) { _ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break; + } } + self.terminateAbandonedWorkers(); + // accept new connections if (listen_fd.revents != 0) { listen_fd.revents = 0; @@ -831,6 +869,28 @@ fn drainQueue(self: *Network) void { } } +fn terminateAbandonedWorkers(self: *Network) void { + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + + const now = datetime.milliTimestamp(.monotonic); + var node = self.closed_cdp_links.first; + while (node) |n| { + const link: *CdpLink = @fieldParentPtr("node", n); + if (link.terminate_ms > now) { + // added in order, if this is in the future, than all the ones + // after must be in the future too + break; + } + + node = n.next; + log.warn(.cdp, "watchdog terminate", .{}); + link.cdp.terminateFromNetwork(); + self.closed_cdp_links.remove(n); + link.state = .removed; + } +} + pub fn stop(self: *Network) void { self.shutdown.store(true, .release); self.wakeupPoll();