Files
browser/src/Inbox.zig
Karl Seguin 88b98e705f Capture disconnect/close in Worker
Currently, if a disconnect/close is captured in a worker during a syncRequest,
that specific request is terminated, but the error doesn't bubble up. The worker
remains alive and will subsequently block in a perform, with no connection alive
to wake it up.

In this commit, when disconnect/close is received, inbox.terminate is set to
true. This flag is checked (in syncRequest and http_client.tick) and
error.ClientDisconnected is returned.

(Also, on network shutdown, always broadcast the cdp_unregister since there's
no harm in sending an extra signal even if nothing was removed).
2026-05-21 10:38:52 +08:00

330 lines
10 KiB
Zig

// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
// Thread-safe FIFO of Messages. Producer pushes from one thread,
// consumer pops from another. No wake mechanism is bundled — callers
// arrange that themselves (e.g. curl_multi_wakeup on the consumer's
// curl multi handle).
//
// Backed by a DoublyLinkedList so that pop is O(1) and the
// allowlist-during-sync-wait drain can cherry-pick messages out of
// the middle in O(1) given a node pointer.
const std = @import("std");
const CDP = @import("cdp/CDP.zig");
const ArenaPool = @import("ArenaPool.zig");
const Allocator = std.mem.Allocator;
const DoublyLinkedList = std.DoublyLinkedList;
const Inbox = @This();
mutex: std.Thread.Mutex = .{},
queue: DoublyLinkedList = .{},
// One-way latch, set by the worker's drainInbox the first time it
// observes a .disconnect (or .close) and never cleared. Ensures that, on
// multiple drains, the terminated state is preserved / communicated. This is
// specifically meant to handle the case where a disconnect is captured during
// a syncRequest and we want the following non-nested tick to pick it up again.
terminated: bool = false,
pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.queue.popFirst()) |node| {
const msg: *Message = @fieldParentPtr("node", node);
msg.deinit(arena_pool);
}
}
pub fn push(self: *Inbox, arena: Allocator, payload: Message.Payload) void {
const msg = arena.create(Message) catch |err| switch (err) {
error.OutOfMemory => @panic("OOM"),
};
msg.* = .{ .payload = payload, .arena = arena };
self.mutex.lock();
defer self.mutex.unlock();
self.queue.append(&msg.node);
}
pub fn pop(self: *Inbox) ?*Message {
self.mutex.lock();
defer self.mutex.unlock();
const node = self.queue.popFirst() orelse return null;
return @fieldParentPtr("node", node);
}
// Cherry-pick the first message for which `predicate(msg)` returns
// true, removing it from the queue. Walks the queue in FIFO order;
// non-matching messages stay in place. Used to dispatch only the
// safe subset of messages during sync-wait paths (the allowlist),
// while leaving unsafe ones to be drained at the next safe point.
pub fn popIf(self: *Inbox, predicate: *const fn (*Message) bool) ?*Message {
self.mutex.lock();
defer self.mutex.unlock();
var it = self.queue.first;
while (it) |node| : (it = node.next) {
const msg: *Message = @fieldParentPtr("node", node);
if (predicate(msg)) {
self.queue.remove(node);
return msg;
}
}
return null;
}
pub const Message = struct {
arena: Allocator,
payload: Payload,
node: DoublyLinkedList.Node = .{},
pub const Payload = union(enum) {
// A CDP text/binary frame, parsed on the Network thread. `raw`
// is the original JSON bytes (owned). `arena` holds any
// auxiliary allocations from parseFromSliceLeaky (typically
// empty for unescaped messages, but slices in `input` may
// reference it). `input` is the parsed view; its string
// slices reference `raw` or `arena`. Both must outlive the
// consumer's use of `input`.
cdp: Cdp,
// WS ping frame body (≤125 bytes per spec). Consumer is
// expected to echo via pong on its thread.
ping: []u8,
// Peer-initiated close frame. Consumer is expected to send a
// close reply and tear the connection down. The peer's close
// body is dropped — historically we always reply CLOSE_NORMAL
// (status 1000) regardless of what the peer sent.
close: void,
// No allocation; conveys "no more messages will arrive on
// this inbox" plus an optional reason. The Network thread
// pushes this on peer EOF, fatal WS framing error, or
// (now) JSON parse failure.
disconnect: ?anyerror,
};
pub const Cdp = struct {
raw: []u8,
input: CDP.InputMessage,
};
pub fn deinit(self: *const Message, pool: *ArenaPool) void {
pool.release(self.arena);
}
};
const testing = @import("testing.zig");
test "Inbox: push then pop returns FIFO order" {
const arena_pool = &testing.test_app.arena_pool;
var inbox = Inbox{};
defer inbox.deinit(&testing.test_app.arena_pool);
{
const arena = try arena_pool.acquire(.tiny, "inbox test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") });
}
{
const arena = try arena_pool.acquire(.tiny, "inbox test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") });
}
{
const arena = try arena_pool.acquire(.tiny, "inbox test");
inbox.push(arena, .{ .disconnect = null });
}
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expectEqual("first", m.payload.ping);
}
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expectEqual("second", m.payload.ping);
}
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expectEqual(@as(?anyerror, null), m.payload.disconnect);
}
try testing.expect(inbox.pop() == null);
}
test "Inbox: deinit frees remaining items" {
const arena_pool = &testing.test_app.arena_pool;
var inbox = Inbox{};
{
const arena = try arena_pool.acquire(.tiny, "inbox test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "leftover") });
}
{
const arena = try arena_pool.acquire(.tiny, "inbox test");
inbox.push(arena, .{ .disconnect = error.PeerClosed });
}
inbox.deinit(&testing.test_app.arena_pool);
// Memory leaks would be caught by the test runner.
}
fn testAlwaysTrue(_: *Message) bool {
return true;
}
fn testAlwaysFalse(_: *Message) bool {
return false;
}
fn testIsPing(msg: *Message) bool {
return msg.payload == .ping;
}
test "Inbox: popIf on empty queue returns null" {
var inbox = Inbox{};
defer inbox.deinit(&testing.test_app.arena_pool);
try testing.expect(inbox.popIf(testAlwaysTrue) == null);
}
test "Inbox: popIf with no match leaves queue intact" {
const arena_pool = &testing.test_app.arena_pool;
var inbox = Inbox{};
defer inbox.deinit(arena_pool);
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") });
}
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") });
}
try testing.expect(inbox.popIf(testAlwaysFalse) == null);
// Original FIFO order preserved.
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expectEqual("first", m.payload.ping);
}
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expectEqual("second", m.payload.ping);
}
try testing.expect(inbox.pop() == null);
}
test "Inbox: popIf with always-true predicate behaves like pop" {
const arena_pool = &testing.test_app.arena_pool;
var inbox = Inbox{};
defer inbox.deinit(arena_pool);
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "a") });
}
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "b") });
}
{
const m = inbox.popIf(testAlwaysTrue).?;
defer m.deinit(arena_pool);
try testing.expectEqual("a", m.payload.ping);
}
{
const m = inbox.popIf(testAlwaysTrue).?;
defer m.deinit(arena_pool);
try testing.expectEqual("b", m.payload.ping);
}
try testing.expect(inbox.popIf(testAlwaysTrue) == null);
}
test "Inbox: popIf cherry-picks middle, preserves order of remainder" {
const arena_pool = &testing.test_app.arena_pool;
var inbox = Inbox{};
defer inbox.deinit(arena_pool);
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .disconnect = null });
}
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "middle") });
}
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .disconnect = error.PeerClosed });
}
// testIsPing skips the disconnect at the head and picks the middle.
{
const m = inbox.popIf(testIsPing).?;
defer m.deinit(arena_pool);
try testing.expectEqual("middle", m.payload.ping);
}
// Remaining two disconnects pop in original order.
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expect(m.payload.disconnect == null);
}
{
const m = inbox.pop().?;
defer m.deinit(arena_pool);
try testing.expect(m.payload.disconnect.? == error.PeerClosed);
}
try testing.expect(inbox.pop() == null);
}
test "Inbox: popIf picks first match in FIFO order" {
const arena_pool = &testing.test_app.arena_pool;
var inbox = Inbox{};
defer inbox.deinit(arena_pool);
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") });
}
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .disconnect = null });
}
{
const arena = try arena_pool.acquire(.tiny, "popif test");
inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") });
}
const m = inbox.popIf(testIsPing).?;
defer m.deinit(arena_pool);
try testing.expectEqual("first", m.payload.ping);
}