diff --git a/src/Inbox.zig b/src/Inbox.zig index ce9c7d78..1828f0f6 100644 --- a/src/Inbox.zig +++ b/src/Inbox.zig @@ -39,6 +39,13 @@ const Inbox = @This(); mutex: std.Thread.Mutex = .{}, queue: DoublyLinkedList = .{}, +// One-way latch, set by the worker's drainInbox the first time it +// observes a .disconnect (or .close) and never cleared. Ensures that, on +// multiple drains, the terminated state is preserved / communicated. This is +// specifically meant to handle the case where a disconnect is captured during +// a syncRequest and we want the following non-nested tick to pick it up again. +terminated: bool = false, + pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void { self.mutex.lock(); defer self.mutex.unlock(); diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 21714f3c..fee0817e 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -398,6 +398,10 @@ pub fn abortRequests(_: *Client, owner: *Owner) void { const DrainMode = enum { all, sync_wait }; pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void { + if (self.inbox.terminated) { + return error.ClientDisconnected; + } + try self.drainQueue(); try self.perform(@intCast(timeout_ms)); // perform/processMessages just released a batch of connections back to @@ -544,6 +548,13 @@ const SyncContext = struct { }; pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncResponse { + if (self.inbox.terminated) { + // request() takes ownership of req.headers on every path; we return + // before calling it, so free the curl_slist here to avoid leaking it. + req.headers.deinit(); + return error.ClientDisconnected; + } + var sync_ctx = SyncContext{ .allocator = allocator, .body = .empty }; errdefer sync_ctx.body.deinit(allocator); @@ -728,10 +739,12 @@ fn drainInbox(self: *Client, mode: DrainMode) !void { .close => { cdp.onClose(); cdp.onDisconnect(null); + self.inbox.terminated = true; return error.ClientDisconnected; }, .disconnect => |err| { cdp.onDisconnect(err); + self.inbox.terminated = true; return error.ClientDisconnected; }, } diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index d1a0fb91..e41e0a27 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -1321,3 +1321,60 @@ test "cdp: STARTUP sessionId" { try ctx.expectSentResult(null, .{ .id = 4, .index = 2, .session_id = "STARTUP" }); } } + +test "cdp: disconnect latches so the worker keeps exiting" { + var ctx = try testing.context(); + defer ctx.deinit(); + + const client = &ctx.cdp().browser.http_client; + + // Simulate the Network thread delivering a peer disconnect into the + // worker's inbox — the dropCdp(notify=true) path used on peer EOF and, + // since #2510, on shutdown via shutdownCdpLinks. + { + const arena = try client.arena_pool.acquire(.tiny, "test disconnect"); + client.inbox.push(arena, .{ .disconnect = null }); + } + + // First tick drains the .disconnect and tears the link down. + try testing.expectError(error.ClientDisconnected, client.tick(0, .all)); + + // The inbox is now empty. Without the latch this second tick would fall + // through to perform/poll with no producer left to wake it, so the worker + // would never exit and Server.deinit() would spin on active_threads + // (#2510). The latch keeps the terminal state sticky so the worker exits. + try testing.expectError(error.ClientDisconnected, client.tick(0, .all)); +} + +test "cdp: syncRequest short-circuits after disconnect" { + var ctx = try testing.context(); + defer ctx.deinit(); + + const client = &ctx.cdp().browser.http_client; + + // Latch terminated via a drained disconnect (as above). + { + const arena = try client.arena_pool.acquire(.tiny, "test disconnect"); + client.inbox.push(arena, .{ .disconnect = null }); + } + try testing.expectError(error.ClientDisconnected, client.tick(0, .all)); + + // A synchronous fetch attempted after the latch returns ClientDisconnected + // without starting the request. syncRequest also frees req.headers on this + // early-return path (it returns before request() takes ownership); that + // free isn't asserted here because curl_slist is C-allocated and escapes the + // per-test leak check, so it's verified by review. The latch check returns + // before any other req field is read, so the rest are placeholders. + const headers = try client.newHeaders(); + try testing.expectError(error.ClientDisconnected, client.syncRequest(testing.allocator, .{ + .frame_id = 0, + .loader_id = 0, + .method = .GET, + .url = "http://127.0.0.1:9582/", + .headers = headers, + .cookie_jar = null, + .cookie_origin = "", + .resource_type = .fetch, + .notification = undefined, + })); +} diff --git a/src/network/Network.zig b/src/network/Network.zig index 1e51f042..dc798506 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -683,6 +683,30 @@ fn processCdpEvents(self: *Network) void { } } +// On shutdown, force-disconnect every still-live CDP link. Each link's +// worker thread blocks in curl_multi_poll and is woken ONLY by this +// (Network) thread via dropCdp -> handles.wakeup(). If the run loop +// exits with links still live, those workers never wake and +// Server.deinit() spins on active_threads forever (issue #2510). +// Mirrors the peer-EOF path in processCdpEvents: dropCdp(notify=true) +// pushes a .disconnect into the worker's inbox and wakes it, so +// cdp.tick() returns false and the worker exits. +fn shutdownCdpLinks(self: *Network) void { + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + + var it = self.cdp_links.first; + while (it) |node| { + it = node.next; + const link: *CdpLink = @fieldParentPtr("node", node); + if (link.state == .live) { + self.dropCdp(link, null, true); + } + } + + self.cdp_unregister.broadcast(); +} + pub fn run(self: *Network) void { var drain_buf: [64]u8 = undefined; var running_handles: c_int = 0; @@ -767,13 +791,23 @@ pub fn run(self: *Network) void { self.fireTicks(); - if (self.shutdown.load(.acquire) and running_handles == 0) { - // Check if fireTicks submitted new requests (e.g. telemetry flush). - // If so, continue the loop to drain and send them before exiting. - self.submission_mutex.lock(); - const has_pending = self.submission_queue.first != null; - self.submission_mutex.unlock(); - if (!has_pending) break; + if (self.shutdown.load(.acquire)) { + // Drain any live CDP links so their workers can exit (issue #2510). + // Idempotent — no-op once drained, safe to call every iteration + self.shutdownCdpLinks(); + + if (running_handles == 0) { + // Check if fireTicks submitted new requests (e.g. telemetry + // flush). If so, continue the loop to drain and send them + // before exiting. + self.submission_mutex.lock(); + const has_pending = self.submission_queue.first != null; + self.submission_mutex.unlock(); + + if (!has_pending) { + break; + } + } } }