Capture disconnect/close in Worker

Currently, if a disconnect/close is captured in a worker during a syncRequest,
that specific request is terminated, but the error doesn't bubble up. The worker
remains alive and will subsequently block in a perform, with no connection alive
to wake it up.

In this commit, when disconnect/close is received, inbox.terminate is set to
true. This flag is checked (in syncRequest and http_client.tick) and
error.ClientDisconnected is returned.

(Also, on network shutdown, always broadcast the cdp_unregister since there's
no harm in sending an extra signal even if nothing was removed).
This commit is contained in:
Karl Seguin
2026-05-21 10:38:52 +08:00
parent bdf28c51cd
commit 88b98e705f
3 changed files with 24 additions and 12 deletions

View File

@@ -39,6 +39,13 @@ const Inbox = @This();
mutex: std.Thread.Mutex = .{},
queue: DoublyLinkedList = .{},
// One-way latch, set by the worker's drainInbox the first time it
// observes a .disconnect (or .close) and never cleared. Ensures that, on
// multiple drains, the terminated state is preserved / communicated. This is
// specifically meant to handle the case where a disconnect is captured during
// a syncRequest and we want the following non-nested tick to pick it up again.
terminated: bool = false,
pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void {
self.mutex.lock();
defer self.mutex.unlock();

View File

@@ -398,6 +398,10 @@ pub fn abortRequests(_: *Client, owner: *Owner) void {
const DrainMode = enum { all, sync_wait };
pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
if (self.inbox.terminated) {
return error.ClientDisconnected;
}
try self.drainQueue();
try self.perform(@intCast(timeout_ms));
// perform/processMessages just released a batch of connections back to
@@ -536,6 +540,10 @@ const SyncContext = struct {
};
pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncResponse {
if (self.inbox.terminated) {
return error.ClientDisconnected;
}
var sync_ctx = SyncContext{ .allocator = allocator, .body = .empty };
errdefer sync_ctx.body.deinit(allocator);
@@ -719,10 +727,12 @@ fn drainInbox(self: *Client, mode: DrainMode) !void {
.close => {
cdp.onClose();
cdp.onDisconnect(null);
self.inbox.terminated = true;
return error.ClientDisconnected;
},
.disconnect => |err| {
cdp.onDisconnect(err);
self.inbox.terminated = true;
return error.ClientDisconnected;
},
}

View File

@@ -695,21 +695,17 @@ fn shutdownCdpLinks(self: *Network) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
var any_removed = false;
var it = self.cdp_links.first;
while (it) |node| {
const next = node.next;
const link: *CdpLink = @fieldParentPtr("node", node);
if (link.state == .live) {
self.dropCdp(link, null, true);
any_removed = true;
}
it = next;
}
if (any_removed) {
self.cdp_unregister.broadcast();
}
self.cdp_unregister.broadcast();
}
pub fn run(self: *Network) void {
@@ -790,12 +786,8 @@ pub fn run(self: *Network) void {
self.fireTicks();
if (self.shutdown.load(.acquire)) {
// Force-disconnect any still-live CDP clients so their worker
// threads wake from curl_multi_poll and exit. A worker is woken
// only by this (Network) thread; if the loop exits with links
// still live, those workers block forever and Server.deinit()
// spins on active_threads (issue #2510). Idempotent — a no-op
// once the links are drained.
// Drain any live CDP links so their workers can exit (issue #2510).
// Idempo tent — no-op once drained, safe to call every iteration
self.shutdownCdpLinks();
if (running_handles == 0) {
@@ -805,7 +797,10 @@ pub fn run(self: *Network) void {
self.submission_mutex.lock();
const has_pending = self.submission_queue.first != null;
self.submission_mutex.unlock();
if (!has_pending) break;
if (!has_pending) {
break;
}
}
}
}