Merge branch 'main' into agent

This commit is contained in:
Adrià Arrufat
2026-05-14 07:41:04 +02:00
10 changed files with 197 additions and 80 deletions

View File

@@ -88,7 +88,6 @@ jobs:
matrix:
proxy: [true, false]
wba: [true, false]
cache: [true, false]
robotstxt: [true, false]
name: demo-runner
@@ -130,9 +129,6 @@ jobs:
go build
./proxy &
- if: matrix.cache == true
run: mkdir /tmp/lp-cache
- if: matrix.wba == true
run: echo "${{ secrets.WBA_PRIVATE_KEY_PEM }}" > private_key.pem
@@ -141,7 +137,6 @@ jobs:
run: |
args=""
[ "${{ matrix.proxy }}" = "true" ] && args="$args --http-proxy http://127.0.0.1:3000"
[ "${{ matrix.cache }}" = "true" ] && args="$args --http-cache-dir /tmp/lp-cache"
[ "${{ matrix.robotstxt }}" = "true" ] && args="$args --obey-robots"
[ "${{ matrix.wba }}" = "true" ] && args="$args --web-bot-auth-key-file private_key.pem"
[ "${{ matrix.wba }}" = "true" ] && args="$args --web-bot-auth-domain ${{ vars.WBA_DOMAIN }}"
@@ -159,7 +154,7 @@ jobs:
uses: actions/upload-artifact@v7
if: always()
with:
name: cdp-log-demo-runner-${{ matrix.proxy }}-${{ matrix.wba }}-${{ matrix.cache }}-${{ matrix.robotstxt }}
name: cdp-log-demo-runner-${{ matrix.proxy }}-${{ matrix.wba }}-${{ matrix.robotstxt }}
path: cdp.log
retention-days: 1
@@ -168,7 +163,6 @@ jobs:
fail-fast: false
matrix:
wba: [true, false]
cache: [true, false]
robotstxt: [true, false]
name: proxy-auth
@@ -209,9 +203,6 @@ jobs:
go build
./proxy &
- if: matrix.cache == true
run: mkdir /tmp/lp-cache
- if: matrix.wba == true
run: echo "${{ secrets.WBA_PRIVATE_KEY_PEM }}" > private_key.pem
@@ -219,7 +210,6 @@ jobs:
name: build LP args
run: |
args=""
[ "${{ matrix.cache }}" = "true" ] && args="$args --http-cache-dir /tmp/lp-cache"
[ "${{ matrix.robotstxt }}" = "true" ] && args="$args --obey-robots"
[ "${{ matrix.wba }}" = "true" ] && args="$args --web-bot-auth-key-file private_key.pem"
[ "${{ matrix.wba }}" = "true" ] && args="$args --web-bot-auth-domain ${{ vars.WBA_DOMAIN }}"
@@ -244,7 +234,7 @@ jobs:
uses: actions/upload-artifact@v7
if: always()
with:
name: cdp-log-proxy-auth-${{ matrix.wba }}-${{ matrix.cache }}-${{ matrix.robotstxt }}
name: cdp-log-proxy-auth-${{ matrix.wba }}-${{ matrix.robotstxt }}
path: cdp.log
retention-days: 1

View File

@@ -379,19 +379,27 @@ pub fn abortRequests(_: *Client, owner: *Owner) void {
}
pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
try self.drainQueue();
const status = try self.perform(@intCast(timeout_ms));
// perform/processMessages just released a batch of connections back to
// the pool. Drain again so queued transfers can use them this tick
// instead of waiting for the next runner iteration.
try self.drainQueue();
return status;
}
fn drainQueue(self: *Client) !void {
while (self.queue.popFirst()) |queue_node| {
const transfer: *Transfer = @fieldParentPtr("_node", queue_node);
const conn = self.network.getConnection() orelse {
self.queue.prepend(queue_node);
break;
return;
};
// Cleared only after we've successfully obtained a connection;
// if we put the node back, _queued stays true.
transfer._queued = false;
try self.makeRequest(conn, transfer);
}
return self.perform(@intCast(timeout_ms));
}
// last layer
@@ -611,7 +619,15 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer
return err;
};
}
_ = try self.perform(0);
// Start the request (and move along any other request). This used to call
// self.perform(0) but that can also execute callbacks. Normally, that
// wouldn't be so bad. But curl can synchronously fire callbacks for the
// request we JUST added, which we do not want (it results in incorrect
// execution).
self.performing = true;
defer self.performing = false;
_ = try self.handles.perform();
}
pub const PerformStatus = enum {

View File

@@ -212,7 +212,18 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
},
}
if (http_active == 0 and http_client.ws_active == 0 and (comptime is_cdp == false)) {
if (http_active == 0 and http_client.ws_active == 0 and http_client.queue.first == null and http_client.ready_queue.first == null and (comptime is_cdp == false)) {
// we don't need to consider http_client.intercepted here
// because is_cdp is false, and that can only be
// the case when interception isn't possible.
//
// ready_queue is also part of the check: makeRequest now
// wraps its handles.perform() in a performing=true window,
// and any synchronous libcurl callback that ends up
// calling trackConn during that window (e.g. JS creating
// a WebSocket) will append to ready_queue. Without this
// check we could observe it non-empty after
// http_client.tick returns.
// we don't need to consider http_client.intercepted here
// because is_cdp is false, and that can only be
// the case when interception isn't possible.

View File

@@ -329,60 +329,3 @@
});
}
</script>
<script id=xhr_in_worker type=module>
{
const state = await testing.async();
const worker = new Worker('./xhr-worker.js');
worker.onmessage = (e) => state.resolve(e.data);
setTimeout(() => worker.postMessage({ kind: 'basic' }), 100);
await state.done((data) => {
testing.expectTrue(data.ok, 'worker xhr error: ' + data.err);
testing.expectEqual(200, data.status);
testing.expectEqual('OK', data.status_text);
testing.expectEqual('http://127.0.0.1:9582/xhr', data.response_url);
testing.expectEqual(100, data.response_text_length);
testing.expectEqual('text/html; charset=utf-8', data.content_type);
testing.expectEqual(4, data.states.length);
testing.expectEqual(1, data.states[0]);
testing.expectEqual(2, data.states[1]);
testing.expectEqual(3, data.states[2]);
testing.expectEqual(4, data.states[3]);
});
}
</script>
<script id=xhr_arraybuffer_in_worker type=module>
{
const state = await testing.async();
const worker = new Worker('./xhr-worker.js');
worker.onmessage = (e) => state.resolve(e.data);
setTimeout(() => worker.postMessage({ kind: 'arraybuffer' }), 100);
await state.done((data) => {
testing.expectTrue(data.ok, 'worker xhr error: ' + data.err);
testing.expectEqual(200, data.status);
testing.expectEqual(7, data.byte_length);
testing.expectEqual(0, data.first);
testing.expectEqual(1, data.third);
testing.expectEqual(9, data.last);
testing.expectEqual('arraybuffer', data.response_type);
});
}
</script>
<script id=xhr_document_in_worker_unsupported type=module>
{
const state = await testing.async();
const worker = new Worker('./xhr-worker.js');
worker.onmessage = (e) => state.resolve(e.data);
setTimeout(() => worker.postMessage({ kind: 'document_unsupported' }), 100);
await state.done((data) => {
testing.expectTrue(data.ok, 'worker xhr error: ' + data.err);
testing.expectEqual(200, data.status);
testing.expectEqual(true, data.threw);
});
}
</script>

View File

@@ -0,0 +1,59 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=xhr_in_worker type=module>
{
const state = await testing.async();
const worker = new Worker('./xhr-worker.js');
worker.onmessage = (e) => state.resolve(e.data);
worker.postMessage({ kind: 'basic' });
await state.done((data) => {
testing.expectTrue(data.ok, 'worker xhr error: ' + data.err);
testing.expectEqual(200, data.status);
testing.expectEqual('OK', data.status_text);
testing.expectEqual('http://127.0.0.1:9582/xhr', data.response_url);
testing.expectEqual(100, data.response_text_length);
testing.expectEqual('text/html; charset=utf-8', data.content_type);
testing.expectEqual(4, data.states.length);
testing.expectEqual(1, data.states[0]);
testing.expectEqual(2, data.states[1]);
testing.expectEqual(3, data.states[2]);
testing.expectEqual(4, data.states[3]);
});
}
</script>
<script id=xhr_arraybuffer_in_worker type=module>
{
const state = await testing.async();
const worker = new Worker('./xhr-worker.js');
worker.onmessage = (e) => state.resolve(e.data);
worker.postMessage({ kind: 'arraybuffer' });
await state.done((data) => {
testing.expectTrue(data.ok, 'worker xhr error: ' + data.err);
testing.expectEqual(200, data.status);
testing.expectEqual(7, data.byte_length);
testing.expectEqual(0, data.first);
testing.expectEqual(1, data.third);
testing.expectEqual(9, data.last);
testing.expectEqual('arraybuffer', data.response_type);
});
}
</script>
<script id=xhr_document_in_worker_unsupported type=module>
{
const state = await testing.async();
const worker = new Worker('./xhr-worker.js');
worker.onmessage = (e) => state.resolve(e.data);
worker.postMessage({ kind: 'document_unsupported' });
await state.done((data) => {
testing.expectTrue(data.ok, 'worker xhr error: ' + data.err);
testing.expectEqual(200, data.status);
testing.expectEqual(true, data.threw);
});
}
</script>

View File

@@ -337,3 +337,26 @@
});
}
</script>
<script id="worker_post_before_load" type=module>
// Per spec, messages posted before the worker has finished loading must
// be buffered and delivered once onmessage is registered. Without the
// pending-message queue this hangs: postMessage fires immediately after
// new Worker, while the worker script's HTTP fetch is still in flight,
// so by the time the worker dispatches the message there is no
// onmessage listener yet and the message gets silently dropped.
{
const state = await testing.async();
const worker = new Worker('./echo-worker.js');
worker.onmessage = function(event) {
state.resolve(event.data);
};
// No setTimeout — post right now while the script is still loading.
worker.postMessage({ greeting: 'before-load' });
await state.done((response) => {
testing.expectEqual('before-load', response.echo.greeting);
testing.expectEqual('worker', response.from);
});
}
</script>

View File

@@ -21,12 +21,12 @@ const lp = @import("lightpanda");
const js = @import("../js/js.zig");
const Page = @import("../Page.zig");
const Frame = @import("../Frame.zig");
const Node = @import("Node.zig");
const EventTarget = @import("EventTarget.zig");
const String = lp.String;
const Execution = js.Execution;
const Allocator = std.mem.Allocator;
pub const Event = @This();
@@ -264,7 +264,7 @@ pub fn getIsTrusted(self: *const Event) bool {
return self._is_trusted;
}
pub fn composedPath(self: *Event, frame: *Frame) ![]const *EventTarget {
pub fn composedPath(self: *Event, exec: *Execution) ![]const *EventTarget {
// Return empty array if event is not being dispatched
if (self._event_phase == .none) {
return &.{};
@@ -329,8 +329,13 @@ pub fn composedPath(self: *Event, frame: *Frame) ![]const *EventTarget {
// Add window at the end (unless we stopped at shadow boundary)
if (!stopped_at_shadow_boundary) {
if (path_len < path_buffer.len) {
path_buffer[path_len] = frame.window.asEventTarget();
path_len += 1;
switch (exec.context.global) {
.worker => {},
.frame => |frame| {
path_buffer[path_len] = frame.window.asEventTarget();
path_len += 1;
},
}
}
}
@@ -366,7 +371,7 @@ pub fn composedPath(self: *Event, frame: *Frame) ![]const *EventTarget {
const visible_path_len = if (path_len > visible_start_index) path_len - visible_start_index else 0;
// Allocate and return the visible path using call_arena (short-lived)
const path = try frame.call_arena.alloc(*EventTarget, visible_path_len);
const path = try exec.call_arena.alloc(*EventTarget, visible_path_len);
@memcpy(path, path_buffer[visible_start_index..path_len]);
return path;
}

View File

@@ -169,7 +169,6 @@ fn httpDataCallback(response: HttpClient.Response, data: []const u8) !void {
fn httpDoneCallback(ctx: *anyopaque) !void {
const self: *Worker = @ptrCast(@alignCast(ctx));
self._http_response = null;
self._script_loaded = true;
const url = self._url;
const script = self._script_buffer.items;
@@ -185,6 +184,23 @@ fn httpDoneCallback(ctx: *anyopaque) !void {
}
fn loadInitialScript(self: *Worker, script: []const u8) !void {
// Keep buffering throughout the entire outer eval (including any
// runMacrotasks pumped by importScripts via the synchronous CDP path,
// see WorkerGlobalScope.importScripts). The flip-and-drain happens
// via defer so it runs after eval AND after the trailing
// runMacrotasks below — by which point the outer script has had its
// only chance to register onmessage. drainPendingMessages enqueues
// messages in receive order, so pre-eval and during-eval messages
// are delivered FIFO on the next runner tick, matching the spec.
//
// On eval-throw the defer still fires; the messages get scheduled
// and then drop at the "no listener" check, mirroring the
// httpErrorCallback path.
defer {
self._script_loaded = true;
self._worker_scope.drainPendingMessages();
}
var ls: js.Local.Scope = undefined;
self._worker_scope.js.localScope(&ls);
defer ls.deinit();
@@ -227,6 +243,13 @@ fn httpErrorCallback(ctx: *anyopaque, err: anyerror) void {
.err = err,
});
// The worker will never load and onmessage will never be registered.
// Drain any buffered messages so they get dispatched (and silently
// dropped at the "no listener" check) rather than accumulating until
// worker teardown. Future postMessages then schedule normally.
self._script_loaded = true;
self._worker_scope.drainPendingMessages();
self.fireErrorEvent(@errorName(err), null);
}

View File

@@ -105,6 +105,13 @@ _location: WorkerLocation,
_timers: Timers = .{},
// Messages received before the worker script finished evaluating. Per the
// HTML spec, postMessage'd data is buffered while the worker is loading
// and delivered once the worker is ready (i.e. once onmessage can be set).
// Drained by drainPendingMessages, called from Worker.loadInitialScript
// after the initial script has been evaluated.
_pending_messages: std.ArrayList(?JS.Value.Temp) = .empty,
pub fn init(worker: *Worker, url: [:0]const u8) !*WorkerGlobalScope {
const arena = worker._arena;
const parent = worker._frame;
@@ -156,6 +163,14 @@ pub fn deinit(self: *WorkerGlobalScope) void {
browser.http_client.abortOwner(&self._http_owner);
// Release any messages that were buffered while waiting for the
// worker script to load but never got drained (e.g. worker script
// failed to fetch). Backing array lives on self.arena so the storage
// itself is freed with the arena.
for (self._pending_messages.items) |maybe_data| {
if (maybe_data) |d| d.release();
}
self._identity.deinit();
self._script_manager.deinit();
@@ -300,6 +315,20 @@ pub fn receiveMessage(self: *WorkerGlobalScope, data: JS.Value) !void {
break :blk cloned.temp() catch break :blk null;
};
if (!self._worker._script_loaded) {
// Buffer until Worker.loadInitialScript calls drainPendingMessages.
// Without this, postMessage'd data races against the worker's
// script load: if onmessage hasn't been registered yet (because
// the worker hasn't been evaluated), the dispatched event finds
// no listener and the message is silently dropped.
try self._pending_messages.append(self.arena, cloned_data);
return;
}
try self.scheduleMessage(cloned_data);
}
fn scheduleMessage(self: *WorkerGlobalScope, cloned_data: ?JS.Value.Temp) !void {
const session = self._session;
const message_arena = try session.getArena(.tiny, "WorkerGlobalScope.receiveMessage");
@@ -319,6 +348,20 @@ pub fn receiveMessage(self: *WorkerGlobalScope, data: JS.Value) !void {
});
}
// Called by Worker.loadInitialScript once the initial script has been
// evaluated and onmessage has had a chance to be registered. Any messages
// that arrived while the worker was loading are scheduled for delivery in
// the order they were received.
pub fn drainPendingMessages(self: *WorkerGlobalScope) void {
for (self._pending_messages.items) |cloned_data| {
self.scheduleMessage(cloned_data) catch |err| {
log.warn(.browser, "worker drain msg failed", .{ .err = err });
if (cloned_data) |d| d.release();
};
}
self._pending_messages.clearRetainingCapacity();
}
pub fn btoa(_: *const WorkerGlobalScope, input: JS.String.OneByte, exec: *JS.Execution) ![]const u8 {
return @import("encoding/base64.zig").encode(exec.call_arena, input.bytes);
}

View File

@@ -671,3 +671,7 @@ const testing = @import("../../../testing.zig");
test "WebApi: XHR" {
try testing.htmlRunner("net/xhr.html", .{});
}
test "WebApi: XHR in worker" {
try testing.htmlRunner("net/xhr_worker.html", .{});
}