mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-06-11 17:46:32 -04:00
Currently, if a disconnect/close is captured in a worker during a syncRequest, that specific request is terminated, but the error doesn't bubble up. The worker remains alive and will subsequently block in a perform, with no connection alive to wake it up. In this commit, when disconnect/close is received, inbox.terminate is set to true. This flag is checked (in syncRequest and http_client.tick) and error.ClientDisconnected is returned. (Also, on network shutdown, always broadcast the cdp_unregister since there's no harm in sending an extra signal even if nothing was removed).
330 lines
10 KiB
Zig
330 lines
10 KiB
Zig
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
|
|
//
|
|
// Francis Bouvier <francis@lightpanda.io>
|
|
// Pierre Tachoire <pierre@lightpanda.io>
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as
|
|
// published by the Free Software Foundation, either version 3 of the
|
|
// License, or (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
// Thread-safe FIFO of Messages. Producer pushes from one thread,
|
|
// consumer pops from another. No wake mechanism is bundled — callers
|
|
// arrange that themselves (e.g. curl_multi_wakeup on the consumer's
|
|
// curl multi handle).
|
|
//
|
|
// Backed by a DoublyLinkedList so that pop is O(1) and the
|
|
// allowlist-during-sync-wait drain can cherry-pick messages out of
|
|
// the middle in O(1) given a node pointer.
|
|
|
|
const std = @import("std");
|
|
|
|
const CDP = @import("cdp/CDP.zig");
|
|
|
|
const ArenaPool = @import("ArenaPool.zig");
|
|
|
|
const Allocator = std.mem.Allocator;
|
|
const DoublyLinkedList = std.DoublyLinkedList;
|
|
|
|
const Inbox = @This();
|
|
|
|
mutex: std.Thread.Mutex = .{},
|
|
queue: DoublyLinkedList = .{},
|
|
|
|
// One-way latch, set by the worker's drainInbox the first time it
|
|
// observes a .disconnect (or .close) and never cleared. Ensures that, on
|
|
// multiple drains, the terminated state is preserved / communicated. This is
|
|
// specifically meant to handle the case where a disconnect is captured during
|
|
// a syncRequest and we want the following non-nested tick to pick it up again.
|
|
terminated: bool = false,
|
|
|
|
pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void {
|
|
self.mutex.lock();
|
|
defer self.mutex.unlock();
|
|
while (self.queue.popFirst()) |node| {
|
|
const msg: *Message = @fieldParentPtr("node", node);
|
|
msg.deinit(arena_pool);
|
|
}
|
|
}
|
|
|
|
pub fn push(self: *Inbox, arena: Allocator, payload: Message.Payload) void {
|
|
const msg = arena.create(Message) catch |err| switch (err) {
|
|
error.OutOfMemory => @panic("OOM"),
|
|
};
|
|
|
|
msg.* = .{ .payload = payload, .arena = arena };
|
|
self.mutex.lock();
|
|
defer self.mutex.unlock();
|
|
self.queue.append(&msg.node);
|
|
}
|
|
|
|
pub fn pop(self: *Inbox) ?*Message {
|
|
self.mutex.lock();
|
|
defer self.mutex.unlock();
|
|
const node = self.queue.popFirst() orelse return null;
|
|
return @fieldParentPtr("node", node);
|
|
}
|
|
|
|
// Cherry-pick the first message for which `predicate(msg)` returns
|
|
// true, removing it from the queue. Walks the queue in FIFO order;
|
|
// non-matching messages stay in place. Used to dispatch only the
|
|
// safe subset of messages during sync-wait paths (the allowlist),
|
|
// while leaving unsafe ones to be drained at the next safe point.
|
|
pub fn popIf(self: *Inbox, predicate: *const fn (*Message) bool) ?*Message {
|
|
self.mutex.lock();
|
|
defer self.mutex.unlock();
|
|
var it = self.queue.first;
|
|
while (it) |node| : (it = node.next) {
|
|
const msg: *Message = @fieldParentPtr("node", node);
|
|
if (predicate(msg)) {
|
|
self.queue.remove(node);
|
|
return msg;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
pub const Message = struct {
|
|
arena: Allocator,
|
|
payload: Payload,
|
|
node: DoublyLinkedList.Node = .{},
|
|
|
|
pub const Payload = union(enum) {
|
|
// A CDP text/binary frame, parsed on the Network thread. `raw`
|
|
// is the original JSON bytes (owned). `arena` holds any
|
|
// auxiliary allocations from parseFromSliceLeaky (typically
|
|
// empty for unescaped messages, but slices in `input` may
|
|
// reference it). `input` is the parsed view; its string
|
|
// slices reference `raw` or `arena`. Both must outlive the
|
|
// consumer's use of `input`.
|
|
cdp: Cdp,
|
|
|
|
// WS ping frame body (≤125 bytes per spec). Consumer is
|
|
// expected to echo via pong on its thread.
|
|
ping: []u8,
|
|
|
|
// Peer-initiated close frame. Consumer is expected to send a
|
|
// close reply and tear the connection down. The peer's close
|
|
// body is dropped — historically we always reply CLOSE_NORMAL
|
|
// (status 1000) regardless of what the peer sent.
|
|
close: void,
|
|
|
|
// No allocation; conveys "no more messages will arrive on
|
|
// this inbox" plus an optional reason. The Network thread
|
|
// pushes this on peer EOF, fatal WS framing error, or
|
|
// (now) JSON parse failure.
|
|
disconnect: ?anyerror,
|
|
};
|
|
|
|
pub const Cdp = struct {
|
|
raw: []u8,
|
|
input: CDP.InputMessage,
|
|
};
|
|
|
|
pub fn deinit(self: *const Message, pool: *ArenaPool) void {
|
|
pool.release(self.arena);
|
|
}
|
|
};
|
|
|
|
const testing = @import("testing.zig");
|
|
test "Inbox: push then pop returns FIFO order" {
|
|
const arena_pool = &testing.test_app.arena_pool;
|
|
|
|
var inbox = Inbox{};
|
|
defer inbox.deinit(&testing.test_app.arena_pool);
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "inbox test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") });
|
|
}
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "inbox test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") });
|
|
}
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "inbox test");
|
|
inbox.push(arena, .{ .disconnect = null });
|
|
}
|
|
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("first", m.payload.ping);
|
|
}
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("second", m.payload.ping);
|
|
}
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual(@as(?anyerror, null), m.payload.disconnect);
|
|
}
|
|
try testing.expect(inbox.pop() == null);
|
|
}
|
|
|
|
test "Inbox: deinit frees remaining items" {
|
|
const arena_pool = &testing.test_app.arena_pool;
|
|
|
|
var inbox = Inbox{};
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "inbox test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "leftover") });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "inbox test");
|
|
inbox.push(arena, .{ .disconnect = error.PeerClosed });
|
|
}
|
|
|
|
inbox.deinit(&testing.test_app.arena_pool);
|
|
// Memory leaks would be caught by the test runner.
|
|
}
|
|
|
|
fn testAlwaysTrue(_: *Message) bool {
|
|
return true;
|
|
}
|
|
|
|
fn testAlwaysFalse(_: *Message) bool {
|
|
return false;
|
|
}
|
|
|
|
fn testIsPing(msg: *Message) bool {
|
|
return msg.payload == .ping;
|
|
}
|
|
|
|
test "Inbox: popIf on empty queue returns null" {
|
|
var inbox = Inbox{};
|
|
defer inbox.deinit(&testing.test_app.arena_pool);
|
|
try testing.expect(inbox.popIf(testAlwaysTrue) == null);
|
|
}
|
|
|
|
test "Inbox: popIf with no match leaves queue intact" {
|
|
const arena_pool = &testing.test_app.arena_pool;
|
|
var inbox = Inbox{};
|
|
defer inbox.deinit(arena_pool);
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") });
|
|
}
|
|
|
|
try testing.expect(inbox.popIf(testAlwaysFalse) == null);
|
|
|
|
// Original FIFO order preserved.
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("first", m.payload.ping);
|
|
}
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("second", m.payload.ping);
|
|
}
|
|
try testing.expect(inbox.pop() == null);
|
|
}
|
|
|
|
test "Inbox: popIf with always-true predicate behaves like pop" {
|
|
const arena_pool = &testing.test_app.arena_pool;
|
|
var inbox = Inbox{};
|
|
defer inbox.deinit(arena_pool);
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "a") });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "b") });
|
|
}
|
|
|
|
{
|
|
const m = inbox.popIf(testAlwaysTrue).?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("a", m.payload.ping);
|
|
}
|
|
{
|
|
const m = inbox.popIf(testAlwaysTrue).?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("b", m.payload.ping);
|
|
}
|
|
try testing.expect(inbox.popIf(testAlwaysTrue) == null);
|
|
}
|
|
|
|
test "Inbox: popIf cherry-picks middle, preserves order of remainder" {
|
|
const arena_pool = &testing.test_app.arena_pool;
|
|
var inbox = Inbox{};
|
|
defer inbox.deinit(arena_pool);
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .disconnect = null });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "middle") });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .disconnect = error.PeerClosed });
|
|
}
|
|
|
|
// testIsPing skips the disconnect at the head and picks the middle.
|
|
{
|
|
const m = inbox.popIf(testIsPing).?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("middle", m.payload.ping);
|
|
}
|
|
|
|
// Remaining two disconnects pop in original order.
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expect(m.payload.disconnect == null);
|
|
}
|
|
{
|
|
const m = inbox.pop().?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expect(m.payload.disconnect.? == error.PeerClosed);
|
|
}
|
|
try testing.expect(inbox.pop() == null);
|
|
}
|
|
|
|
test "Inbox: popIf picks first match in FIFO order" {
|
|
const arena_pool = &testing.test_app.arena_pool;
|
|
var inbox = Inbox{};
|
|
defer inbox.deinit(arena_pool);
|
|
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .disconnect = null });
|
|
}
|
|
{
|
|
const arena = try arena_pool.acquire(.tiny, "popif test");
|
|
inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") });
|
|
}
|
|
|
|
const m = inbox.popIf(testIsPing).?;
|
|
defer m.deinit(arena_pool);
|
|
try testing.expectEqual("first", m.payload.ping);
|
|
}
|