From 88b98e705ff3e7e1d1d0e38aca1442615825377b Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 21 May 2026 10:38:52 +0800 Subject: [PATCH] Capture disconnect/close in Worker Currently, if a disconnect/close is captured in a worker during a syncRequest, that specific request is terminated, but the error doesn't bubble up. The worker remains alive and will subsequently block in a perform, with no connection alive to wake it up. In this commit, when disconnect/close is received, inbox.terminate is set to true. This flag is checked (in syncRequest and http_client.tick) and error.ClientDisconnected is returned. (Also, on network shutdown, always broadcast the cdp_unregister since there's no harm in sending an extra signal even if nothing was removed). --- src/Inbox.zig | 7 +++++++ src/browser/HttpClient.zig | 10 ++++++++++ src/network/Network.zig | 19 +++++++------------ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/Inbox.zig b/src/Inbox.zig index ce9c7d78..1828f0f6 100644 --- a/src/Inbox.zig +++ b/src/Inbox.zig @@ -39,6 +39,13 @@ const Inbox = @This(); mutex: std.Thread.Mutex = .{}, queue: DoublyLinkedList = .{}, +// One-way latch, set by the worker's drainInbox the first time it +// observes a .disconnect (or .close) and never cleared. Ensures that, on +// multiple drains, the terminated state is preserved / communicated. This is +// specifically meant to handle the case where a disconnect is captured during +// a syncRequest and we want the following non-nested tick to pick it up again. +terminated: bool = false, + pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void { self.mutex.lock(); defer self.mutex.unlock(); diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index df91ac42..d49fa484 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -398,6 +398,10 @@ pub fn abortRequests(_: *Client, owner: *Owner) void { const DrainMode = enum { all, sync_wait }; pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void { + if (self.inbox.terminated) { + return error.ClientDisconnected; + } + try self.drainQueue(); try self.perform(@intCast(timeout_ms)); // perform/processMessages just released a batch of connections back to @@ -536,6 +540,10 @@ const SyncContext = struct { }; pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncResponse { + if (self.inbox.terminated) { + return error.ClientDisconnected; + } + var sync_ctx = SyncContext{ .allocator = allocator, .body = .empty }; errdefer sync_ctx.body.deinit(allocator); @@ -719,10 +727,12 @@ fn drainInbox(self: *Client, mode: DrainMode) !void { .close => { cdp.onClose(); cdp.onDisconnect(null); + self.inbox.terminated = true; return error.ClientDisconnected; }, .disconnect => |err| { cdp.onDisconnect(err); + self.inbox.terminated = true; return error.ClientDisconnected; }, } diff --git a/src/network/Network.zig b/src/network/Network.zig index 54b19d5d..addb1796 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -695,21 +695,17 @@ fn shutdownCdpLinks(self: *Network) void { self.cdp_mutex.lock(); defer self.cdp_mutex.unlock(); - var any_removed = false; var it = self.cdp_links.first; while (it) |node| { const next = node.next; const link: *CdpLink = @fieldParentPtr("node", node); if (link.state == .live) { self.dropCdp(link, null, true); - any_removed = true; } it = next; } - if (any_removed) { - self.cdp_unregister.broadcast(); - } + self.cdp_unregister.broadcast(); } pub fn run(self: *Network) void { @@ -790,12 +786,8 @@ pub fn run(self: *Network) void { self.fireTicks(); if (self.shutdown.load(.acquire)) { - // Force-disconnect any still-live CDP clients so their worker - // threads wake from curl_multi_poll and exit. A worker is woken - // only by this (Network) thread; if the loop exits with links - // still live, those workers block forever and Server.deinit() - // spins on active_threads (issue #2510). Idempotent — a no-op - // once the links are drained. + // Drain any live CDP links so their workers can exit (issue #2510). + // Idempo tent — no-op once drained, safe to call every iteration self.shutdownCdpLinks(); if (running_handles == 0) { @@ -805,7 +797,10 @@ pub fn run(self: *Network) void { self.submission_mutex.lock(); const has_pending = self.submission_queue.first != null; self.submission_mutex.unlock(); - if (!has_pending) break; + + if (!has_pending) { + break; + } } } }