add DeferringLayer

This commit is contained in:
Muki Kiboigo
2026-05-12 20:39:07 -07:00
parent 9712e4171e
commit 9ab82d2761
3 changed files with 381 additions and 8 deletions

View File

@@ -33,12 +33,6 @@ const Network = @import("../network/Network.zig");
const CDP = @import("../cdp/CDP.zig");
const Inbox = @import("../Inbox.zig");
const CachedResponse = @import("../network/cache/Cache.zig").CachedResponse;
pub const CacheLayer = @import("../network/layer/CacheLayer.zig");
pub const RobotsLayer = @import("../network/layer/RobotsLayer.zig");
pub const WebBotAuthLayer = @import("../network/layer/WebBotAuthLayer.zig");
pub const InterceptionLayer = @import("../network/layer/InterceptionLayer.zig");
const log = lp.log;
const posix = std.posix;
@@ -50,6 +44,13 @@ pub const Method = http.Method;
pub const Headers = http.Headers;
pub const ResponseHead = http.ResponseHead;
pub const HeaderIterator = http.HeaderIterator;
const CachedResponse = @import("../network/cache/Cache.zig").CachedResponse;
pub const CacheLayer = @import("../network/layer/CacheLayer.zig");
pub const RobotsLayer = @import("../network/layer/RobotsLayer.zig");
pub const WebBotAuthLayer = @import("../network/layer/WebBotAuthLayer.zig");
pub const InterceptionLayer = @import("../network/layer/InterceptionLayer.zig");
pub const DeferringLayer = @import("../network/layer/DeferringLayer.zig");
// This is loosely tied to a browser Frame. Loading all the <scripts>, doing
// XHR requests, and loading imports all happens through here. Sine the app
@@ -159,10 +160,13 @@ inbox: Inbox,
max_response_size: usize,
blocking_requests: std.AutoHashMapUnmanaged(u32, u32) = .empty,
cache_layer: CacheLayer,
robots_layer: RobotsLayer,
web_bot_auth_layer: WebBotAuthLayer,
interception_layer: InterceptionLayer,
deferring_layer: DeferringLayer,
entry_layer: Layer,
pub const Layer = struct {
@@ -217,6 +221,7 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP)
.robots_layer = .{ .allocator = allocator, .network = network },
.web_bot_auth_layer = .{},
.interception_layer = .{},
.deferring_layer = .{ .allocator = allocator, .network = network },
.entry_layer = undefined,
.arena_pool = &network.app.arena_pool,
};
@@ -239,6 +244,8 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP)
next = layerWith(&self.web_bot_auth_layer, next);
}
next = layerWith(&self.deferring_layer, next);
self.entry_layer = next;
}
@@ -258,6 +265,8 @@ pub fn deinit(self: *Client) void {
self.clearUserAgentOverride();
self.robots_layer.deinit(self.allocator);
self.deferring_layer.deinit();
self.blocking_requests.deinit(self.allocator);
self.transfers.deinit(self.allocator);
self.inbox.deinit(self.arena_pool);
}
@@ -427,6 +436,7 @@ pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
}
try self.drainNextTickQueue();
self.deferring_layer.flushUnblocked(&self.blocking_requests);
try self.drainQueue();
try self.perform(@intCast(timeout_ms));
// perform/processMessages just released a batch of connections back to
@@ -723,6 +733,11 @@ pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncRespo
var sync_ctx = SyncContext{ .allocator = allocator, .body = .empty };
errdefer sync_ctx.body.deinit(allocator);
const expected_id = self.nextReqId();
const frame_id = req.frame_id;
try self.blocking_requests.putNoClobber(self.allocator, frame_id, expected_id);
defer _ = self.blocking_requests.remove(frame_id);
var r = req;
r.ctx = &sync_ctx;
r.header_callback = SyncContext.headerCallback;
@@ -1274,12 +1289,28 @@ pub const FulfilledResponse = struct {
}
};
pub const StableResponse = struct {
ctx: *anyopaque,
status: u16,
url: [:0]const u8,
headers: []const http.Header,
body: ?[]const u8,
pub fn contentType(self: *const StableResponse) ?[]const u8 {
for (self.headers) |hdr| {
if (std.ascii.eqlIgnoreCase(hdr.name, "content-type")) return hdr.value;
}
return null;
}
};
pub const Response = struct {
ctx: *anyopaque,
inner: union(enum) {
transfer: *Transfer,
cached: *const CachedResponse,
fulfilled: *const FulfilledResponse,
stable: *const StableResponse,
},
pub fn fromTransfer(transfer: *Transfer) Response {
@@ -1294,11 +1325,16 @@ pub const Response = struct {
return .{ .ctx = ctx, .inner = .{ .fulfilled = fulfilled } };
}
pub fn fromStable(stable: *const StableResponse) Response {
return .{ .ctx = stable.ctx, .inner = .{ .stable = stable } };
}
pub fn status(self: Response) ?u16 {
return switch (self.inner) {
.transfer => |t| if (t.res.header) |rh| rh.status else null,
.cached => |c| c.metadata.status,
.fulfilled => |f| f.status,
.stable => |s| s.status,
};
}
@@ -1307,6 +1343,7 @@ pub const Response = struct {
.transfer => |t| if (t.res.header) |*rh| rh.contentType() else null,
.cached => |c| c.metadata.content_type,
.fulfilled => |f| f.contentType(),
.stable => |s| s.contentType(),
};
}
@@ -1318,13 +1355,14 @@ pub const Response = struct {
.file => |f| @intCast(f.len),
},
.fulfilled => |f| if (f.body) |b| @intCast(b.len) else null,
.stable => |s| if (s.body) |b| @intCast(b.len) else null,
};
}
pub fn redirectCount(self: Response) ?u32 {
return switch (self.inner) {
.transfer => |t| if (t.res.header) |rh| rh.redirect_count else null,
.cached, .fulfilled => 0,
.cached, .fulfilled, .stable => 0,
};
}
@@ -1333,6 +1371,7 @@ pub const Response = struct {
.transfer => |t| t.req.url,
.cached => |c| c.metadata.url,
.fulfilled => |f| f.url,
.stable => |s| s.url,
};
}
@@ -1341,13 +1380,14 @@ pub const Response = struct {
.transfer => |t| t.responseHeaderIterator(),
.cached => |c| HeaderIterator{ .list = .{ .list = c.metadata.headers } },
.fulfilled => |f| HeaderIterator{ .list = .{ .list = f.headers } },
.stable => |s| HeaderIterator{ .list = .{ .list = s.headers } },
};
}
pub fn abort(self: Response, err: anyerror) void {
switch (self.inner) {
.transfer => |t| t.abort(err),
.cached, .fulfilled => {},
.cached, .fulfilled, .stable => {},
}
}
@@ -1356,6 +1396,28 @@ pub const Response = struct {
.transfer => |t| try t.format(writer),
.cached => |c| try c.format(writer),
.fulfilled => |f| try writer.print("fulfilled {s}", .{f.url}),
.stable => |s| writer.print("stable {s}", .{s.url}),
};
}
pub fn toStable(self: Response, arena: std.mem.Allocator) !StableResponse {
const new_url = try arena.dupeZ(u8, self.url());
var headers: std.ArrayListUnmanaged(http.Header) = .{};
var it = self.headerIterator();
while (it.next()) |hdr| {
try headers.append(arena, .{
.name = try arena.dupe(u8, hdr.name),
.value = try arena.dupe(u8, hdr.value),
});
}
return .{
.ctx = self.ctx,
.status = self.status() orelse 0,
.url = new_url,
.headers = headers.items,
.body = null,
};
}
};

View File

@@ -389,6 +389,18 @@ pub fn getAsyncImport(self: *ScriptManagerBase, url: [:0]const u8, cb: ImportAsy
pub fn staticScriptsDone(self: *ScriptManagerBase) void {
lp.assert(self.static_scripts_done == false, "ScriptManagerBase.staticScriptsDone", .{});
self.static_scripts_done = true;
const frame_id = self.owner.frameId();
if (comptime IS_DEBUG) {
const blocking_request_id = self.client.blocking_requests.get(frame_id);
lp.assert(
blocking_request_id == null,
"there should be no blocking request on this frame",
.{ .value = blocking_request_id },
);
}
self.client.deferring_layer.flushFrame(frame_id);
self.evaluate();
}

View File

@@ -0,0 +1,299 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const lp = @import("lightpanda");
const log = lp.log;
const Client = @import("../../browser/HttpClient.zig").Client;
const Network = @import("../Network.zig");
const Transfer = @import("../../browser/HttpClient.zig").Transfer;
const Request = @import("../../browser/HttpClient.zig").Request;
const Response = @import("../../browser/HttpClient.zig").Response;
const Layer = @import("../../browser/HttpClient.zig").Layer;
const StableResponse = @import("../../browser/HttpClient.zig").StableResponse;
const Forward = @import("Forward.zig");
const DeferringLayer = @This();
allocator: std.mem.Allocator,
network: *Network,
next: Layer = undefined,
active: std.DoublyLinkedList = .{},
pub fn layer(self: *DeferringLayer) Layer {
return .{
.ptr = self,
.vtable = &.{ .request = request },
};
}
pub fn deinit(self: *DeferringLayer) void {
self.drainAll();
}
fn request(ptr: *anyopaque, transfer: *Transfer) anyerror!void {
const self: *DeferringLayer = @ptrCast(@alignCast(ptr));
const arena = try self.network.app.arena_pool.acquire(.small, "DeferringContext");
errdefer self.network.app.arena_pool.release(arena);
const ctx = try arena.create(DeferredContext);
ctx.* = .{
.arena = arena,
.layer = self,
.transfer = transfer,
.forward = Forward.capture(&transfer.req),
.node = .{},
};
self.active.append(&ctx.node);
errdefer self.active.remove(&ctx.node);
transfer.req.ctx = ctx;
transfer.req.start_callback = if (ctx.forward.start != null) DeferredContext.startCallback else null;
transfer.req.header_callback = DeferredContext.headerCallback;
transfer.req.data_callback = DeferredContext.dataCallback;
transfer.req.done_callback = DeferredContext.doneCallback;
transfer.req.error_callback = DeferredContext.errorCallback;
transfer.req.shutdown_callback = if (ctx.forward.shutdown != null) DeferredContext.shutdownCallback else null;
return self.next.request(transfer);
}
pub fn flushUnblocked(
self: *DeferringLayer,
blocking_requests: *const std.AutoHashMapUnmanaged(u32, u32),
) void {
var node = self.active.first;
while (node) |n| {
node = n.next;
const ctx: *DeferredContext = @fieldParentPtr("node", n);
if (!ctx.deferring or !ctx.terminal) continue;
const deferred_req = ctx.transfer.req;
const frame_id = deferred_req.frame_id;
if (!blocking_requests.contains(frame_id)) {
self.active.remove(n);
ctx.fire();
}
}
}
pub fn flushFrame(self: *DeferringLayer, frame_id: u32) void {
var node = self.active.first;
while (node) |n| {
node = n.next;
const ctx: *DeferredContext = @fieldParentPtr("node", n);
if (!ctx.deferring or !ctx.terminal) continue;
const deferred_req = ctx.transfer.req;
if (deferred_req.frame_id == frame_id) {
self.active.remove(n);
ctx.fire();
}
}
}
pub fn drainAll(self: *DeferringLayer) void {
while (self.active.popFirst()) |node| {
const ctx: *DeferredContext = @fieldParentPtr("node", node);
ctx.deinit();
}
}
const DeferredContext = struct {
arena: std.mem.Allocator,
layer: *DeferringLayer,
transfer: *Transfer,
forward: Forward,
node: std.DoublyLinkedList.Node,
buffered: std.ArrayListUnmanaged(BufferedEvent) = .{},
deferring: bool = false,
terminal: bool = false,
stable_resp: ?StableResponse = null,
const BufferedEvent = union(enum) {
start,
header,
data: []const u8,
done,
err: anyerror,
shutdown,
};
fn deinit(self: *DeferredContext) void {
self.layer.network.app.arena_pool.release(self.arena);
}
fn shouldDefer(self: *DeferredContext) bool {
const req = self.transfer.req;
const blocking_id = self.transfer.client.blocking_requests.get(req.frame_id) orelse return false;
return self.transfer.id != blocking_id;
}
fn startCallback(response: Response) anyerror!void {
const self: *DeferredContext = @ptrCast(@alignCast(response.ctx));
const req = self.transfer.req;
if (!self.deferring and !self.shouldDefer()) {
return self.forward.forwardStart(response);
}
log.debug(.http, "deferring start callback", .{ .url = req.url });
self.stable_resp = try response.toStable(self.arena);
self.deferring = true;
try self.buffered.append(self.arena, .start);
}
fn headerCallback(response: Response) anyerror!bool {
const self: *DeferredContext = @ptrCast(@alignCast(response.ctx));
const req = self.transfer.req;
if (!self.deferring and !self.shouldDefer()) {
return self.forward.forwardHeader(response);
}
log.debug(.http, "deferring header callback", .{ .url = req.url });
self.stable_resp = try response.toStable(self.arena);
self.deferring = true;
try self.buffered.append(self.arena, .header);
return true;
}
fn dataCallback(response: Response, chunk: []const u8) anyerror!void {
const self: *DeferredContext = @ptrCast(@alignCast(response.ctx));
const req = self.transfer.req;
if (!self.deferring and !self.shouldDefer()) {
return self.forward.forwardData(response, chunk);
}
log.debug(.http, "deferring data callback", .{ .url = req.url });
self.deferring = true;
try self.buffered.append(self.arena, .{ .data = try self.arena.dupe(u8, chunk) });
}
fn doneCallback(ctx: *anyopaque) anyerror!void {
const self: *DeferredContext = @ptrCast(@alignCast(ctx));
const req = self.transfer.req;
if (!self.deferring and !self.shouldDefer()) {
defer self.deinit();
self.layer.active.remove(&self.node);
return self.forward.forwardDone();
}
log.debug(.http, "deferring done callback", .{ .url = req.url });
self.deferring = true;
self.terminal = true;
try self.buffered.append(self.arena, .done);
}
fn errorCallback(ctx: *anyopaque, err: anyerror) void {
const self: *DeferredContext = @ptrCast(@alignCast(ctx));
const req = self.transfer.req;
if (!self.deferring and !self.shouldDefer()) {
defer self.deinit();
self.layer.active.remove(&self.node);
self.forward.forwardErr(err);
return;
}
log.debug(.http, "deferring error callback", .{ .url = req.url, .err = err });
self.deferring = true;
self.terminal = true;
self.buffered.append(self.arena, .{ .err = err }) catch {};
}
fn shutdownCallback(ctx: *anyopaque) void {
const self: *DeferredContext = @ptrCast(@alignCast(ctx));
const req = self.transfer.req;
if (!self.deferring and !self.shouldDefer()) {
defer self.deinit();
self.layer.active.remove(&self.node);
self.forward.forwardShutdown();
return;
}
log.debug(.http, "deferring shutdown callback", .{ .url = req.url });
self.deferring = true;
self.terminal = true;
self.buffered.append(self.arena, .shutdown) catch {};
}
// Replay all buffered events in order, then clean up.
fn fire(self: *DeferredContext) void {
defer self.deinit();
const req = self.transfer.req;
const stable_response = self.stable_resp.?;
const response = Response.fromStable(&stable_response);
for (self.buffered.items) |event| {
switch (event) {
.start => {
self.forward.forwardStart(response) catch |err| {
log.err(.http, "deferred start callback", .{ .err = err, .url = req.url });
self.forward.forwardErr(err);
return;
};
},
.header => {
const proceed = self.forward.forwardHeader(response) catch |err| {
log.err(.http, "deferred header callback", .{ .err = err, .url = req.url });
self.forward.forwardErr(err);
return;
};
if (!proceed) {
self.forward.forwardErr(error.Abort);
}
},
.data => |chunk| {
self.forward.forwardData(response, chunk) catch |err| {
log.err(.http, "deferred data callback", .{ .err = err, .url = req.url });
self.forward.forwardErr(err);
return;
};
},
.done => {
self.forward.forwardDone() catch |err| {
log.err(.http, "deferred done callback", .{ .err = err, .url = req.url });
self.forward.forwardErr(err);
};
return;
},
.err => |err| {
self.forward.forwardErr(err);
return;
},
.shutdown => {
self.forward.forwardShutdown();
return;
},
}
}
}
};