mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-06-11 01:25:53 -04:00
Capture disconnect/close in Worker
Currently, if a disconnect/close is captured in a worker during a syncRequest, that specific request is terminated, but the error doesn't bubble up. The worker remains alive and will subsequently block in a perform, with no connection alive to wake it up. In this commit, when disconnect/close is received, inbox.terminate is set to true. This flag is checked (in syncRequest and http_client.tick) and error.ClientDisconnected is returned. (Also, on network shutdown, always broadcast the cdp_unregister since there's no harm in sending an extra signal even if nothing was removed).
This commit is contained in:
@@ -39,6 +39,13 @@ const Inbox = @This();
|
||||
mutex: std.Thread.Mutex = .{},
|
||||
queue: DoublyLinkedList = .{},
|
||||
|
||||
// One-way latch, set by the worker's drainInbox the first time it
|
||||
// observes a .disconnect (or .close) and never cleared. Ensures that, on
|
||||
// multiple drains, the terminated state is preserved / communicated. This is
|
||||
// specifically meant to handle the case where a disconnect is captured during
|
||||
// a syncRequest and we want the following non-nested tick to pick it up again.
|
||||
terminated: bool = false,
|
||||
|
||||
pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void {
|
||||
self.mutex.lock();
|
||||
defer self.mutex.unlock();
|
||||
|
||||
@@ -398,6 +398,10 @@ pub fn abortRequests(_: *Client, owner: *Owner) void {
|
||||
const DrainMode = enum { all, sync_wait };
|
||||
|
||||
pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
|
||||
if (self.inbox.terminated) {
|
||||
return error.ClientDisconnected;
|
||||
}
|
||||
|
||||
try self.drainQueue();
|
||||
try self.perform(@intCast(timeout_ms));
|
||||
// perform/processMessages just released a batch of connections back to
|
||||
@@ -536,6 +540,10 @@ const SyncContext = struct {
|
||||
};
|
||||
|
||||
pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncResponse {
|
||||
if (self.inbox.terminated) {
|
||||
return error.ClientDisconnected;
|
||||
}
|
||||
|
||||
var sync_ctx = SyncContext{ .allocator = allocator, .body = .empty };
|
||||
errdefer sync_ctx.body.deinit(allocator);
|
||||
|
||||
@@ -719,10 +727,12 @@ fn drainInbox(self: *Client, mode: DrainMode) !void {
|
||||
.close => {
|
||||
cdp.onClose();
|
||||
cdp.onDisconnect(null);
|
||||
self.inbox.terminated = true;
|
||||
return error.ClientDisconnected;
|
||||
},
|
||||
.disconnect => |err| {
|
||||
cdp.onDisconnect(err);
|
||||
self.inbox.terminated = true;
|
||||
return error.ClientDisconnected;
|
||||
},
|
||||
}
|
||||
|
||||
@@ -695,21 +695,17 @@ fn shutdownCdpLinks(self: *Network) void {
|
||||
self.cdp_mutex.lock();
|
||||
defer self.cdp_mutex.unlock();
|
||||
|
||||
var any_removed = false;
|
||||
var it = self.cdp_links.first;
|
||||
while (it) |node| {
|
||||
const next = node.next;
|
||||
const link: *CdpLink = @fieldParentPtr("node", node);
|
||||
if (link.state == .live) {
|
||||
self.dropCdp(link, null, true);
|
||||
any_removed = true;
|
||||
}
|
||||
it = next;
|
||||
}
|
||||
|
||||
if (any_removed) {
|
||||
self.cdp_unregister.broadcast();
|
||||
}
|
||||
self.cdp_unregister.broadcast();
|
||||
}
|
||||
|
||||
pub fn run(self: *Network) void {
|
||||
@@ -790,12 +786,8 @@ pub fn run(self: *Network) void {
|
||||
self.fireTicks();
|
||||
|
||||
if (self.shutdown.load(.acquire)) {
|
||||
// Force-disconnect any still-live CDP clients so their worker
|
||||
// threads wake from curl_multi_poll and exit. A worker is woken
|
||||
// only by this (Network) thread; if the loop exits with links
|
||||
// still live, those workers block forever and Server.deinit()
|
||||
// spins on active_threads (issue #2510). Idempotent — a no-op
|
||||
// once the links are drained.
|
||||
// Drain any live CDP links so their workers can exit (issue #2510).
|
||||
// Idempo tent — no-op once drained, safe to call every iteration
|
||||
self.shutdownCdpLinks();
|
||||
|
||||
if (running_handles == 0) {
|
||||
@@ -805,7 +797,10 @@ pub fn run(self: *Network) void {
|
||||
self.submission_mutex.lock();
|
||||
const has_pending = self.submission_queue.first != null;
|
||||
self.submission_mutex.unlock();
|
||||
if (!has_pending) break;
|
||||
|
||||
if (!has_pending) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user