Merge pull request #2540 from lightpanda-io/websocket_worker

make websocket work in worker
This commit is contained in:
Pierre Tachoire
2026-05-26 09:37:16 +02:00
committed by GitHub
5 changed files with 127 additions and 51 deletions

View File

@@ -986,13 +986,11 @@ pub const WorkerJsApis = flattenTypes(&.{
@import("../webapi/canvas/OffscreenCanvasRenderingContext2D.zig"),
@import("../webapi/net/XMLHttpRequest.zig"),
@import("../webapi/net/XMLHttpRequestEventTarget.zig"),
@import("../webapi/net/WebSocket.zig"),
@import("../webapi/FileReader.zig"),
@import("../webapi/ImageData.zig"),
@import("../webapi/Performance.zig"),
@import("../webapi/PerformanceObserver.zig"),
// EventCounts is reachable only via Performance.eventCounts, which is
// [Exposed=Window] (pruned from Worker by Snapshot.pruneExposed). The
// type itself is in PageJsApis via Performance.registerTypes().
});
// Master list of ALL JS APIs across all contexts.

View File

@@ -0,0 +1,46 @@
// Exercises the WebSocket API inside a worker. Posts 'ready' once the message
// handler is wired so the page knows it can send a command without racing
// worker startup. On command, opens a WebSocket to the test echo server,
// sends a message, and reports the echoed reply plus the close code/reason
// back to the page.
self.onmessage = function(e) {
const cmd = e.data;
try {
if (cmd.kind === 'echo') {
const received = [];
const ws = new WebSocket('ws://127.0.0.1:9584/');
ws.addEventListener('open', () => {
ws.send('from-worker');
});
ws.addEventListener('message', (ev) => {
received.push(ev.data);
ws.close(1000, 'bye');
});
ws.addEventListener('close', (ev) => {
postMessage({
ok: true,
received,
url: ws.url,
ready_state: ws.readyState,
code: ev.code,
reason: ev.reason,
was_clean: ev.wasClean,
});
});
ws.addEventListener('error', () => {
postMessage({ ok: false, err: 'websocket error' });
});
return;
}
postMessage({ ok: false, err: 'unknown command' });
} catch (err) {
postMessage({ ok: false, err: String(err), stack: err.stack });
}
};
postMessage({ ready: true });

View File

@@ -0,0 +1,28 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=websocket_in_worker type=module>
{
const state = await testing.async();
const worker = new Worker('./websocket-worker.js');
// The worker posts {ready:true} once its onmessage handler is wired.
// Wait for that before sending the command to avoid racing startup.
worker.onmessage = (e) => {
if (e.data && e.data.ready) {
worker.postMessage({ kind: 'echo' });
return;
}
state.resolve(e.data);
};
await state.done((data) => {
testing.expectTrue(data.ok, 'worker websocket error: ' + data.err);
testing.expectEqual(['echo-from-worker'], data.received);
testing.expectEqual('ws://127.0.0.1:9584/', data.url);
testing.expectEqual(3, data.ready_state); // CLOSED
testing.expectEqual(1000, data.code);
testing.expectEqual('bye', data.reason);
testing.expectEqual(true, data.was_clean);
});
}
</script>

View File

@@ -19,7 +19,7 @@
const std = @import("std");
const lp = @import("lightpanda");
const Frame = @import("../../Frame.zig");
const Page = @import("../../Page.zig");
const Event = @import("../Event.zig");
const String = lp.String;
@@ -39,23 +39,23 @@ const CloseEventOptions = struct {
const Options = Event.inheritOptions(CloseEvent, CloseEventOptions);
pub fn init(typ: []const u8, _opts: ?Options, frame: *Frame) !*CloseEvent {
const arena = try frame.getArena(.tiny, "CloseEvent");
errdefer frame.releaseArena(arena);
pub fn init(typ: []const u8, _opts: ?Options, page: *Page) !*CloseEvent {
const arena = try page.getArena(.tiny, "CloseEvent");
errdefer page.releaseArena(arena);
const type_string = try String.init(arena, typ, .{});
return initWithTrusted(arena, type_string, _opts, false, frame);
return initWithTrusted(arena, type_string, _opts, false, page);
}
pub fn initTrusted(typ: String, _opts: ?Options, frame: *Frame) !*CloseEvent {
const arena = try frame.getArena(.tiny, "CloseEvent.trusted");
errdefer frame.releaseArena(arena);
return initWithTrusted(arena, typ, _opts, true, frame);
pub fn initTrusted(typ: String, _opts: ?Options, page: *Page) !*CloseEvent {
const arena = try page.getArena(.tiny, "CloseEvent.trusted");
errdefer page.releaseArena(arena);
return initWithTrusted(arena, typ, _opts, true, page);
}
fn initWithTrusted(arena: Allocator, typ: String, _opts: ?Options, trusted: bool, frame: *Frame) !*CloseEvent {
fn initWithTrusted(arena: Allocator, typ: String, _opts: ?Options, trusted: bool, page: *Page) !*CloseEvent {
const opts = _opts orelse Options{};
const event = try frame._factory.event(
const event = try page.factory.event(
arena,
typ,
CloseEvent{

View File

@@ -26,7 +26,6 @@ const Blob = @import("../Blob.zig");
const URL = @import("../../URL.zig");
const Page = @import("../../Page.zig");
const Frame = @import("../../Frame.zig");
const HttpClient = @import("../../HttpClient.zig");
const Event = @import("../Event.zig");
@@ -35,13 +34,14 @@ const CloseEvent = @import("../event/CloseEvent.zig");
const MessageEvent = @import("../event/MessageEvent.zig");
const log = lp.log;
const Execution = js.Execution;
const Allocator = std.mem.Allocator;
const IS_DEBUG = @import("builtin").mode == .Debug;
const WebSocket = @This();
_rc: lp.RC(u8) = .{},
_frame: *Frame,
_exec: *const Execution,
_proto: *EventTarget,
_arena: Allocator,
@@ -92,12 +92,12 @@ pub const BinaryType = enum {
arraybuffer,
};
pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket {
pub fn init(url: []const u8, protocols: [][]const u8, exec: *const Execution) !*WebSocket {
{
if (url.len < 6) {
return error.SyntaxError;
}
const normalized_start = std.ascii.lowerString(&frame.buf, url[0..6]);
const normalized_start = std.ascii.lowerString(exec.buf, url[0..6]);
if (!std.mem.startsWith(u8, normalized_start, "ws://") and !std.mem.startsWith(u8, normalized_start, "wss://")) {
return error.SyntaxError;
}
@@ -112,12 +112,12 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
}
}
const arena = try frame.getArena(.medium, "WebSocket");
errdefer frame.releaseArena(arena);
const arena = try exec.getArena(.medium, "WebSocket");
errdefer exec.releaseArena(arena);
const resolved_url = try URL.resolve(arena, frame.base(), url, .{ .always_dupe = true, .encoding = frame.charset });
const resolved_url = try URL.resolve(arena, exec.base(), url, .{ .always_dupe = true, .encoding = exec.charset.* });
const http_client = &frame._session.browser.http_client;
const http_client = &exec.context.page.session.browser.http_client;
const conn = http_client.network.newConnection() orelse {
return error.NoFreeConnection;
};
@@ -139,8 +139,8 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
try conn.setHeaders(&headers);
}
const self = try frame._factory.eventTargetWithAllocator(arena, WebSocket{
._frame = frame,
const self = try exec._factory.eventTargetWithAllocator(arena, WebSocket{
._exec = exec,
._conn = conn,
._arena = arena,
._proto = undefined,
@@ -150,7 +150,7 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
});
conn.transport = .{ .websocket = self };
try http_client.trackConn(conn);
frame._http_owner.addWS(self);
exec.httpOwner().addWS(self);
if (comptime IS_DEBUG) {
log.info(.websocket, "connecting", .{ .url = url });
@@ -236,11 +236,11 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
fn cleanup(self: *WebSocket) void {
if (self._conn) |conn| {
self._frame._http_owner.removeWS(self);
self._exec.httpOwner().removeWS(self);
self._http_client.removeConn(conn);
self._req_headers.deinit();
self._conn = null;
self.releaseRef(self._frame._page);
self.releaseRef(self._exec.context.page);
self._send_queue.clearRetainingCapacity();
}
}
@@ -308,8 +308,8 @@ pub fn send(self: *WebSocket, data: SendData) !void {
switch (data) {
.blob => |blob| {
const arena = try self._frame.getArena(blob._slice.len, "WebSocket.message");
errdefer self._frame.releaseArena(arena);
const arena = try self._exec.getArena(blob._slice.len, "WebSocket.message");
errdefer self._exec.releaseArena(arena);
try self.queueMessage(.{ .binary = .{
.arena = arena,
.data = try arena.dupe(u8, blob._slice),
@@ -317,8 +317,8 @@ pub fn send(self: *WebSocket, data: SendData) !void {
},
.js_val => |js_val| {
if (js_val.isString()) |str| {
const arena = try self._frame.getArena(str.len(), "WebSocket.message");
errdefer self._frame.releaseArena(arena);
const arena = try self._exec.getArena(str.len(), "WebSocket.message");
errdefer self._exec.releaseArena(arena);
try self.queueMessage(.{ .text = .{
.arena = arena,
.data = try str.toSliceWithAlloc(arena),
@@ -327,8 +327,8 @@ pub fn send(self: *WebSocket, data: SendData) !void {
const binary = try js_val.toZig(BinaryData);
const buffer = binary.asBuffer();
const arena = try self._frame.getArena(buffer.len, "WebSocket.message");
errdefer self._frame.releaseArena(arena);
const arena = try self._exec.getArena(buffer.len, "WebSocket.message");
errdefer self._exec.releaseArena(arena);
try self.queueMessage(.{ .binary = .{
.arena = arena,
.data = try arena.dupe(u8, buffer),
@@ -453,25 +453,25 @@ pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void {
}
fn dispatchOpenEvent(self: *WebSocket) !void {
const frame = self._frame;
const exec = self._exec;
const target = self.asEventTarget();
if (frame._event_manager.hasDirectListeners(target, "open", self._on_open)) {
const event = try Event.initTrusted(comptime .wrap("open"), .{}, frame._page);
try frame._event_manager.dispatchDirect(target, event, self._on_open, .{ .context = "WebSocket open" });
if (exec.hasDirectListeners(target, "open", self._on_open)) {
const event = try Event.initTrusted(comptime .wrap("open"), .{}, exec.context.page);
try exec.dispatch(target, event, self._on_open, .{ .context = "WebSocket open" });
}
}
fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void {
const frame = self._frame;
const exec = self._exec;
const target = self.asEventTarget();
if (frame._event_manager.hasDirectListeners(target, "message", self._on_message)) {
if (exec.hasDirectListeners(target, "message", self._on_message)) {
const msg_data: MessageEvent.Data = if (frame_type == .binary)
switch (self._binary_type) {
.arraybuffer => .{ .arraybuffer = .{ .values = data } },
.blob => blk: {
const blob = try Blob.initFromBytes(data, "", false, frame._page);
const blob = try Blob.initFromBytes(data, "", false, exec.context.page);
blob.acquireRef();
break :blk .{ .blob = blob };
},
@@ -482,32 +482,32 @@ fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsF
const event = try MessageEvent.initTrusted(comptime .wrap("message"), .{
.data = msg_data,
.origin = "",
}, frame._page);
try frame._event_manager.dispatchDirect(target, event.asEvent(), self._on_message, .{ .context = "WebSocket message" });
}, exec.context.page);
try exec.dispatch(target, event.asEvent(), self._on_message, .{ .context = "WebSocket message" });
}
}
fn dispatchErrorEvent(self: *WebSocket) !void {
const frame = self._frame;
const exec = self._exec;
const target = self.asEventTarget();
if (frame._event_manager.hasDirectListeners(target, "error", self._on_error)) {
const event = try Event.initTrusted(comptime .wrap("error"), .{}, frame._page);
try frame._event_manager.dispatchDirect(target, event, self._on_error, .{ .context = "WebSocket error" });
if (exec.hasDirectListeners(target, "error", self._on_error)) {
const event = try Event.initTrusted(comptime .wrap("error"), .{}, exec.context.page);
try exec.dispatch(target, event, self._on_error, .{ .context = "WebSocket error" });
}
}
fn dispatchCloseEvent(self: *WebSocket, code: u16, reason: []const u8, was_clean: bool) !void {
const frame = self._frame;
const exec = self._exec;
const target = self.asEventTarget();
if (frame._event_manager.hasDirectListeners(target, "close", self._on_close)) {
if (exec.hasDirectListeners(target, "close", self._on_close)) {
const event = try CloseEvent.initTrusted(comptime .wrap("close"), .{
.code = code,
.reason = reason,
.wasClean = was_clean,
}, frame);
try frame._event_manager.dispatchDirect(target, event.asEvent(), self._on_close, .{ .context = "WebSocket close" });
}, exec.context.page);
try exec.dispatch(target, event.asEvent(), self._on_close, .{ .context = "WebSocket close" });
}
}
@@ -580,7 +580,7 @@ fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: M
if (self._send_offset >= byte_msg.data.len) {
const removed = self._send_queue.orderedRemove(0);
removed.deinit(self._frame._page);
removed.deinit(self._exec.context.page);
if (comptime IS_DEBUG) {
log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len });
}
@@ -768,3 +768,7 @@ const testing = @import("../../../testing.zig");
test "WebApi: WebSocket" {
try testing.htmlRunner("net/websocket.html", .{});
}
test "WebApi: WebSocket in worker" {
try testing.htmlRunner("net/websocket_worker.html", .{});
}