From 5733c35a2de3cd5262a4e0fa21ad1b10ba026758 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Tue, 31 Mar 2026 20:37:28 +0800 Subject: [PATCH 01/15] WebSocket WebAPI Uses libcurl's websocket capabilities to add support for WebSocket. Depends on https://github.com/lightpanda-io/zig-v8-fork/pull/167 Issue: https://github.com/lightpanda-io/browser/issues/1952 This is a WIP because it currently uses the same connection pool used for all HTTP requests. It would be pretty easy for a page to starve the pool and block any progress. We previously stored the *Transfer inside of the easy's private data. We now store the *Connection, and a Connection now has a `transport` field which is a union for `http: *Transfer` or `websocket: *Websocket`. --- build.zig | 1 + build.zig.zon | 4 +- src/TestWSServer.zig | 371 ++++++++++++ src/browser/HttpClient.zig | 149 ++--- src/browser/js/bridge.zig | 2 + src/browser/tests/net/websocket.html | 240 ++++++++ src/browser/tests/net/websocket2.html | 233 ++++++++ src/browser/tests/net/websocket3.html | 77 +++ src/browser/webapi/Event.zig | 2 + src/browser/webapi/EventTarget.zig | 3 + src/browser/webapi/MessagePort.zig | 2 +- src/browser/webapi/Window.zig | 2 +- src/browser/webapi/event/CloseEvent.zig | 102 ++++ src/browser/webapi/event/MessageEvent.zig | 17 +- src/browser/webapi/net/WebSocket.zig | 687 ++++++++++++++++++++++ src/network/http.zig | 90 ++- src/sys/libcurl.zig | 122 ++++ src/testing.zig | 40 +- 18 files changed, 2039 insertions(+), 105 deletions(-) create mode 100644 src/TestWSServer.zig create mode 100644 src/browser/tests/net/websocket.html create mode 100644 src/browser/tests/net/websocket2.html create mode 100644 src/browser/tests/net/websocket3.html create mode 100644 src/browser/webapi/event/CloseEvent.zig create mode 100644 src/browser/webapi/net/WebSocket.zig diff --git a/build.zig b/build.zig index d5b06794..fe61f488 100644 --- a/build.zig +++ b/build.zig @@ -493,6 +493,7 @@ fn buildCurl( .CURL_DISABLE_SMTP = true, .CURL_DISABLE_TELNET = true, .CURL_DISABLE_TFTP = true, + .CURL_DISABLE_WEBSOCKETS = false, // Enable WebSocket support .ssize_t = null, ._FILE_OFFSET_BITS = 64, diff --git a/build.zig.zon b/build.zig.zon index f6c231bb..0c6096a4 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -5,8 +5,8 @@ .minimum_zig_version = "0.15.2", .dependencies = .{ .v8 = .{ - .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/refs/tags/v0.3.7.tar.gz", - .hash = "v8-0.0.0-xddH67uBBAD95hWsPQz3Ni1PlZjdywtPXrGUAp8rSKco", + .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/99c1ddf2d0b15f141e92ea09abdfc8e0e5f441e6.tar.gz", + .hash = "v8-0.0.0-xddH63-BBABP05dni8oMrs9qQwuczHhNhXHbXXlPb95s", }, // .v8 = .{ .path = "../zig-v8-fork" }, .brotli = .{ diff --git a/src/TestWSServer.zig b/src/TestWSServer.zig new file mode 100644 index 00000000..4d7ddd54 --- /dev/null +++ b/src/TestWSServer.zig @@ -0,0 +1,371 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const posix = std.posix; + +const TestWSServer = @This(); + +shutdown: std.atomic.Value(bool), +listener: ?posix.socket_t, + +pub fn init() TestWSServer { + return .{ + .shutdown = .init(true), + .listener = null, + }; +} + +pub fn deinit(self: *TestWSServer) void { + if (self.listener) |socket| { + posix.close(socket); + self.listener = null; + } +} + +pub fn stop(self: *TestWSServer) void { + self.shutdown.store(true, .release); + if (self.listener) |socket| { + posix.close(socket); + self.listener = null; + } +} + +pub fn run(self: *TestWSServer, wg: *std.Thread.WaitGroup) void { + self.runImpl(wg) catch |err| { + std.debug.print("WebSocket echo server error: {}\n", .{err}); + }; +} + +fn runImpl(self: *TestWSServer, wg: *std.Thread.WaitGroup) !void { + const socket = try posix.socket(posix.AF.INET, posix.SOCK.STREAM, 0); + errdefer posix.close(socket); + + const addr = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 9584); + + try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + try posix.bind(socket, &addr.any, addr.getOsSockLen()); + try posix.listen(socket, 8); + + self.listener = socket; + self.shutdown.store(false, .release); + wg.finish(); + + while (!self.shutdown.load(.acquire)) { + var client_addr: posix.sockaddr = undefined; + var addr_len: posix.socklen_t = @sizeOf(posix.sockaddr); + + const client = posix.accept(socket, &client_addr, &addr_len, 0) catch |err| { + if (self.shutdown.load(.acquire)) return; + std.debug.print("[WS Server] Accept error: {}\n", .{err}); + continue; + }; + + const thread = std.Thread.spawn(.{}, handleClient, .{client}) catch |err| { + std.debug.print("[WS Server] Thread spawn error: {}\n", .{err}); + posix.close(client); + continue; + }; + thread.detach(); + } +} + +fn handleClient(client: posix.socket_t) void { + defer posix.close(client); + + var buf: [4096]u8 = undefined; + const n = posix.read(client, &buf) catch return; + + const request = buf[0..n]; + + // Find Sec-WebSocket-Key + const key_header = "Sec-WebSocket-Key: "; + const key_start = std.mem.indexOf(u8, request, key_header) orelse return; + const key_line_start = key_start + key_header.len; + const key_end = std.mem.indexOfScalarPos(u8, request, key_line_start, '\r') orelse return; + const key = request[key_line_start..key_end]; + + // Compute accept key + var hasher = std.crypto.hash.Sha1.init(.{}); + hasher.update(key); + hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + var hash: [20]u8 = undefined; + hasher.final(&hash); + + var accept_key: [28]u8 = undefined; + _ = std.base64.standard.Encoder.encode(&accept_key, &hash); + + // Send upgrade response + var resp_buf: [256]u8 = undefined; + const resp = std.fmt.bufPrint(&resp_buf, "HTTP/1.1 101 Switching Protocols\r\n" ++ + "Upgrade: websocket\r\n" ++ + "Connection: Upgrade\r\n" ++ + "Sec-WebSocket-Accept: {s}\r\n\r\n", .{accept_key}) catch return; + _ = posix.write(client, resp) catch return; + + // Message loop with larger buffer for big messages + var msg_buf: [128 * 1024]u8 = undefined; + var recv_buf = RecvBuffer{ .buf = &msg_buf }; + + while (true) { + const frame = recv_buf.readFrame(client) orelse break; + + // Close frame - echo it back before closing + if (frame.opcode == 8) { + sendFrame(client, 8, "", frame.payload) catch {}; + break; + } + + // Handle commands or echo + if (frame.opcode == 1) { // Text + handleTextMessage(client, frame.payload) catch break; + } else if (frame.opcode == 2) { // Binary + handleBinaryMessage(client, frame.payload) catch break; + } + } +} + +const Frame = struct { + opcode: u8, + payload: []u8, +}; + +const RecvBuffer = struct { + buf: []u8, + start: usize = 0, + end: usize = 0, + + fn available(self: *RecvBuffer) []u8 { + return self.buf[self.start..self.end]; + } + + fn consume(self: *RecvBuffer, n: usize) void { + self.start += n; + if (self.start >= self.end) { + self.start = 0; + self.end = 0; + } + } + + fn ensureBytes(self: *RecvBuffer, client: posix.socket_t, needed: usize) bool { + while (self.end - self.start < needed) { + // Compact buffer if needed + if (self.end >= self.buf.len - 1024) { + const avail = self.end - self.start; + std.mem.copyForwards(u8, self.buf[0..avail], self.buf[self.start..self.end]); + self.start = 0; + self.end = avail; + } + + const n = posix.read(client, self.buf[self.end..]) catch return false; + if (n == 0) return false; + self.end += n; + } + return true; + } + + fn readFrame(self: *RecvBuffer, client: posix.socket_t) ?Frame { + // Need at least 2 bytes for basic header + if (!self.ensureBytes(client, 2)) return null; + + const data = self.available(); + const opcode = data[0] & 0x0F; + const masked = (data[1] & 0x80) != 0; + var payload_len: usize = data[1] & 0x7F; + var header_size: usize = 2; + + // Extended payload length + if (payload_len == 126) { + if (!self.ensureBytes(client, 4)) return null; + const d = self.available(); + payload_len = @as(usize, d[2]) << 8 | d[3]; + header_size = 4; + } else if (payload_len == 127) { + if (!self.ensureBytes(client, 10)) return null; + const d = self.available(); + payload_len = @as(usize, d[2]) << 56 | + @as(usize, d[3]) << 48 | + @as(usize, d[4]) << 40 | + @as(usize, d[5]) << 32 | + @as(usize, d[6]) << 24 | + @as(usize, d[7]) << 16 | + @as(usize, d[8]) << 8 | + d[9]; + header_size = 10; + } + + const mask_size: usize = if (masked) 4 else 0; + const total_frame_size = header_size + mask_size + payload_len; + + if (!self.ensureBytes(client, total_frame_size)) return null; + + const frame_data = self.available(); + + // Get mask key if present + var mask_key: [4]u8 = undefined; + if (masked) { + @memcpy(&mask_key, frame_data[header_size..][0..4]); + } + + // Get payload and unmask + const payload_start = header_size + mask_size; + const payload = frame_data[payload_start..][0..payload_len]; + + if (masked) { + for (payload, 0..) |*b, i| { + b.* ^= mask_key[i % 4]; + } + } + + self.consume(total_frame_size); + + return .{ .opcode = opcode, .payload = payload }; + } +}; + +fn handleTextMessage(client: posix.socket_t, payload: []const u8) !void { + // Command: force-close - close socket immediately without close frame + if (std.mem.eql(u8, payload, "force-close")) { + return error.ForceClose; + } + + // Command: send-large:N - send a message of N bytes + if (std.mem.startsWith(u8, payload, "send-large:")) { + const size_str = payload["send-large:".len..]; + const size = std.fmt.parseInt(usize, size_str, 10) catch return error.InvalidCommand; + try sendLargeMessage(client, size); + return; + } + + // Command: close:CODE:REASON - send close frame with specific code/reason + if (std.mem.startsWith(u8, payload, "close:")) { + const rest = payload["close:".len..]; + if (std.mem.indexOf(u8, rest, ":")) |sep| { + const code = std.fmt.parseInt(u16, rest[0..sep], 10) catch 1000; + const reason = rest[sep + 1 ..]; + try sendCloseFrame(client, code, reason); + } + return; + } + + // Default: echo with "echo-" prefix + const prefix = "echo-"; + try sendFrame(client, 1, prefix, payload); +} + +fn handleBinaryMessage(client: posix.socket_t, payload: []const u8) !void { + // Echo binary data back with byte 0xEE prepended as marker + const marker = [_]u8{0xEE}; + try sendFrame(client, 2, &marker, payload); +} + +fn sendFrame(client: posix.socket_t, opcode: u8, prefix: []const u8, payload: []const u8) !void { + const total_len = prefix.len + payload.len; + + // Build header + var header: [10]u8 = undefined; + var header_len: usize = 2; + + header[0] = 0x80 | opcode; // FIN + opcode + + if (total_len <= 125) { + header[1] = @intCast(total_len); + } else if (total_len <= 65535) { + header[1] = 126; + header[2] = @intCast((total_len >> 8) & 0xFF); + header[3] = @intCast(total_len & 0xFF); + header_len = 4; + } else { + header[1] = 127; + header[2] = @intCast((total_len >> 56) & 0xFF); + header[3] = @intCast((total_len >> 48) & 0xFF); + header[4] = @intCast((total_len >> 40) & 0xFF); + header[5] = @intCast((total_len >> 32) & 0xFF); + header[6] = @intCast((total_len >> 24) & 0xFF); + header[7] = @intCast((total_len >> 16) & 0xFF); + header[8] = @intCast((total_len >> 8) & 0xFF); + header[9] = @intCast(total_len & 0xFF); + header_len = 10; + } + + _ = try posix.write(client, header[0..header_len]); + if (prefix.len > 0) { + _ = try posix.write(client, prefix); + } + if (payload.len > 0) { + _ = try posix.write(client, payload); + } +} + +fn sendLargeMessage(client: posix.socket_t, size: usize) !void { + // Build header + var header: [10]u8 = undefined; + var header_len: usize = 2; + + header[0] = 0x81; // FIN + text + + if (size <= 125) { + header[1] = @intCast(size); + } else if (size <= 65535) { + header[1] = 126; + header[2] = @intCast((size >> 8) & 0xFF); + header[3] = @intCast(size & 0xFF); + header_len = 4; + } else { + header[1] = 127; + header[2] = @intCast((size >> 56) & 0xFF); + header[3] = @intCast((size >> 48) & 0xFF); + header[4] = @intCast((size >> 40) & 0xFF); + header[5] = @intCast((size >> 32) & 0xFF); + header[6] = @intCast((size >> 24) & 0xFF); + header[7] = @intCast((size >> 16) & 0xFF); + header[8] = @intCast((size >> 8) & 0xFF); + header[9] = @intCast(size & 0xFF); + header_len = 10; + } + + _ = try posix.write(client, header[0..header_len]); + + // Send payload in chunks - pattern of 'A'-'Z' repeating + var sent: usize = 0; + var chunk: [4096]u8 = undefined; + while (sent < size) { + const to_send = @min(chunk.len, size - sent); + for (chunk[0..to_send], 0..) |*b, i| { + b.* = @intCast('A' + ((sent + i) % 26)); + } + _ = try posix.write(client, chunk[0..to_send]); + sent += to_send; + } +} + +fn sendCloseFrame(client: posix.socket_t, code: u16, reason: []const u8) !void { + const reason_len = @min(reason.len, 123); // Max 123 bytes for reason + const payload_len = 2 + reason_len; + + var frame: [129]u8 = undefined; // 2 header + 2 code + 123 reason + 2 padding + frame[0] = 0x88; // FIN + close + frame[1] = @intCast(payload_len); + frame[2] = @intCast((code >> 8) & 0xFF); + frame[3] = @intCast(code & 0xFF); + if (reason_len > 0) { + @memcpy(frame[4..][0..reason_len], reason[0..reason_len]); + } + + _ = try posix.write(client, frame[0 .. 4 + reason_len]); +} diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 6247e323..59354f11 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -28,6 +28,7 @@ const URL = @import("URL.zig"); const Config = @import("../Config.zig"); const Notification = @import("../Notification.zig"); const CookieJar = @import("webapi/storage/Cookie.zig").Jar; +const WebSocket = @import("webapi/net/WebSocket.zig"); const http = @import("../network/http.zig"); const Network = @import("../network/Network.zig"); @@ -116,6 +117,8 @@ obey_robots: bool, cdp_client: ?CDPClient = null, +max_response_size: usize, + // libcurl can monitor arbitrary sockets, this lets us use libcurl to poll // both HTTP data as well as messages from an CDP connection. // Furthermore, we have some tension between blocking scripts and request @@ -156,6 +159,7 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { .http_proxy = http_proxy, .tls_verify = network.config.tlsVerifyHost(), .obey_robots = network.config.obeyRobots(), + .max_response_size = network.config.httpMaxResponseSize() orelse std.math.maxInt(u32), }; return client; @@ -224,16 +228,18 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { while (n) |node| { n = node.next; const conn: *http.Connection = @fieldParentPtr("node", node); - var transfer = Transfer.fromConnection(conn) catch |err| { - // Let's cleanup what we can - self.removeConn(conn); - log.err(.http, "get private info", .{ .err = err, .source = "abort" }); - continue; - }; - if (comptime abort_all) { - transfer.kill(); - } else if (transfer.req.frame_id == frame_id) { - transfer.kill(); + switch (conn.transport) { + .http => |transfer| { + if ((comptime abort_all) or transfer.req.frame_id == frame_id) { + transfer.kill(); + } + }, + .websocket => |ws| { + if ((comptime abort_all) or ws._page._frame_id == frame_id) { + ws.kill(); + } + }, + .none => unreachable, } } } @@ -706,7 +712,6 @@ fn makeTransfer(self: *Client, req: Request) !*Transfer { .url = req.url, .req = req, .client = self, - .max_response_size = self.network.config.httpMaxResponseSize(), }; return transfer; } @@ -764,15 +769,11 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer // fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do // cleanup. But if things fail after `curl_multi_add_handle`, we expect // perfom to pickup the failure and cleanup. - self.in_use.append(&conn.node); - self.handles.add(conn) catch |err| { + self.trackConn(conn) catch |err| { transfer._conn = null; transfer.deinit(); - self.in_use.remove(&conn.node); - self.releaseConn(conn); return err; }; - self.active += 1; if (transfer.req.start_callback) |cb| { cb(Response.fromTransfer(transfer)) catch |err| { @@ -836,7 +837,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T // Also check on RecvError: proxy may send 407 with headers before // closing the connection (CONNECT tunnel not yet established). if (msg.err == null or msg.err.? == error.RecvError) { - transfer.detectAuthChallenge(&msg.conn); + transfer.detectAuthChallenge(msg.conn); } // In case of auth challenge @@ -938,7 +939,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T if (!transfer._header_done_called) { // In case of request w/o data, we need to call the header done // callback now. - const proceed = try transfer.headerDoneCallback(&msg.conn); + const proceed = try transfer.headerDoneCallback(msg.conn); if (!proceed) { transfer.requestFailed(error.Abort, true); return true; @@ -984,30 +985,63 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T fn processMessages(self: *Client) !bool { var processed = false; - while (self.handles.readMessage()) |msg| { - const transfer = try Transfer.fromConnection(&msg.conn); - const done = self.processOneMessage(msg, transfer) catch |err| blk: { - log.err(.http, "process_messages", .{ .err = err, .req = transfer }); - transfer.requestFailed(err, true); - if (transfer._detached_conn) |c| { - // Conn was removed from handles during redirect reconfiguration - // but not re-added. Release it directly to avoid double-remove. - self.in_use.remove(&c.node); - self.active -= 1; - self.releaseConn(c); - transfer._detached_conn = null; - } - break :blk true; - }; - if (done) { - transfer.deinit(); - processed = true; + while (try self.handles.readMessage()) |msg| { + switch (msg.conn.transport) { + .http => |transfer| { + const done = self.processOneMessage(msg, transfer) catch |err| blk: { + log.err(.http, "process_messages", .{ .err = err, .req = transfer }); + transfer.requestFailed(err, true); + if (transfer._detached_conn) |c| { + // Conn was removed from handles during redirect reconfiguration + // but not re-added. Release it directly to avoid double-remove. + self.in_use.remove(&c.node); + self.active -= 1; + self.releaseConn(c); + transfer._detached_conn = null; + } + break :blk true; + }; + if (done) { + transfer.deinit(); + processed = true; + } + }, + .websocket => |ws| { + if (msg.err) |err| switch (err) { + error.GotNothing => ws.disconnected(null), + else => ws.disconnected(err), + } else { + // Clean close - no error + ws.disconnected(null); + } + + return true; + }, + .none => unreachable, } } return processed; } -fn removeConn(self: *Client, conn: *http.Connection) void { +pub fn trackConn(self: *Client, conn: *http.Connection) !void { + self.in_use.append(&conn.node); + // Set private pointer so readMessage can find the Connection. + // Must be done each time since curl_easy_reset clears it when + // connections are returned to pool. + conn.setPrivate(conn) catch |err| { + self.in_use.remove(&conn.node); + self.releaseConn(conn); + return err; + }; + self.handles.add(conn) catch |err| { + self.in_use.remove(&conn.node); + self.releaseConn(conn); + return err; + }; + self.active += 1; +} + +pub fn removeConn(self: *Client, conn: *http.Connection) void { self.in_use.remove(&conn.node); self.active -= 1; if (self.handles.remove(conn)) { @@ -1040,7 +1074,6 @@ pub const Request = struct { resource_type: ResourceType, credentials: ?[:0]const u8 = null, notification: *Notification, - max_response_size: ?usize = null, // This is only relevant for intercepted requests. If a request is flagged // as blocking AND is intercepted, then it'll be up to us to wait until @@ -1168,8 +1201,6 @@ pub const Transfer = struct { aborted: bool = false, - max_response_size: ?usize = null, - // We'll store the response header here response_header: ?ResponseHead = null, @@ -1300,7 +1331,7 @@ pub const Transfer = struct { const req = &self.req; // Set callbacks and per-client settings on the pooled connection. - try conn.setCallbacks(Transfer.dataCallback); + try conn.setWriteCallback(Transfer.dataCallback); try conn.setFollowLocation(false); try conn.setProxy(client.http_proxy); try conn.setTlsVerify(client.tls_verify, client.use_proxy); @@ -1328,7 +1359,7 @@ pub const Transfer = struct { try conn.setCookies(@ptrCast(cookies.ptr)); } - try conn.setPrivate(self); + conn.transport = .{ .http = self }; // add credentials if (req.credentials) |creds| { @@ -1528,11 +1559,9 @@ pub const Transfer = struct { } } - if (transfer.max_response_size) |max_size| { - if (transfer.getContentLength()) |cl| { - if (cl > max_size) { - return error.ResponseTooLarge; - } + if (transfer.getContentLength()) |cl| { + if (cl > transfer.client.max_response_size) { + return error.ResponseTooLarge; } } @@ -1605,10 +1634,7 @@ pub const Transfer = struct { } const conn: *http.Connection = @ptrCast(@alignCast(data)); - var transfer = fromConnection(conn) catch |err| { - log.err(.http, "get private info", .{ .err = err, .source = "body callback" }); - return http.writefunc_error; - }; + var transfer = conn.transport.http; if (!transfer._first_data_received) { transfer._first_data_received = true; @@ -1625,11 +1651,9 @@ pub const Transfer = struct { // Pre-size buffer from Content-Length. if (transfer.getContentLength()) |cl| { - if (transfer.max_response_size) |max_size| { - if (cl > max_size) { - transfer._callback_error = error.ResponseTooLarge; - return http.writefunc_error; - } + if (cl > transfer.client.max_response_size) { + transfer._callback_error = error.ResponseTooLarge; + return http.writefunc_error; } transfer._stream_buffer.ensureTotalCapacity(transfer.arena.allocator(), cl) catch {}; } @@ -1638,11 +1662,9 @@ pub const Transfer = struct { if (transfer._skip_body) return @intCast(chunk_len); transfer.bytes_received += chunk_len; - if (transfer.max_response_size) |max_size| { - if (transfer.bytes_received > max_size) { - transfer._callback_error = error.ResponseTooLarge; - return http.writefunc_error; - } + if (transfer.bytes_received > transfer.client.max_response_size) { + transfer._callback_error = error.ResponseTooLarge; + return http.writefunc_error; } const chunk = buffer[0..chunk_len]; @@ -1671,11 +1693,6 @@ pub const Transfer = struct { return .{ .list = .{ .list = self.response_header.?._injected_headers } }; } - fn fromConnection(conn: *const http.Connection) !*Transfer { - const private = try conn.getPrivate(); - return @ptrCast(@alignCast(private)); - } - pub fn fulfill(transfer: *Transfer, status: u16, headers: []const http.Header, body: ?[]const u8) !void { if (transfer._conn != null) { // should never happen, should have been intercepted/paused, and then diff --git a/src/browser/js/bridge.zig b/src/browser/js/bridge.zig index 0a51327e..8fbdc315 100644 --- a/src/browser/js/bridge.zig +++ b/src/browser/js/bridge.zig @@ -829,6 +829,8 @@ pub const JsApis = flattenTypes(&.{ @import("../webapi/net/URLSearchParams.zig"), @import("../webapi/net/XMLHttpRequest.zig"), @import("../webapi/net/XMLHttpRequestEventTarget.zig"), + @import("../webapi/net/WebSocket.zig"), + @import("../webapi/event/CloseEvent.zig"), @import("../webapi/streams/ReadableStream.zig"), @import("../webapi/streams/ReadableStreamDefaultReader.zig"), @import("../webapi/streams/ReadableStreamDefaultController.zig"), diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html new file mode 100644 index 00000000..8ad03a70 --- /dev/null +++ b/src/browser/tests/net/websocket.html @@ -0,0 +1,240 @@ + + + + + + + + + + + + + + + + + + diff --git a/src/browser/tests/net/websocket2.html b/src/browser/tests/net/websocket2.html new file mode 100644 index 00000000..d421867e --- /dev/null +++ b/src/browser/tests/net/websocket2.html @@ -0,0 +1,233 @@ + + + + + + + + + + + + + + + + + + diff --git a/src/browser/tests/net/websocket3.html b/src/browser/tests/net/websocket3.html new file mode 100644 index 00000000..12dc19cc --- /dev/null +++ b/src/browser/tests/net/websocket3.html @@ -0,0 +1,77 @@ + + + + + + + + diff --git a/src/browser/webapi/Event.zig b/src/browser/webapi/Event.zig index b48bc059..b573bfc7 100644 --- a/src/browser/webapi/Event.zig +++ b/src/browser/webapi/Event.zig @@ -80,6 +80,7 @@ pub const Type = union(enum) { promise_rejection_event: *@import("event/PromiseRejectionEvent.zig"), submit_event: *@import("event/SubmitEvent.zig"), form_data_event: *@import("event/FormDataEvent.zig"), + close_event: *@import("event/CloseEvent.zig"), }; pub const Options = struct { @@ -171,6 +172,7 @@ pub fn is(self: *Event, comptime T: type) ?*T { .promise_rejection_event => |e| return if (T == @import("event/PromiseRejectionEvent.zig")) e else null, .submit_event => |e| return if (T == @import("event/SubmitEvent.zig")) e else null, .form_data_event => |e| return if (T == @import("event/FormDataEvent.zig")) e else null, + .close_event => |e| return if (T == @import("event/CloseEvent.zig")) e else null, .ui_event => |e| { if (T == @import("event/UIEvent.zig")) { return e; diff --git a/src/browser/webapi/EventTarget.zig b/src/browser/webapi/EventTarget.zig index 704efeb3..60dfbf11 100644 --- a/src/browser/webapi/EventTarget.zig +++ b/src/browser/webapi/EventTarget.zig @@ -45,6 +45,7 @@ pub const Type = union(enum) { visual_viewport: *@import("VisualViewport.zig"), file_reader: *@import("FileReader.zig"), font_face_set: *@import("css/FontFaceSet.zig"), + websocket: *@import("net/WebSocket.zig"), }; pub fn init(page: *Page) !*EventTarget { @@ -141,6 +142,7 @@ pub fn format(self: *EventTarget, writer: *std.Io.Writer) !void { .visual_viewport => writer.writeAll(""), .file_reader => writer.writeAll(""), .font_face_set => writer.writeAll(""), + .websocket => writer.writeAll(""), }; } @@ -160,6 +162,7 @@ pub fn toString(self: *EventTarget) []const u8 { .visual_viewport => return "[object VisualViewport]", .file_reader => return "[object FileReader]", .font_face_set => return "[object FontFaceSet]", + .websocket => return "[object WebSocket]", }; } diff --git a/src/browser/webapi/MessagePort.zig b/src/browser/webapi/MessagePort.zig index 51d208b0..a7bb9bfc 100644 --- a/src/browser/webapi/MessagePort.zig +++ b/src/browser/webapi/MessagePort.zig @@ -125,7 +125,7 @@ const PostMessageCallback = struct { const target = self.port.asEventTarget(); if (page._event_manager.hasDirectListeners(target, "message", self.port._on_message)) { const event = (MessageEvent.initTrusted(comptime .wrap("message"), .{ - .data = self.message, + .data = .{ .value = self.message }, .origin = "", .source = null, }, page) catch |err| { diff --git a/src/browser/webapi/Window.zig b/src/browser/webapi/Window.zig index fb3ec8f8..621e1e3a 100644 --- a/src/browser/webapi/Window.zig +++ b/src/browser/webapi/Window.zig @@ -791,7 +791,7 @@ const PostMessageCallback = struct { const event_target = window.asEventTarget(); if (page._event_manager.hasDirectListeners(event_target, "message", window._on_message)) { const event = (try MessageEvent.initTrusted(comptime .wrap("message"), .{ - .data = self.message, + .data = .{ .value = self.message }, .origin = self.origin, .source = self.source, .bubbles = false, diff --git a/src/browser/webapi/event/CloseEvent.zig b/src/browser/webapi/event/CloseEvent.zig new file mode 100644 index 00000000..aa9f1d2b --- /dev/null +++ b/src/browser/webapi/event/CloseEvent.zig @@ -0,0 +1,102 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const String = @import("../../../string.zig").String; + +const Page = @import("../../Page.zig"); +const Session = @import("../../Session.zig"); +const Event = @import("../Event.zig"); +const Allocator = std.mem.Allocator; + +const CloseEvent = @This(); +_proto: *Event, +_code: u16 = 1000, +_reason: []const u8 = "", +_was_clean: bool = true, + +const CloseEventOptions = struct { + code: u16 = 1000, + reason: []const u8 = "", + wasClean: bool = true, +}; + +const Options = Event.inheritOptions(CloseEvent, CloseEventOptions); + +pub fn init(typ: []const u8, _opts: ?Options, page: *Page) !*CloseEvent { + const arena = try page.getArena(.{ .debug = "CloseEvent" }); + errdefer page.releaseArena(arena); + const type_string = try String.init(arena, typ, .{}); + return initWithTrusted(arena, type_string, _opts, false, page); +} + +pub fn initTrusted(typ: String, _opts: ?Options, page: *Page) !*CloseEvent { + const arena = try page.getArena(.{ .debug = "CloseEvent.trusted" }); + errdefer page.releaseArena(arena); + return initWithTrusted(arena, typ, _opts, true, page); +} + +fn initWithTrusted(arena: Allocator, typ: String, _opts: ?Options, trusted: bool, page: *Page) !*CloseEvent { + const opts = _opts orelse Options{}; + + const event = try page._factory.event( + arena, + typ, + CloseEvent{ + ._proto = undefined, + ._code = opts.code, + ._reason = if (opts.reason.len > 0) try arena.dupe(u8, opts.reason) else "", + ._was_clean = opts.wasClean, + }, + ); + + Event.populatePrototypes(event, opts, trusted); + return event; +} + +pub fn asEvent(self: *CloseEvent) *Event { + return self._proto; +} + +pub fn getCode(self: *const CloseEvent) u16 { + return self._code; +} + +pub fn getReason(self: *const CloseEvent) []const u8 { + return self._reason; +} + +pub fn getWasClean(self: *const CloseEvent) bool { + return self._was_clean; +} + +pub const JsApi = struct { + const js = @import("../../js/js.zig"); + pub const bridge = js.Bridge(CloseEvent); + + pub const Meta = struct { + pub const name = "CloseEvent"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(CloseEvent.init, .{}); + pub const code = bridge.accessor(CloseEvent.getCode, null, .{}); + pub const reason = bridge.accessor(CloseEvent.getReason, null, .{}); + pub const wasClean = bridge.accessor(CloseEvent.getWasClean, null, .{}); +}; diff --git a/src/browser/webapi/event/MessageEvent.zig b/src/browser/webapi/event/MessageEvent.zig index 03530400..66ffd8c6 100644 --- a/src/browser/webapi/event/MessageEvent.zig +++ b/src/browser/webapi/event/MessageEvent.zig @@ -30,16 +30,22 @@ const Allocator = std.mem.Allocator; const MessageEvent = @This(); _proto: *Event, -_data: ?js.Value.Temp = null, +_data: ?Data = null, _origin: []const u8 = "", _source: ?*Window = null, const MessageEventOptions = struct { - data: ?js.Value.Temp = null, + data: ?Data = null, origin: ?[]const u8 = null, source: ?*Window = null, }; +pub const Data = union(enum) { + value: js.Value.Temp, + string: []const u8, + arraybuffer: js.ArrayBuffer, +}; + const Options = Event.inheritOptions(MessageEvent, MessageEventOptions); pub fn init(typ: []const u8, opts_: ?Options, page: *Page) !*MessageEvent { @@ -75,7 +81,10 @@ fn initWithTrusted(arena: Allocator, typ: String, opts_: ?Options, trusted: bool pub fn deinit(self: *MessageEvent, session: *Session) void { if (self._data) |d| { - d.release(); + switch (d) { + .value => |js_val| js_val.release(), + .string, .arraybuffer => {}, + } } self._proto.deinit(session); } @@ -92,7 +101,7 @@ pub fn asEvent(self: *MessageEvent) *Event { return self._proto; } -pub fn getData(self: *const MessageEvent) ?js.Value.Temp { +pub fn getData(self: *const MessageEvent) ?Data { return self._data; } diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig new file mode 100644 index 00000000..1a07dcf6 --- /dev/null +++ b/src/browser/webapi/net/WebSocket.zig @@ -0,0 +1,687 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const lp = @import("lightpanda"); + +const log = @import("../../../log.zig"); +const http = @import("../../../network/http.zig"); + +const js = @import("../../js/js.zig"); +const Blob = @import("../Blob.zig"); +const URL = @import("../../URL.zig"); +const Page = @import("../../Page.zig"); +const Session = @import("../../Session.zig"); +const HttpClient = @import("../../HttpClient.zig"); + +const Event = @import("../Event.zig"); +const EventTarget = @import("../EventTarget.zig"); +const MessageEvent = @import("../event/MessageEvent.zig"); +const CloseEvent = @import("../event/CloseEvent.zig"); + +const Allocator = std.mem.Allocator; +const IS_DEBUG = @import("builtin").mode == .Debug; + +const WebSocket = @This(); +_rc: lp.RC(u8) = .{}, +_page: *Page, +_proto: *EventTarget, +_arena: Allocator, + +// Connection state +_ready_state: ReadyState = .connecting, +_url: [:0]const u8 = "", +_binary_type: BinaryType = .blob, + +// Handshake tracking +_got_101: bool = false, +_got_upgrade: bool = false, + +_conn: ?*http.Connection, +_http_client: *HttpClient, + +// buffered outgoing messages +_send_queue: std.ArrayList(Message) = .empty, +_send_offset: usize = 0, + +// buffered incoming frame +_recv_buffer: std.ArrayList(u8) = .empty, + +// close info for event dispatch +_close_code: u16 = 1000, +_close_reason: []const u8 = "", + +// Event handlers +_on_open: ?js.Function.Temp = null, +_on_message: ?js.Function.Temp = null, +_on_error: ?js.Function.Temp = null, +_on_close: ?js.Function.Temp = null, + +pub const ReadyState = enum(u8) { + connecting = 0, + open = 1, + closing = 2, + closed = 3, +}; + +pub const BinaryType = enum { + blob, + arraybuffer, +}; + +pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket { + if (protocols_) |protocols| { + if (protocols.len > 0) { + log.warn(.not_implemented, "WS protocols", .{ .protocols = protocols }); + } + } + + { + if (url.len < 6) { + return error.SyntaxError; + } + const normalized_start = std.ascii.lowerString(&page.buf, url[0..6]); + if (!std.mem.startsWith(u8, normalized_start, "ws://") and !std.mem.startsWith(u8, normalized_start, "wss://")) { + return error.SyntaxError; + } + } + + const arena = try page.getArena(.{ .debug = "WebSocket" }); + errdefer page.releaseArena(arena); + + const resolved_url = try URL.resolve(arena, page.base(), url, .{ .always_dupe = true, .encode = true }); + + const http_client = page._session.browser.http_client; + const conn = http_client.network.getConnection() orelse { + // TODO: figure out how/where we actually want to get WebSocket connections + // from. I feel like sharing this with the HTTP Connection Pool is a + // mistake. + return error.NoFreeConnection; + }; + + errdefer http_client.network.releaseConnection(conn); + + try conn.setURL(resolved_url); + try conn.setConnectOnly(false); + + try conn.setReadCallback(sendDataCallback, true); + try conn.setWriteCallback(receivedDataCallback); + try conn.setHeaderCallback(receivedHeaderCalllback); + + const self = try page._factory.eventTargetWithAllocator(arena, WebSocket{ + ._page = page, + ._conn = conn, + ._arena = arena, + ._proto = undefined, + ._url = resolved_url, + ._http_client = http_client, + }); + conn.transport = .{ .websocket = self }; + try http_client.trackConn(conn); + + if (comptime IS_DEBUG) { + log.info(.http, "WS connecting", .{ .url = url }); + } + + // Unlike an XHR object where we only selectively reference the instance + // while the request is actually inflight, WS connection is "inflight" from + // the moment it's created. + self.acquireRef(); + + return self; +} + +pub fn deinit(self: *WebSocket, session: *Session) void { + self.cleanup(); + + if (self._on_open) |func| { + func.release(); + } + if (self._on_message) |func| { + func.release(); + } + if (self._on_error) |func| { + func.release(); + } + if (self._on_close) |func| { + func.release(); + } + + for (self._send_queue.items) |msg| { + msg.deinit(session); + } + + session.releaseArena(self._arena); +} + +// we're being aborted internally (e.g. page shutting down) +pub fn kill(self: *WebSocket) void { + self.cleanup(); +} + +pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { + const was_clean = self._ready_state == .closing and err_ == null; + self._ready_state = .closed; + + if (err_) |err| { + log.warn(.http, "WS disconnected", .{ .err = err, .url = self._url }); + } else { + log.info(.http, "WS disconnected", .{ .url = self._url, .reason = "closed" }); + } + + self.cleanup(); + + // Use 1006 (abnormal closure) if connection wasn't cleanly closed + const code = if (was_clean) self._close_code else 1006; + const reason = if (was_clean) self._close_reason else ""; + + self.dispatchCloseEvent(code, reason, was_clean) catch |err| { + log.err(.http, "WS close event dispatch failed", .{ .err = err }); + }; +} + +fn cleanup(self: *WebSocket) void { + if (self._conn) |conn| { + self._http_client.removeConn(conn); + self._conn = null; + self.releaseRef(self._page._session); + } +} + +pub fn releaseRef(self: *WebSocket, session: *Session) void { + self._rc.release(self, session); +} + +pub fn acquireRef(self: *WebSocket) void { + self._rc.acquire(); +} + +fn asEventTarget(self: *WebSocket) *EventTarget { + return self._proto; +} + +fn queueMessage(self: *WebSocket, msg: Message) !void { + const was_empty = self._send_queue.items.len == 0; + try self._send_queue.append(self._arena, msg); + + if (was_empty) { + // Unpause the send callback so libcurl will request data + if (self._conn) |conn| { + try conn.pause(.{ .cont = true }); + } + } +} + +/// WebSocket send() accepts string, Blob, ArrayBuffer, or TypedArray +const SendData = union(enum) { + blob: *Blob, + js_val: js.Value, +}; + +/// Union for extracting bytes from ArrayBuffer/TypedArray +const BinaryData = union(enum) { + int8: []i8, + uint8: []u8, + int16: []i16, + uint16: []u16, + int32: []i32, + uint32: []u32, + int64: []i64, + uint64: []u64, + + fn asBuffer(self: BinaryData) []u8 { + return switch (self) { + .int8 => |b| @as([*]u8, @ptrCast(b.ptr))[0..b.len], + .uint8 => |b| b, + .int16 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 2], + .uint16 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 2], + .int32 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 4], + .uint32 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 4], + .int64 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 8], + .uint64 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 8], + }; + } +}; + +pub fn send(self: *WebSocket, data: SendData) !void { + if (self._ready_state != .open) { + return error.InvalidStateError; + } + + // Get a dedicated arena for this message + const arena = try self._page._session.getArena(.{ .debug = "WebSocket message" }); + errdefer self._page._session.releaseArena(arena); + + switch (data) { + .blob => |blob| { + try self.queueMessage(.{ .binary = .{ + .arena = arena, + .data = try arena.dupe(u8, blob._slice), + } }); + }, + .js_val => |js_val| { + if (js_val.isString()) |str| { + try self.queueMessage(.{ .text = .{ + .arena = arena, + .data = try str.toSliceWithAlloc(arena), + } }); + } else { + const binary = try js_val.toZig(BinaryData); + try self.queueMessage(.{ .binary = .{ + .arena = arena, + .data = try arena.dupe(u8, binary.asBuffer()), + } }); + } + }, + } +} + +pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void { + if (self._ready_state == .closing or self._ready_state == .closed) { + return; + } + + const code = code_ orelse 1000; + const reason = reason_ orelse ""; + + if (self._ready_state == .connecting) { + // Connection not yet established - fail it + self._ready_state = .closed; + self.cleanup(); + try self.dispatchCloseEvent(code, reason, false); + return; + } + + self._ready_state = .closing; + self._close_code = code; + self._close_reason = try self._arena.dupe(u8, reason); + try self.queueMessage(.close); +} + +pub fn getUrl(self: *const WebSocket) []const u8 { + return self._url; +} + +pub fn getReadyState(self: *const WebSocket) u16 { + return @intFromEnum(self._ready_state); +} + +pub fn getBufferedAmount(self: *const WebSocket) u32 { + var buffered: u32 = 0; + for (self._send_queue.items) |msg| { + switch (msg) { + .text, .binary => |byte_msg| buffered += @intCast(byte_msg.data.len), + .close => buffered += @intCast(2 + self._close_reason.len), + } + } + return buffered; +} + +pub fn getProtocol(self: *const WebSocket) []const u8 { + return self._protocol; +} + +pub fn getExtensions(self: *const WebSocket) []const u8 { + return self._extensions; +} + +pub fn getBinaryType(self: *const WebSocket) []const u8 { + return @tagName(self._binary_type); +} + +pub fn setBinaryType(self: *WebSocket, value: []const u8) void { + if (std.meta.stringToEnum(BinaryType, value)) |bt| { + self._binary_type = bt; + } +} + +pub fn getOnOpen(self: *const WebSocket) ?js.Function.Temp { + return self._on_open; +} + +pub fn setOnOpen(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_open) |old| old.release(); + if (cb_) |cb| { + self._on_open = try cb.tempWithThis(self); + } else { + self._on_open = null; + } +} + +pub fn getOnMessage(self: *const WebSocket) ?js.Function.Temp { + return self._on_message; +} + +pub fn setOnMessage(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_message) |old| old.release(); + if (cb_) |cb| { + self._on_message = try cb.tempWithThis(self); + } else { + self._on_message = null; + } +} + +pub fn getOnError(self: *const WebSocket) ?js.Function.Temp { + return self._on_error; +} + +pub fn setOnError(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_error) |old| old.release(); + if (cb_) |cb| { + self._on_error = try cb.tempWithThis(self); + } else { + self._on_error = null; + } +} + +pub fn getOnClose(self: *const WebSocket) ?js.Function.Temp { + return self._on_close; +} + +pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_close) |old| old.release(); + if (cb_) |cb| { + self._on_close = try cb.tempWithThis(self); + } else { + self._on_close = null; + } +} + +fn dispatchOpenEvent(self: *WebSocket) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "open", self._on_open)) { + const event = try Event.initTrusted(.wrap("open"), .{}, page); + try page._event_manager.dispatchDirect(target, event, self._on_open, .{ .context = "WebSocket open" }); + } +} + +fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "message", self._on_message)) { + const msg_data: MessageEvent.Data = if (frame_type == .binary and self._binary_type == .arraybuffer) + .{ .arraybuffer = .{ .values = data } } + else + .{ .string = data }; + + const event = try MessageEvent.initTrusted(.wrap("message"), .{ + .data = msg_data, + .origin = "", + }, page); + try page._event_manager.dispatchDirect(target, event.asEvent(), self._on_message, .{ .context = "WebSocket message" }); + } +} + +fn dispatchCloseEvent(self: *WebSocket, code: u16, reason: []const u8, was_clean: bool) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "close", self._on_close)) { + const event = try CloseEvent.initTrusted(.wrap("close"), .{ + .code = code, + .reason = reason, + .wasClean = was_clean, + }, page); + try page._event_manager.dispatchDirect(target, event.asEvent(), self._on_close, .{ .context = "WebSocket close" }); + } +} + +fn sendDataCallback(buffer: [*]u8, buf_count: usize, buf_len: usize, data: *anyopaque) usize { + if (comptime IS_DEBUG) { + std.debug.assert(buf_count == 1); + } + const conn: *http.Connection = @ptrCast(@alignCast(data)); + return _sendDataCallback(conn, buffer[0..buf_len]) catch |err| { + log.warn(.http, "WS send callback", .{ .err = err }); + return http.readfunc_pause; + }; +} + +fn _sendDataCallback(conn: *http.Connection, buf: []u8) !usize { + lp.assert(buf.len >= 2, "WS short buffer", .{ .len = buf.len }); + + const self = conn.transport.websocket; + + if (self._send_queue.items.len == 0) { + // No data to send - pause until queueMessage is called + return http.readfunc_pause; + } + + const msg = &self._send_queue.items[0]; + + switch (msg.*) { + .close => { + const code = self._close_code; + const reason = self._close_reason; + + // Close frame: 2 bytes for code (big-endian) + optional reason + // Truncate reason to fit in buf (max 123 bytes per spec) + const reason_len: usize = @min(reason.len, 123, buf.len -| 2); + const frame_len = 2 + reason_len; + const to_copy = @min(buf.len, frame_len); + + var close_payload: [125]u8 = undefined; + close_payload[0] = @intCast((code >> 8) & 0xFF); + close_payload[1] = @intCast(code & 0xFF); + if (reason_len > 0) { + @memcpy(close_payload[2..][0..reason_len], reason[0..reason_len]); + } + + try conn.wsStartFrame(.close, to_copy); + @memcpy(buf[0..to_copy], close_payload[0..to_copy]); + + _ = self._send_queue.orderedRemove(0); + return to_copy; + }, + .text => |content| return self.writeContent(conn, buf, content, .text), + .binary => |content| return self.writeContent(conn, buf, content, .binary), + } +} + +fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: Message.Content, frame_type: http.WsFrameType) !usize { + if (self._send_offset == 0) { + // start of the message + try conn.wsStartFrame(frame_type, byte_msg.data.len); + } + + const remaining = byte_msg.data[self._send_offset..]; + const to_copy = @min(remaining.len, buf.len); + @memcpy(buf[0..to_copy], remaining[0..to_copy]); + + self._send_offset += to_copy; + + if (self._send_offset >= byte_msg.data.len) { + const removed = self._send_queue.orderedRemove(0); + removed.deinit(self._page._session); + self._send_offset = 0; + } + + return to_copy; +} + +fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, data: *anyopaque) usize { + if (comptime IS_DEBUG) { + std.debug.assert(buf_count == 1); + } + const conn: *http.Connection = @ptrCast(@alignCast(data)); + _receivedDataCallback(conn, buffer[0..buf_len]) catch |err| { + log.warn(.http, "WS receive callback", .{ .err = err }); + // TODO: are there errors, like an invalid frame, that we shouldn't treat + // as an error? + return http.writefunc_error; + }; + + return buf_len; +} + +fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { + const self = conn.transport.websocket; + const meta = conn.wsMeta() orelse { + log.err(.http, "WS missing meta", .{ .url = self._url }); + return error.NoFrameMeta; + }; + + if (meta.offset == 0) { + // Start of new frame. Pre-allocate buffer + self._recv_buffer.clearRetainingCapacity(); + if (meta.len > self._http_client.max_response_size) { + return error.MessageTooLarge; + } + try self._recv_buffer.ensureTotalCapacity(self._arena, meta.len); + } + + try self._recv_buffer.appendSlice(self._arena, data); + + if (meta.bytes_left > 0) { + // still more data waiting for this frame + return; + } + + const message = self._recv_buffer.items; + switch (meta.frame_type) { + .text, .binary => try self.dispatchMessageEvent(message, meta.frame_type), + .close => { + // Parse close frame: 2-byte code (big-endian) + optional reason + self._close_code = if (message.len >= 2) + @as(u16, message[0]) << 8 | message[1] + else + 1005; // No status code received + if (message.len > 2) { + self._close_reason = try self._arena.dupe(u8, message[2..]); + } + self._ready_state = .closing; + self.disconnected(null); + }, + .ping, .pong, .cont => {}, + } +} + +// libcurl has no mechanism to signal that the connection is established. The +// best option I could come up with was looking for an upgrade header response. +fn receivedHeaderCalllback(buffer: [*]const u8, header_count: usize, buf_len: usize, data: *anyopaque) usize { + if (comptime IS_DEBUG) { + std.debug.assert(header_count == 1); + } + const conn: *http.Connection = @ptrCast(@alignCast(data)); + const self = conn.transport.websocket; + const header = buffer[0..buf_len]; + + if (self._got_101 == false and std.mem.startsWith(u8, header, "HTTP/")) { + if (std.mem.indexOf(u8, header, " 101 ")) |_| { + self._got_101 = true; + } + return buf_len; + } + + // Empty line = end of headers + if (buf_len <= 2) { + if (!self._got_101 or !self._got_upgrade) { + return 0; + } + + self._ready_state = .open; + log.info(.http, "WS connected", .{ .url = self._url }); + + self.dispatchOpenEvent() catch |err| { + log.err(.http, "WS open event fail", .{ .err = err }); + }; + return buf_len; + } + + if (self._got_upgrade) { + // dont' care about headers once we've gotten the upgrade header + return buf_len; + } + + const colon = std.mem.indexOfScalarPos(u8, header, 0, ':') orelse { + // weird, continue... + return buf_len; + }; + + if (std.ascii.eqlIgnoreCase(header[0..colon], "upgrade") == false) { + return buf_len; + } + + const value = std.mem.trim(u8, header[colon + 1 ..], " \t\r\n"); + if (std.ascii.eqlIgnoreCase(value, "websocket")) { + self._got_upgrade = true; + } + + return buf_len; +} + +const Message = union(enum) { + close, + text: Content, + binary: Content, + + const Content = struct { + arena: Allocator, + data: []const u8, + }; + fn deinit(self: Message, session: *Session) void { + switch (self) { + .text, .binary => |msg| session.releaseArena(msg.arena), + .close => {}, + } + } +}; + +pub const JsApi = struct { + pub const bridge = js.Bridge(WebSocket); + + pub const Meta = struct { + pub const name = "WebSocket"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(WebSocket.init, .{ .dom_exception = true }); + + pub const CONNECTING = bridge.property(@intFromEnum(ReadyState.connecting), .{ .template = true }); + pub const OPEN = bridge.property(@intFromEnum(ReadyState.open), .{ .template = true }); + pub const CLOSING = bridge.property(@intFromEnum(ReadyState.closing), .{ .template = true }); + pub const CLOSED = bridge.property(@intFromEnum(ReadyState.closed), .{ .template = true }); + + pub const url = bridge.accessor(WebSocket.getUrl, null, .{}); + pub const readyState = bridge.accessor(WebSocket.getReadyState, null, .{}); + pub const bufferedAmount = bridge.accessor(WebSocket.getBufferedAmount, null, .{}); + pub const binaryType = bridge.accessor(WebSocket.getBinaryType, WebSocket.setBinaryType, .{}); + + pub const protocol = bridge.property("", .{ .template = false }); + pub const extensions = bridge.property("", .{ .template = false }); + + pub const onopen = bridge.accessor(WebSocket.getOnOpen, WebSocket.setOnOpen, .{}); + pub const onmessage = bridge.accessor(WebSocket.getOnMessage, WebSocket.setOnMessage, .{}); + pub const onerror = bridge.accessor(WebSocket.getOnError, WebSocket.setOnError, .{}); + pub const onclose = bridge.accessor(WebSocket.getOnClose, WebSocket.setOnClose, .{}); + + pub const send = bridge.function(WebSocket.send, .{ .dom_exception = true }); + pub const close = bridge.function(WebSocket.close, .{}); +}; + +const testing = @import("../../../testing.zig"); +test "WebApi: WebSocket" { + // TEMP since we're currently limited to 10 concurrent connections + try testing.htmlRunner("net/websocket.html", .{}); + try testing.htmlRunner("net/websocket2.html", .{}); + try testing.htmlRunner("net/websocket3.html", .{}); +} diff --git a/src/network/http.zig b/src/network/http.zig index 6dc217ea..94b06fb9 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -28,7 +28,9 @@ pub const ENABLE_DEBUG = false; pub const Blob = libcurl.CurlBlob; pub const WaitFd = libcurl.CurlWaitFd; +pub const readfunc_pause = libcurl.curl_readfunc_pause; pub const writefunc_error = libcurl.curl_writefunc_error; +pub const WsFrameType = libcurl.WsFrameType; const Error = libcurl.Error; @@ -222,15 +224,19 @@ pub const ResponseHead = struct { pub const Connection = struct { _easy: *libcurl.Curl, + transport: Transport, node: std.DoublyLinkedList.Node = .{}, - pub fn init( - ca_blob: ?libcurl.CurlBlob, - config: *const Config, - ) !Connection { + pub const Transport = union(enum) { + none, // used for cases that manage their own connection, e.g. telemetry + http: *@import("../browser/HttpClient.zig").Transfer, + websocket: *@import("../browser/webapi/net/WebSocket.zig"), + }; + + pub fn init(ca_blob: ?libcurl.CurlBlob, config: *const Config) !Connection { const easy = libcurl.curl_easy_init() orelse return error.FailedToInitializeEasy; - const self = Connection{ ._easy = easy }; + var self = Connection{ ._easy = easy, .transport = .none }; errdefer self.deinit(); try self.reset(config, ca_blob); @@ -310,7 +316,12 @@ pub const Connection = struct { try libcurl.curl_easy_setopt(self._easy, .user_pwd, creds.ptr); } - pub fn setCallbacks( + pub fn setConnectOnly(self: *const Connection, connect_only: bool) !void { + const value: c_long = if (connect_only) 2 else 0; + try libcurl.curl_easy_setopt(self._easy, .connect_only, value); + } + + pub fn setWriteCallback( self: *Connection, comptime data_cb: libcurl.CurlWriteFunction, ) !void { @@ -318,12 +329,49 @@ pub const Connection = struct { try libcurl.curl_easy_setopt(self._easy, .write_function, data_cb); } + pub fn setReadCallback( + self: *Connection, + comptime data_cb: libcurl.CurlReadFunction, + upload: bool, + ) !void { + try libcurl.curl_easy_setopt(self._easy, .read_data, self); + try libcurl.curl_easy_setopt(self._easy, .read_function, data_cb); + if (upload) { + try libcurl.curl_easy_setopt(self._easy, .upload, true); + } + } + + pub fn setHeaderCallback( + self: *Connection, + comptime data_cb: libcurl.CurlHeaderFunction, + ) !void { + try libcurl.curl_easy_setopt(self._easy, .header_data, self); + try libcurl.curl_easy_setopt(self._easy, .header_function, data_cb); + } + + pub const PauseFlags = packed struct { + red: bool = false, + green: bool = false, + blue: bool = false, + alpha: bool = false, + // Optional padding to match a specific size, e.g., a u32 + _padding: u28 = 0, + }; + + pub fn pause( + self: *Connection, + flags: libcurl.CurlPauseFlags, + ) !void { + try libcurl.curl_easy_pause(self._easy, flags); + } + pub fn reset( - self: *const Connection, + self: *Connection, config: *const Config, ca_blob: ?libcurl.CurlBlob, ) !void { libcurl.curl_easy_reset(self._easy); + self.transport = .none; // timeouts try libcurl.curl_easy_setopt(self._easy, .timeout_ms, config.httpTimeout()); @@ -460,12 +508,6 @@ pub const Connection = struct { }; } - pub fn getPrivate(self: *const Connection) !*anyopaque { - var private: *anyopaque = undefined; - try libcurl.curl_easy_getinfo(self._easy, .private, &private); - return private; - } - // These are headers that may not be send to the users for inteception. pub fn secretHeaders(_: *const Connection, headers: *Headers, http_headers: *const Config.HttpHeaders) !void { if (http_headers.proxy_bearer_header) |hdr| { @@ -482,6 +524,14 @@ pub const Connection = struct { try libcurl.curl_easy_perform(self._easy); return self.getResponseCode(); } + + pub fn wsStartFrame(self: *const Connection, frame_type: libcurl.WsFrameType, size: usize) !void { + try libcurl.curl_ws_start_frame(self._easy, frame_type, @intCast(size)); + } + + pub fn wsMeta(self: *const Connection) ?libcurl.WsFrameMeta { + return libcurl.curl_ws_meta(self._easy); + } }; pub const Handles = struct { @@ -519,17 +569,21 @@ pub const Handles = struct { } pub const MultiMessage = struct { - conn: Connection, + conn: *Connection, err: ?Error, }; - pub fn readMessage(self: *Handles) ?MultiMessage { + pub fn readMessage(self: *Handles) !?MultiMessage { var messages_count: c_int = 0; const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; return switch (msg.data) { - .done => |err| .{ - .conn = .{ ._easy = msg.easy_handle }, - .err = err, + .done => |err| { + var private: *anyopaque = undefined; + try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private); + return .{ + .conn = @ptrCast(@alignCast(private)), + .err = err, + }; }, else => unreachable, }; diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index 0e2defe3..31587823 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -40,6 +40,8 @@ pub const CurlDebugFunction = fn (*Curl, CurlInfoType, [*c]u8, usize, *anyopaque pub const CurlHeaderFunction = fn ([*]const u8, usize, usize, *anyopaque) usize; pub const CurlWriteFunction = fn ([*]const u8, usize, usize, *anyopaque) usize; pub const curl_writefunc_error: usize = c.CURL_WRITEFUNC_ERROR; +pub const curl_readfunc_pause: usize = c.CURL_READFUNC_PAUSE; +pub const CurlReadFunction = fn ([*]u8, usize, usize, *anyopaque) usize; pub const FreeCallback = fn (ptr: ?*anyopaque) void; pub const StrdupCallback = fn (str: [*:0]const u8) ?[*:0]u8; @@ -98,6 +100,23 @@ pub const CurlWaitFd = extern struct { revents: CurlWaitEvents, }; +pub const CurlPauseFlags = packed struct(c_short) { + recv: bool = false, + send: bool = false, + all: bool = false, + cont: bool = false, + _reserved: u12 = 0, + + pub fn to_c(self: @This()) c_int { + var flags: c_int = 0; + if (self.recv) flags |= c.CURLPAUSE_RECV; + if (self.send) flags |= c.CURLPAUSE_SEND; + if (self.all) flags |= c.CURLPAUSE_ALL; + if (self.cont) flags |= c.CURLPAUSE_CONT; + return flags; + } +}; + comptime { const debug_cb_check: c.curl_debug_callback = struct { fn cb(handle: ?*Curl, msg_type: c.curl_infotype, raw: [*c]u8, len: usize, user: ?*anyopaque) callconv(.c) c_int { @@ -167,6 +186,10 @@ pub const CurlOption = enum(c.CURLoption) { header_function = c.CURLOPT_HEADERFUNCTION, write_data = c.CURLOPT_WRITEDATA, write_function = c.CURLOPT_WRITEFUNCTION, + read_data = c.CURLOPT_READDATA, + read_function = c.CURLOPT_READFUNCTION, + connect_only = c.CURLOPT_CONNECT_ONLY, + upload = c.CURLOPT_UPLOAD, }; pub const CurlMOption = enum(c.CURLMoption) { @@ -530,6 +553,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype const code = switch (option) { .verbose, .post, + .upload, .http_get, .ssl_verify_host, .ssl_verify_peer, @@ -551,6 +575,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .max_redirs, .follow_location, .post_field_size, + .connect_only, => blk: { const n: c_long = switch (@typeInfo(@TypeOf(value))) { .comptime_int, .int => @intCast(value), @@ -593,6 +618,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .private, .header_data, + .read_data, .write_data, => blk: { const ptr: ?*anyopaque = switch (@typeInfo(@TypeOf(value))) { @@ -631,6 +657,22 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype break :blk c.curl_easy_setopt(easy, opt, cb); }, + .read_function => blk: { + const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) { + .null => null, + .@"fn" => |info| struct { + fn cb(buffer: [*c]u8, count: usize, len: usize, user: ?*anyopaque) callconv(.c) usize { + const user_arg = if (@typeInfo(info.params[3].type.?) == .optional) + user + else + user orelse unreachable; + return value(@ptrCast(buffer), count, len, user_arg); + } + }.cb, + else => @compileError("expected Zig function or null for " ++ @tagName(option) ++ ", got " ++ @typeName(@TypeOf(value))), + }; + break :blk c.curl_easy_setopt(easy, opt, cb); + }, .write_function => blk: { const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) { .null => null, @@ -677,6 +719,10 @@ pub fn curl_easy_getinfo(easy: *Curl, comptime info: CurlInfo, out: anytype) Err try errorCheck(code); } +pub fn curl_easy_pause(easy: *Curl, flags: CurlPauseFlags) Error!void { + try errorCheck(c.curl_easy_pause(easy, flags.to_c())); +} + pub fn curl_easy_header( easy: *Curl, name: [*:0]const u8, @@ -804,3 +850,79 @@ pub fn curl_slist_free_all(list: ?*CurlSList) void { c.curl_slist_free_all(ptr); } } + +// WebSocket support (requires libcurl 7.86.0+) +pub const WsFrameType = enum { + text, + binary, + cont, + close, + ping, + pong, + + fn toInt(self: WsFrameType) c_uint { + return switch (self) { + .text => c.CURLWS_TEXT, + .binary => c.CURLWS_BINARY, + .cont => c.CURLWS_CONT, + .close => c.CURLWS_CLOSE, + .ping => c.CURLWS_PING, + .pong => c.CURLWS_PONG, + }; + } + + fn fromFlags(flags: c_int) WsFrameType { + const f: c_uint = @bitCast(flags); + if (f & c.CURLWS_TEXT != 0) return .text; + if (f & c.CURLWS_BINARY != 0) return .binary; + if (f & c.CURLWS_CLOSE != 0) return .close; + if (f & c.CURLWS_PING != 0) return .ping; + if (f & c.CURLWS_PONG != 0) return .pong; + if (f & c.CURLWS_CONT != 0) return .cont; + return .binary; // default fallback + } +}; + +pub const WsFrameMeta = struct { + frame_type: WsFrameType, + offset: usize, + bytes_left: usize, + len: usize, + + fn from(frame: *const c.curl_ws_frame) WsFrameMeta { + return .{ + .frame_type = WsFrameType.fromFlags(frame.flags), + .offset = @intCast(frame.offset), + .bytes_left = @intCast(frame.bytesleft), + .len = if (frame.len < 0) + std.math.maxInt(usize) + else + @intCast(frame.len), + }; + } +}; + +pub fn curl_ws_send(easy: *Curl, buffer: []const u8, sent: *usize, fragsize: CurlOffT, frame_type: WsFrameType) Error!void { + try errorCheck(c.curl_ws_send(easy, buffer.ptr, buffer.len, sent, fragsize, frame_type.toInt())); +} + +pub fn curl_ws_recv(easy: *Curl, buffer: []u8, recv: *usize, meta: *?WsFrameMeta) Error!void { + var c_meta: [*c]const c.curl_ws_frame = null; + const code = c.curl_ws_recv(easy, buffer.ptr, buffer.len, recv, &c_meta); + if (c_meta) |m| { + meta.* = WsFrameMeta.from(m); + } else { + meta.* = null; + } + try errorCheck(code); +} + +pub fn curl_ws_meta(easy: *Curl) ?WsFrameMeta { + const ptr = c.curl_ws_meta(easy); + if (ptr == null) return null; + return WsFrameMeta.from(ptr); +} + +pub fn curl_ws_start_frame(easy: *Curl, frame_type: WsFrameType, size: CurlOffT) Error!void { + try errorCheck(c.curl_ws_start_frame(easy, frame_type.toInt(), size)); +} diff --git a/src/testing.zig b/src/testing.zig index 8ff59751..8dd2eb88 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -436,19 +436,17 @@ fn runWebApiTest(test_file: [:0]const u8) !void { if (js_val.isTrue()) { return; } - switch (try runner.tick(.{ .ms = 20 })) { - .done => return error.TestNeverSignaledCompletion, - .ok => |next_ms| { - const ms_elapsed = timer.lap() / 1_000_000; - if (ms_elapsed >= wait_ms) { - return error.TestTimedOut; - } - wait_ms -= @intCast(ms_elapsed); - if (next_ms > 0) { - std.Thread.sleep(std.time.ns_per_ms * next_ms); - } - }, + const sleep_ms: usize = switch (try runner.tick(.{ .ms = 20 })) { + .done => 20, + .ok => |next_ms| @min(next_ms, 20), + }; + + const ms_elapsed = timer.lap() / 1_000_000; + if (ms_elapsed >= wait_ms) { + return error.TestTimedOut; } + wait_ms -= @intCast(ms_elapsed); + std.Thread.sleep(std.time.ns_per_ms * sleep_ms); } } @@ -476,12 +474,15 @@ pub fn pageTest(comptime test_file: []const u8, opts: PageTestOpts) !*Page { const log = @import("log.zig"); const TestHTTPServer = @import("TestHTTPServer.zig"); +const TestWSServer = @import("TestWSServer.zig"); const Server = @import("Server.zig"); var test_cdp_server: ?*Server = null; var test_cdp_server_thread: ?std.Thread = null; var test_http_server: ?TestHTTPServer = null; var test_http_server_thread: ?std.Thread = null; +var test_ws_server: ?TestWSServer = null; +var test_ws_server_thread: ?std.Thread = null; var test_config: Config = undefined; @@ -514,13 +515,16 @@ test "tests:beforeAll" { test_session = try test_browser.newSession(test_notification); var wg: std.Thread.WaitGroup = .{}; - wg.startMany(2); + wg.startMany(3); test_cdp_server_thread = try std.Thread.spawn(.{}, serveCDP, .{&wg}); test_http_server = TestHTTPServer.init(testHTTPHandler); test_http_server_thread = try std.Thread.spawn(.{}, TestHTTPServer.run, .{ &test_http_server.?, &wg }); + test_ws_server = TestWSServer.init(); + test_ws_server_thread = try std.Thread.spawn(.{}, TestWSServer.run, .{ &test_ws_server.?, &wg }); + // need to wait for the servers to be listening, else tests will fail because // they aren't able to connect. wg.wait(); @@ -545,6 +549,16 @@ test "tests:afterAll" { server.deinit(); } + if (test_ws_server) |*server| { + server.stop(); + } + if (test_ws_server_thread) |thread| { + thread.join(); + } + if (test_ws_server) |*server| { + server.deinit(); + } + @import("root").v8_peak_memory = test_browser.env.isolate.getHeapStatistics().total_physical_size; test_notification.deinit(); From 14dcb7895a802730df3ba3cef71438db42a93912 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 2 Apr 2026 10:27:03 +0800 Subject: [PATCH 02/15] Give websockets their own connection pool, improve websocket message logging --- src/Config.zig | 25 +++ src/browser/tests/net/websocket.html | 306 +++++++++++++++++++++++++++ src/browser/webapi/net/WebSocket.zig | 33 +-- src/log.zig | 1 + src/network/Network.zig | 63 +++++- src/testing.zig | 1 + 6 files changed, 406 insertions(+), 23 deletions(-) diff --git a/src/Config.zig b/src/Config.zig index 6788db1d..624fc63b 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -128,6 +128,13 @@ pub fn httpMaxResponseSize(self: *const Config) ?usize { }; } +pub fn wsMaxConcurrent(self: *const Config) u8 { + return switch (self.mode) { + inline .serve, .fetch, .mcp => |opts| opts.common.ws_max_concurrent orelse 8, + else => unreachable, + }; +} + pub fn logLevel(self: *const Config) ?log.Level { return switch (self.mode) { inline .serve, .fetch, .mcp => |opts| opts.common.log_level, @@ -275,6 +282,7 @@ pub const Common = struct { http_timeout: ?u31 = null, http_connect_timeout: ?u31 = null, http_max_response_size: ?usize = null, + ws_max_concurrent: ?u8 = null, tls_verify_host: bool = true, log_level: ?log.Level = null, log_format: ?log.Format = null, @@ -375,6 +383,10 @@ pub fn printUsageAndExit(self: *const Config, success: bool) void { \\ (e.g. XHR, fetch, script loading, ...). \\ Defaults to no limit. \\ + \\--ws-max-concurrent + \\ The maximum number of concurrent WebSocket connections. + \\ Defaults to 8. + \\ \\--log-level The log level: debug, info, warn, error or fatal. \\ Defaults to ++ (if (builtin.mode == .Debug) " info." else "warn.") ++ @@ -983,6 +995,19 @@ fn parseCommonArg( return true; } + if (std.mem.eql(u8, "--ws-max-concurrent", opt) or std.mem.eql(u8, "--ws_max_concurrent", opt)) { + const str = args.next() orelse { + log.fatal(.app, "missing argument value", .{ .arg = opt }); + return error.InvalidArgument; + }; + + common.ws_max_concurrent = std.fmt.parseInt(u8, str, 10) catch |err| { + log.fatal(.app, "invalid argument value", .{ .arg = opt, .err = err }); + return error.InvalidArgument; + }; + return true; + } + if (std.mem.eql(u8, "--log-level", opt) or std.mem.eql(u8, "--log_level", opt)) { const str = args.next() orelse { log.fatal(.app, "missing argument value", .{ .arg = opt }); diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html index 8ad03a70..257c2136 100644 --- a/src/browser/tests/net/websocket.html +++ b/src/browser/tests/net/websocket.html @@ -238,3 +238,309 @@ }); } + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 1a07dcf6..dc19728e 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -107,10 +107,7 @@ pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket { const resolved_url = try URL.resolve(arena, page.base(), url, .{ .always_dupe = true, .encode = true }); const http_client = page._session.browser.http_client; - const conn = http_client.network.getConnection() orelse { - // TODO: figure out how/where we actually want to get WebSocket connections - // from. I feel like sharing this with the HTTP Connection Pool is a - // mistake. + const conn = http_client.network.newConnection() orelse { return error.NoFreeConnection; }; @@ -135,7 +132,7 @@ pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket { try http_client.trackConn(conn); if (comptime IS_DEBUG) { - log.info(.http, "WS connecting", .{ .url = url }); + log.info(.websocket, "connecting", .{ .url = url }); } // Unlike an XHR object where we only selectively reference the instance @@ -179,9 +176,9 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { self._ready_state = .closed; if (err_) |err| { - log.warn(.http, "WS disconnected", .{ .err = err, .url = self._url }); + log.warn(.websocket, "disconnected", .{ .err = err, .url = self._url }); } else { - log.info(.http, "WS disconnected", .{ .url = self._url, .reason = "closed" }); + log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" }); } self.cleanup(); @@ -191,7 +188,7 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { const reason = if (was_clean) self._close_reason else ""; self.dispatchCloseEvent(code, reason, was_clean) catch |err| { - log.err(.http, "WS close event dispatch failed", .{ .err = err }); + log.err(.websocket, "close event dispatch failed", .{ .err = err }); }; } @@ -413,6 +410,7 @@ fn dispatchOpenEvent(self: *WebSocket) !void { } fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void { + std.debug.print("{any} {s}\n", .{ frame_type, data }); const page = self._page; const target = self.asEventTarget(); @@ -450,7 +448,7 @@ fn sendDataCallback(buffer: [*]u8, buf_count: usize, buf_len: usize, data: *anyo } const conn: *http.Connection = @ptrCast(@alignCast(data)); return _sendDataCallback(conn, buffer[0..buf_len]) catch |err| { - log.warn(.http, "WS send callback", .{ .err = err }); + log.warn(.websocket, "send callback", .{ .err = err }); return http.readfunc_pause; }; } @@ -499,6 +497,9 @@ fn _sendDataCallback(conn: *http.Connection, buf: []u8) !usize { fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: Message.Content, frame_type: http.WsFrameType) !usize { if (self._send_offset == 0) { // start of the message + if (comptime IS_DEBUG) { + log.debug(.websocket, "send start", .{ .url = self._url, .len = byte_msg.data.len }); + } try conn.wsStartFrame(frame_type, byte_msg.data.len); } @@ -511,6 +512,9 @@ fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: M if (self._send_offset >= byte_msg.data.len) { const removed = self._send_queue.orderedRemove(0); removed.deinit(self._page._session); + if (comptime IS_DEBUG) { + log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len }); + } self._send_offset = 0; } @@ -523,7 +527,7 @@ fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, d } const conn: *http.Connection = @ptrCast(@alignCast(data)); _receivedDataCallback(conn, buffer[0..buf_len]) catch |err| { - log.warn(.http, "WS receive callback", .{ .err = err }); + log.warn(.websocket, "receive callback", .{ .err = err }); // TODO: are there errors, like an invalid frame, that we shouldn't treat // as an error? return http.writefunc_error; @@ -535,11 +539,14 @@ fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, d fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { const self = conn.transport.websocket; const meta = conn.wsMeta() orelse { - log.err(.http, "WS missing meta", .{ .url = self._url }); + log.err(.websocket, "missing meta", .{ .url = self._url }); return error.NoFrameMeta; }; if (meta.offset == 0) { + if (comptime IS_DEBUG) { + log.debug(.websocket, "incoming message", .{ .url = self._url, .len = meta.len, .bytes_left = meta.bytes_left, .type = meta.frame_type }); + } // Start of new frame. Pre-allocate buffer self._recv_buffer.clearRetainingCapacity(); if (meta.len > self._http_client.max_response_size) { @@ -598,10 +605,10 @@ fn receivedHeaderCalllback(buffer: [*]const u8, header_count: usize, buf_len: us } self._ready_state = .open; - log.info(.http, "WS connected", .{ .url = self._url }); + log.info(.websocket, "connected", .{ .url = self._url }); self.dispatchOpenEvent() catch |err| { - log.err(.http, "WS open event fail", .{ .err = err }); + log.err(.websocket, "open event fail", .{ .err = err }); }; return buf_len; } diff --git a/src/log.zig b/src/log.zig index 3e1016c5..84ff1049 100644 --- a/src/log.zig +++ b/src/log.zig @@ -40,6 +40,7 @@ pub const Scope = enum { unknown_prop, mcp, cache, + websocket, }; const Opts = struct { diff --git a/src/network/Network.zig b/src/network/Network.zig index ab11e5ce..1fb8c8fb 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -61,6 +61,11 @@ connections: []http.Connection, available: std.DoublyLinkedList = .{}, conn_mutex: std.Thread.Mutex = .{}, +ws_pool: std.heap.MemoryPool(http.Connection), +ws_count: usize = 0, +ws_max: u8, +ws_mutex: std.Thread.Mutex = .{}, + pollfds: []posix.pollfd, listener: ?Listener = null, @@ -268,9 +273,13 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network { .connections = connections, .app = app, + .robot_store = RobotStore.init(allocator), .web_bot_auth = web_bot_auth, .cache = cache, + + .ws_pool = .init(allocator), + .ws_max = config.wsMaxConcurrent(), }; } @@ -298,6 +307,8 @@ pub fn deinit(self: *Network) void { } self.allocator.free(self.connections); + self.ws_pool.deinit(); + self.robot_store.deinit(); if (self.web_bot_auth) |wba| { wba.deinit(self.allocator); @@ -592,18 +603,50 @@ pub fn getConnection(self: *Network) ?*http.Connection { } pub fn releaseConnection(self: *Network, conn: *http.Connection) void { - conn.reset(self.config, self.ca_blob) catch |err| { - lp.assert(false, "couldn't reset curl easy", .{ .err = err }); - }; - - self.conn_mutex.lock(); - defer self.conn_mutex.unlock(); - - self.available.append(&conn.node); + switch (conn.transport) { + .websocket => { + conn.deinit(); + self.ws_mutex.lock(); + defer self.ws_mutex.unlock(); + self.ws_pool.destroy(conn); + self.ws_count -= 1; + }, + else => { + conn.reset(self.config, self.ca_blob) catch |err| { + lp.assert(false, "couldn't reset curl easy", .{ .err = err }); + }; + self.conn_mutex.lock(); + defer self.conn_mutex.unlock(); + self.available.append(&conn.node); + }, + } } -pub fn newConnection(self: *Network) !http.Connection { - return http.Connection.init(self.ca_blob, self.config); +pub fn newConnection(self: *Network) ?*http.Connection { + const conn = blk: { + self.ws_mutex.lock(); + defer self.ws_mutex.unlock(); + + if (self.ws_count >= self.ws_max) { + return null; + } + + const c = self.ws_pool.create() catch return null; + self.ws_count += 1; + break :blk c; + }; + + // don't do this under lock + conn.* = http.Connection.init(self.ca_blob, self.config) catch { + self.ws_mutex.lock(); + defer self.ws_mutex.unlock(); + self.ws_pool.destroy(conn); + self.ws_count -= 1; + + return null; + }; + + return conn; } // Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is diff --git a/src/testing.zig b/src/testing.zig index 8dd2eb88..44abfea8 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -496,6 +496,7 @@ test "tests:beforeAll" { .common = .{ .tls_verify_host = false, .user_agent_suffix = "internal-tester", + .ws_max_concurrent = 50, }, } }); From 6d83da516194d51d61d61bf7e246534e6d50f125 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 2 Apr 2026 10:45:02 +0800 Subject: [PATCH 03/15] support 'blob' binaryType --- src/browser/tests/net/websocket.html | 40 +++++++++++++++++++++++ src/browser/webapi/event/MessageEvent.zig | 2 ++ src/browser/webapi/net/WebSocket.zig | 12 +++++-- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html index 257c2136..dbfc0c4a 100644 --- a/src/browser/tests/net/websocket.html +++ b/src/browser/tests/net/websocket.html @@ -544,3 +544,43 @@ }); } + + diff --git a/src/browser/webapi/event/MessageEvent.zig b/src/browser/webapi/event/MessageEvent.zig index 66ffd8c6..dfd813d5 100644 --- a/src/browser/webapi/event/MessageEvent.zig +++ b/src/browser/webapi/event/MessageEvent.zig @@ -44,6 +44,7 @@ pub const Data = union(enum) { value: js.Value.Temp, string: []const u8, arraybuffer: js.ArrayBuffer, + blob: *@import("../Blob.zig"), }; const Options = Event.inheritOptions(MessageEvent, MessageEventOptions); @@ -83,6 +84,7 @@ pub fn deinit(self: *MessageEvent, session: *Session) void { if (self._data) |d| { switch (d) { .value => |js_val| js_val.release(), + .blob => |blob| blob.releaseRef(session), .string, .arraybuffer => {}, } } diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index dc19728e..7fa390e4 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -410,13 +410,19 @@ fn dispatchOpenEvent(self: *WebSocket) !void { } fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void { - std.debug.print("{any} {s}\n", .{ frame_type, data }); const page = self._page; const target = self.asEventTarget(); if (page._event_manager.hasDirectListeners(target, "message", self._on_message)) { - const msg_data: MessageEvent.Data = if (frame_type == .binary and self._binary_type == .arraybuffer) - .{ .arraybuffer = .{ .values = data } } + const msg_data: MessageEvent.Data = if (frame_type == .binary) + switch (self._binary_type) { + .arraybuffer => .{ .arraybuffer = .{ .values = data } }, + .blob => blk: { + const blob = try Blob.init(&.{data}, .{}, page); + blob.acquireRef(); + break :blk .{ .blob = blob }; + }, + } else .{ .string = data }; From 0d75c00f858212df7563056812bee55f486d6a88 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 2 Apr 2026 11:44:32 +0800 Subject: [PATCH 04/15] update v8 dep --- .github/actions/install/action.yml | 2 +- Dockerfile | 2 +- build.zig.zon | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/actions/install/action.yml b/.github/actions/install/action.yml index 2f7ad2e2..f5bb72d3 100644 --- a/.github/actions/install/action.yml +++ b/.github/actions/install/action.yml @@ -13,7 +13,7 @@ inputs: zig-v8: description: 'zig v8 version to install' required: false - default: 'v0.3.7' + default: 'v0.3.8' v8: description: 'v8 version to install' required: false diff --git a/Dockerfile b/Dockerfile index b266f916..ed90e014 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ FROM debian:stable-slim ARG MINISIG=0.12 ARG ZIG_MINISIG=RWSGOq2NVecA2UPNdBUZykf1CCb147pkmdtYxgb3Ti+JO/wCYvhbAb/U ARG V8=14.0.365.4 -ARG ZIG_V8=v0.3.7 +ARG ZIG_V8=v0.3.8 ARG TARGETPLATFORM RUN apt-get update -yq && \ diff --git a/build.zig.zon b/build.zig.zon index 0c6096a4..338f49d7 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -5,8 +5,8 @@ .minimum_zig_version = "0.15.2", .dependencies = .{ .v8 = .{ - .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/99c1ddf2d0b15f141e92ea09abdfc8e0e5f441e6.tar.gz", - .hash = "v8-0.0.0-xddH63-BBABP05dni8oMrs9qQwuczHhNhXHbXXlPb95s", + .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/refs/tags/v0.3.8.tar.gz", + .hash = "v8-0.0.0-xddH6weEBAAdY3uxkNWqYpG7bX_h1Oj3UYBIkbxEyNCl", }, // .v8 = .{ .path = "../zig-v8-fork" }, .brotli = .{ From 6bf35e1ed4bab5e5bf8304949a5440ff3e73a74b Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 2 Apr 2026 12:31:55 +0800 Subject: [PATCH 05/15] try to improve test ws shutdown, merge ws tests --- src/TestWSServer.zig | 13 +- src/browser/tests/net/websocket.html | 2 - src/browser/tests/net/websocket2.html | 233 -------------------------- src/browser/tests/net/websocket3.html | 77 --------- src/browser/webapi/net/WebSocket.zig | 3 - src/testing.zig | 3 - 6 files changed, 4 insertions(+), 327 deletions(-) delete mode 100644 src/browser/tests/net/websocket2.html delete mode 100644 src/browser/tests/net/websocket3.html diff --git a/src/TestWSServer.zig b/src/TestWSServer.zig index 4d7ddd54..6d28acf3 100644 --- a/src/TestWSServer.zig +++ b/src/TestWSServer.zig @@ -31,18 +31,13 @@ pub fn init() TestWSServer { }; } -pub fn deinit(self: *TestWSServer) void { - if (self.listener) |socket| { - posix.close(socket); - self.listener = null; - } -} - pub fn stop(self: *TestWSServer) void { self.shutdown.store(true, .release); if (self.listener) |socket| { - posix.close(socket); - self.listener = null; + switch (@import("builtin").target.os.tag) { + .linux => std.posix.shutdown(socket, .recv) catch {}, + else => std.posix.close(socket), + } } } diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html index dbfc0c4a..fd9f3c29 100644 --- a/src/browser/tests/net/websocket.html +++ b/src/browser/tests/net/websocket.html @@ -309,7 +309,6 @@ const state = await testing.async(); let received = []; - console.warn('last-test'); let ws = new WebSocket('ws://127.0.0.1:9584/'); ws.binaryType = 'arraybuffer'; @@ -328,7 +327,6 @@ }); await state.done(() => { - console.warn('aaa'); // 1 marker byte + 8 bytes (2 x 4-byte int32) testing.expectEqual([9], received); }); diff --git a/src/browser/tests/net/websocket2.html b/src/browser/tests/net/websocket2.html deleted file mode 100644 index d421867e..00000000 --- a/src/browser/tests/net/websocket2.html +++ /dev/null @@ -1,233 +0,0 @@ - - - - - - - - - - - - - - - - - - diff --git a/src/browser/tests/net/websocket3.html b/src/browser/tests/net/websocket3.html deleted file mode 100644 index 12dc19cc..00000000 --- a/src/browser/tests/net/websocket3.html +++ /dev/null @@ -1,77 +0,0 @@ - - - - - - - - diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 7fa390e4..07b53ae1 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -693,8 +693,5 @@ pub const JsApi = struct { const testing = @import("../../../testing.zig"); test "WebApi: WebSocket" { - // TEMP since we're currently limited to 10 concurrent connections try testing.htmlRunner("net/websocket.html", .{}); - try testing.htmlRunner("net/websocket2.html", .{}); - try testing.htmlRunner("net/websocket3.html", .{}); } diff --git a/src/testing.zig b/src/testing.zig index 44abfea8..e2ec937e 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -556,9 +556,6 @@ test "tests:afterAll" { if (test_ws_server_thread) |thread| { thread.join(); } - if (test_ws_server) |*server| { - server.deinit(); - } @import("root").v8_peak_memory = test_browser.env.isolate.getHeapStatistics().total_physical_size; From c4250418afb96d3f41c9093e1b67d11974d9e07b Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Sat, 4 Apr 2026 07:39:55 +0800 Subject: [PATCH 06/15] Address feedback -dispatch error on abnormal close -reciprocal close message -more url validation -cleanup dead code --- src/browser/HttpClient.zig | 8 +++- src/browser/tests/net/websocket.html | 41 +++++------------ src/browser/webapi/net/WebSocket.zig | 66 ++++++++++++++++++++-------- src/network/http.zig | 9 ---- 4 files changed, 64 insertions(+), 60 deletions(-) diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 59354f11..4683e38e 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -270,7 +270,11 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { var leftover: usize = 0; while (it) |node| : (it = node.next) { const conn: *http.Connection = @fieldParentPtr("node", node); - std.debug.assert((Transfer.fromConnection(conn) catch unreachable).aborted); + switch (conn.transport) { + .http => |transfer| std.debug.assert(transfer.aborted), + .websocket => {}, + .none => {}, + } leftover += 1; } std.debug.assert(self.active == leftover); @@ -1015,7 +1019,7 @@ fn processMessages(self: *Client) !bool { ws.disconnected(null); } - return true; + processed = true; }, .none => unreachable, } diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html index fd9f3c29..9ca32120 100644 --- a/src/browser/tests/net/websocket.html +++ b/src/browser/tests/net/websocket.html @@ -304,35 +304,6 @@ } - - @@ -582,3 +553,11 @@ }); } + + diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 07b53ae1..1244a61e 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -99,6 +99,10 @@ pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket { if (!std.mem.startsWith(u8, normalized_start, "ws://") and !std.mem.startsWith(u8, normalized_start, "wss://")) { return error.SyntaxError; } + // Fragments are not allowed in WebSocket URLs + if (std.mem.indexOfScalar(u8, url, '#') != null) { + return error.SyntaxError; + } } const arena = try page.getArena(.{ .debug = "WebSocket" }); @@ -118,7 +122,7 @@ pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket { try conn.setReadCallback(sendDataCallback, true); try conn.setWriteCallback(receivedDataCallback); - try conn.setHeaderCallback(receivedHeaderCalllback); + try conn.setHeaderCallback(receivedHeaderCallback); const self = try page._factory.eventTargetWithAllocator(arena, WebSocket{ ._page = page, @@ -187,6 +191,13 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { const code = if (was_clean) self._close_code else 1006; const reason = if (was_clean) self._close_reason else ""; + // Spec requires error event before close on abnormal closure + if (!was_clean) { + self.dispatchErrorEvent() catch |err| { + log.err(.websocket, "error event dispatch failed", .{ .err = err }); + }; + } + self.dispatchCloseEvent(code, reason, was_clean) catch |err| { log.err(.websocket, "close event dispatch failed", .{ .err = err }); }; @@ -197,6 +208,7 @@ fn cleanup(self: *WebSocket) void { self._http_client.removeConn(conn); self._conn = null; self.releaseRef(self._page._session); + self._send_queue.clearRetainingCapacity(); } } @@ -293,6 +305,13 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void { return; } + // Validate close code per spec: must be 1000 or in range 3000-4999 + if (code_) |code| { + if (code != 1000 and (code < 3000 or code > 4999)) { + return error.InvalidAccessError; + } + } + const code = code_ orelse 1000; const reason = reason_ orelse ""; @@ -329,14 +348,6 @@ pub fn getBufferedAmount(self: *const WebSocket) u32 { return buffered; } -pub fn getProtocol(self: *const WebSocket) []const u8 { - return self._protocol; -} - -pub fn getExtensions(self: *const WebSocket) []const u8 { - return self._extensions; -} - pub fn getBinaryType(self: *const WebSocket) []const u8 { return @tagName(self._binary_type); } @@ -404,7 +415,7 @@ fn dispatchOpenEvent(self: *WebSocket) !void { const target = self.asEventTarget(); if (page._event_manager.hasDirectListeners(target, "open", self._on_open)) { - const event = try Event.initTrusted(.wrap("open"), .{}, page); + const event = try Event.initTrusted(comptime .wrap("open"), .{}, page); try page._event_manager.dispatchDirect(target, event, self._on_open, .{ .context = "WebSocket open" }); } } @@ -426,7 +437,7 @@ fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsF else .{ .string = data }; - const event = try MessageEvent.initTrusted(.wrap("message"), .{ + const event = try MessageEvent.initTrusted(comptime .wrap("message"), .{ .data = msg_data, .origin = "", }, page); @@ -434,12 +445,22 @@ fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsF } } +fn dispatchErrorEvent(self: *WebSocket) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "error", self._on_error)) { + const event = try Event.initTrusted(comptime .wrap("error"), .{}, page); + try page._event_manager.dispatchDirect(target, event, self._on_error, .{ .context = "WebSocket error" }); + } +} + fn dispatchCloseEvent(self: *WebSocket, code: u16, reason: []const u8, was_clean: bool) !void { const page = self._page; const target = self.asEventTarget(); if (page._event_manager.hasDirectListeners(target, "close", self._on_close)) { - const event = try CloseEvent.initTrusted(.wrap("close"), .{ + const event = try CloseEvent.initTrusted(comptime .wrap("close"), .{ .code = code, .reason = reason, .wasClean = was_clean, @@ -573,15 +594,24 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { .text, .binary => try self.dispatchMessageEvent(message, meta.frame_type), .close => { // Parse close frame: 2-byte code (big-endian) + optional reason - self._close_code = if (message.len >= 2) + const received_code = if (message.len >= 2) @as(u16, message[0]) << 8 | message[1] else 1005; // No status code received - if (message.len > 2) { - self._close_reason = try self._arena.dupe(u8, message[2..]); + + if (self._ready_state == .closing) { + // Client-initiated close: this is the server's response. + // Close handshake complete - disconnect. + self.disconnected(null); + } else { + // Server-initiated close: send reciprocal close frame per RFC 6455 §5.5.1 + self._close_code = received_code; + if (message.len > 2) { + self._close_reason = try self._arena.dupe(u8, message[2..]); + } + self._ready_state = .closing; + try self.queueMessage(.close); } - self._ready_state = .closing; - self.disconnected(null); }, .ping, .pong, .cont => {}, } @@ -589,7 +619,7 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { // libcurl has no mechanism to signal that the connection is established. The // best option I could come up with was looking for an upgrade header response. -fn receivedHeaderCalllback(buffer: [*]const u8, header_count: usize, buf_len: usize, data: *anyopaque) usize { +fn receivedHeaderCallback(buffer: [*]const u8, header_count: usize, buf_len: usize, data: *anyopaque) usize { if (comptime IS_DEBUG) { std.debug.assert(header_count == 1); } diff --git a/src/network/http.zig b/src/network/http.zig index 94b06fb9..e7a7fab4 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -349,15 +349,6 @@ pub const Connection = struct { try libcurl.curl_easy_setopt(self._easy, .header_function, data_cb); } - pub const PauseFlags = packed struct { - red: bool = false, - green: bool = false, - blue: bool = false, - alpha: bool = false, - // Optional padding to match a specific size, e.g., a u32 - _padding: u28 = 0, - }; - pub fn pause( self: *Connection, flags: libcurl.CurlPauseFlags, From 95f80c9645a7102640cc3b57e19f3e1d228872a1 Mon Sep 17 00:00:00 2001 From: Trevin Chow Date: Fri, 3 Apr 2026 16:59:21 -0700 Subject: [PATCH 07/15] feat: emit Page.javascriptDialogOpening CDP events for JS dialogs window.alert(), confirm(), and prompt() now dispatch a javascript_dialog_opening notification that the CDP layer forwards as a Page.javascriptDialogOpening event. This enables Puppeteer's page.on('dialog') to fire when JS dialogs open. Also adds Page.handleJavaScriptDialog as a CDP method. Dialogs still auto-dismiss in headless mode (alert is void, confirm returns false, prompt returns null), so handleJavaScriptDialog is an acknowledgement rather than a blocking gate. Changes: - Notification.zig: add JavascriptDialogOpening event type - CDP.zig: register listener, forward to page domain - page.zig: handleJavaScriptDialog handler + event emitter - Window.zig: alert/confirm/prompt dispatch the notification Fixes #2082 Ref #2043 --- src/Notification.zig | 8 ++++++++ src/browser/webapi/Window.zig | 24 ++++++++++++++++++++---- src/cdp/CDP.zig | 6 ++++++ src/cdp/domains/page.zig | 28 ++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/src/Notification.zig b/src/Notification.zig index 6adbfd81..e7bf1e33 100644 --- a/src/Notification.zig +++ b/src/Notification.zig @@ -83,6 +83,7 @@ const EventListeners = struct { http_request_auth_required: List = .{}, http_response_data: List = .{}, http_response_header_done: List = .{}, + javascript_dialog_opening: List = .{}, }; const Events = union(enum) { @@ -102,6 +103,7 @@ const Events = union(enum) { http_request_done: *const RequestDone, http_response_data: *const ResponseData, http_response_header_done: *const ResponseHeaderDone, + javascript_dialog_opening: *const JavascriptDialogOpening, }; const EventType = std.meta.FieldEnum(Events); @@ -185,6 +187,12 @@ pub const RequestFail = struct { err: anyerror, }; +pub const JavascriptDialogOpening = struct { + url: [:0]const u8, + message: []const u8, + dialog_type: []const u8, +}; + pub fn init(allocator: Allocator) !*Notification { const notification = try allocator.create(Notification); errdefer allocator.destroy(notification); diff --git a/src/browser/webapi/Window.zig b/src/browser/webapi/Window.zig index fb3ec8f8..582ed659 100644 --- a/src/browser/webapi/Window.zig +++ b/src/browser/webapi/Window.zig @@ -903,15 +903,31 @@ pub const JsApi = struct { pub const opener = bridge.property(null, .{ .template = false }); pub const alert = bridge.function(struct { - fn alert(_: *const Window, _: ?[]const u8) void {} - }.alert, .{ .noop = true }); + fn alert(_: *const Window, message: ?[]const u8, page: *Page) void { + page._session.notification.dispatch(.javascript_dialog_opening, &.{ + .url = page.url, + .message = message orelse "", + .dialog_type = "alert", + }); + } + }.alert, .{}); pub const confirm = bridge.function(struct { - fn confirm(_: *const Window, _: ?[]const u8) bool { + fn confirm(_: *const Window, message: ?[]const u8, page: *Page) bool { + page._session.notification.dispatch(.javascript_dialog_opening, &.{ + .url = page.url, + .message = message orelse "", + .dialog_type = "confirm", + }); return false; } }.confirm, .{}); pub const prompt = bridge.function(struct { - fn prompt(_: *const Window, _: ?[]const u8, _: ?[]const u8) ?[]const u8 { + fn prompt(_: *const Window, message: ?[]const u8, _: ?[]const u8, page: *Page) ?[]const u8 { + page._session.notification.dispatch(.javascript_dialog_opening, &.{ + .url = page.url, + .message = message orelse "", + .dialog_type = "prompt", + }); return null; } }.prompt, .{}); diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index 09fa8324..89efcef1 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -431,6 +431,7 @@ pub const BrowserContext = struct { try notification.register(.page_frame_created, self, onPageFrameCreated); try notification.register(.page_dom_content_loaded, self, onPageDOMContentLoaded); try notification.register(.page_loaded, self, onPageLoaded); + try notification.register(.javascript_dialog_opening, self, onJavascriptDialogOpening); } pub fn deinit(self: *BrowserContext) void { @@ -641,6 +642,11 @@ pub const BrowserContext = struct { return @import("domains/page.zig").pageLoaded(self, msg); } + pub fn onJavascriptDialogOpening(ctx: *anyopaque, msg: *const Notification.JavascriptDialogOpening) !void { + const self: *BrowserContext = @ptrCast(@alignCast(ctx)); + return @import("domains/page.zig").javascriptDialogOpening(self, msg); + } + pub fn onHttpResponseHeadersDone(ctx: *anyopaque, msg: *const Notification.ResponseHeaderDone) !void { const self: *BrowserContext = @ptrCast(@alignCast(ctx)); defer self.resetNotificationArena(); diff --git a/src/cdp/domains/page.zig b/src/cdp/domains/page.zig index cf3cdd7d..d5fb8c7b 100644 --- a/src/cdp/domains/page.zig +++ b/src/cdp/domains/page.zig @@ -48,6 +48,7 @@ pub fn processMessage(cmd: *CDP.Command) !void { close, captureScreenshot, getLayoutMetrics, + handleJavaScriptDialog, }, cmd.input.action) orelse return error.UnknownMethod; switch (action) { @@ -63,6 +64,7 @@ pub fn processMessage(cmd: *CDP.Command) !void { .close => return close(cmd), .captureScreenshot => return captureScreenshot(cmd), .getLayoutMetrics => return getLayoutMetrics(cmd), + .handleJavaScriptDialog => return handleJavaScriptDialog(cmd), } } @@ -642,6 +644,32 @@ fn sendPageLifecycle(bc: *CDP.BrowserContext, name: []const u8, timestamp: u64, }, .{ .session_id = session_id }); } +// https://chromedevtools.github.io/devtools-protocol/tot/Page/#method-handleJavaScriptDialog +fn handleJavaScriptDialog(cmd: *CDP.Command) !void { + // Dialogs auto-dismiss in headless mode, so this is an acknowledgement. + // accept and promptText params are parsed but not used since the dialog + // already returned by the time the CDP client sends this. + _ = try cmd.params(struct { + accept: bool, + promptText: ?[]const u8 = null, + }); + try cmd.sendResult(null, .{}); +} + +// https://chromedevtools.github.io/devtools-protocol/tot/Page/#event-javascriptDialogOpening +pub fn javascriptDialogOpening(bc: anytype, event: *const Notification.JavascriptDialogOpening) !void { + const session_id = bc.session_id orelse return; + var cdp = bc.cdp; + + try cdp.sendEvent("Page.javascriptDialogOpening", .{ + .url = event.url, + .message = event.message, + .type = event.dialog_type, + .hasBrowserHandler = false, + .defaultPrompt = "", + }, .{ .session_id = session_id }); +} + const LifecycleEvent = struct { frameId: []const u8, loaderId: ?[]const u8, From 918d3709feabd64ef93f2592b4cb0fd1f0510378 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Sun, 5 Apr 2026 08:13:38 +0800 Subject: [PATCH 08/15] Move the NodeList created from a ChildNode to the ChildNode's arena --- src/browser/webapi/collections/ChildNodes.zig | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/browser/webapi/collections/ChildNodes.zig b/src/browser/webapi/collections/ChildNodes.zig index 9c2bde91..df3e7ee1 100644 --- a/src/browser/webapi/collections/ChildNodes.zig +++ b/src/browser/webapi/collections/ChildNodes.zig @@ -126,8 +126,12 @@ fn versionCheck(self: *ChildNodes, page: *Page) bool { } const NodeList = @import("NodeList.zig"); -pub fn runtimeGenericWrap(self: *ChildNodes, page: *Page) !*NodeList { - return page._factory.create(NodeList{ ._data = .{ .child_nodes = self } }); +pub fn runtimeGenericWrap(self: *ChildNodes, _: *const Page) !*NodeList { + const nl = try self._arena.create(NodeList); + nl.* = .{ + ._data = .{ .child_nodes = self }, + }; + return nl; } const Iterator = struct { From e54c99d0f037baf5ca813e3b9df34d5b22723256 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Mon, 6 Apr 2026 16:32:48 +0800 Subject: [PATCH 09/15] protect against index out of bound on invalid datetime --- src/datetime.zig | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/datetime.zig b/src/datetime.zig index 04f1a6d5..3b9d5951 100644 --- a/src/datetime.zig +++ b/src/datetime.zig @@ -335,7 +335,7 @@ pub const DateTime = struct { } const tm = try parser.time(false); - if (parser.consumeIf(' ') == false) { + if (parser.unconsumed() == 0 or parser.consumeIf(' ') == false) { return error.InvalidTime; } @@ -1496,6 +1496,9 @@ test "DateTime: parse RFC822" { try testing.expectError(error.InvalidTime, DateTime.parse("Wed, 01 Jan 20 20:1a:22 X", .rfc822)); try testing.expectError(error.InvalidTime, DateTime.parse("Wed, 01 Jan 20 20:1a:22 ZZ", .rfc822)); + // Missing timezone - input ends exactly at time boundary (was causing index out of bounds) + try testing.expectError(error.InvalidTime, DateTime.parse("Wed, 01 Jan 2020 10:10:10", .rfc822)); + { const dt = try DateTime.parse("31 Dec 68 23:59 Z", .rfc822); try testing.expectEqual(3124223940000000, dt.micros); From c4fd37127e571e5983ff4b5179866653385d9cb1 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Mon, 6 Apr 2026 16:56:05 +0800 Subject: [PATCH 10/15] Better handling of cookies with empty domains --- src/browser/webapi/storage/Cookie.zig | 33 ++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/browser/webapi/storage/Cookie.zig b/src/browser/webapi/storage/Cookie.zig index 649a04cc..09231e5e 100644 --- a/src/browser/webapi/storage/Cookie.zig +++ b/src/browser/webapi/storage/Cookie.zig @@ -283,7 +283,10 @@ pub fn parseDomain(arena: Allocator, url_: ?[:0]const u8, explicit_domain: ?[]co } } - return encoded_host orelse return error.InvalidDomain; // default-domain + if (encoded_host) |host| { + if (host.len > 0) return host; + } + return error.InvalidDomain; } pub fn percentEncode(arena: Allocator, part: []const u8, comptime isValidChar: fn (u8) bool) ![]u8 { @@ -353,6 +356,9 @@ pub fn appliesTo(self: *const Cookie, url: *const PreparedUri, same_site: bool, } { + if (self.domain.len == 0) { + return false; + } if (self.domain[0] == '.') { // When a Set-Cookie header has a Domain attribute // Then we will _always_ prefix it with a dot, extending its @@ -1079,3 +1085,28 @@ fn expectAttribute(expected: anytype, url_: ?[:0]const u8, set_cookie: []const u fn expectError(expected: anyerror, url: ?[:0]const u8, set_cookie: []const u8) !void { try testing.expectError(expected, Cookie.parse(testing.allocator, url orelse test_url, set_cookie)); } + +test "Cookie: appliesTo with empty domain" { + const cookie = Cookie{ + .arena = std.heap.ArenaAllocator.init(testing.allocator), + .name = "test", + .value = "value", + .domain = "", + .path = "/", + .expires = null, + }; + defer cookie.deinit(); + + const target = PreparedUri{ + .host = "example.com", + .path = "/", + .secure = false, + }; + + try testing.expectEqual(false, cookie.appliesTo(&target, true, true, true)); +} + +test "Cookie: parse rejects URL with empty host" { + try testing.expectError(error.InvalidDomain, Cookie.parse(testing.allocator, "http:///path", "name=value")); + try testing.expectError(error.InvalidDomain, Cookie.parse(testing.allocator, "http://", "name=value")); +} From d917728053e8fd50b18f2582d8868deb618ebd47 Mon Sep 17 00:00:00 2001 From: NYCU-Chung Date: Mon, 6 Apr 2026 20:39:56 +0800 Subject: [PATCH 11/15] fix(cookie): reject Set-Cookie domains that are public suffixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit parseDomain() rejects bare TLDs (e.g. Domain=.io) but accepts multi-level public suffixes like .co.uk, .com.au, .co.jp. Per RFC 6265bis §5.7.3.10, user agents should reject cookies whose domain attribute is a public suffix. Chrome, Firefox, and Safari all enforce this using the Public Suffix List. The PSL data is already imported (Cookie.zig:26) and used in findSecondLevelDomain(), but parseDomain() does not consult it. This causes behavior differences vs Chrome when automating .co.uk / .com.au / .co.jp sites via CDP — cookies that Chrome silently drops are accepted by Lightpanda, polluting the cookie jar across unrelated sites in the same session. --- src/browser/webapi/storage/Cookie.zig | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/browser/webapi/storage/Cookie.zig b/src/browser/webapi/storage/Cookie.zig index 649a04cc..4616b3c5 100644 --- a/src/browser/webapi/storage/Cookie.zig +++ b/src/browser/webapi/storage/Cookie.zig @@ -273,6 +273,12 @@ pub fn parseDomain(arena: Allocator, url_: ?[:0]const u8, explicit_domain: ?[]co // can't set a cookie for a TLD return error.InvalidDomain; } + + // Can't set a cookie for a public suffix (e.g. co.uk, com.au). + if (public_suffix_list(owned_domain[1..])) { + return error.InvalidDomain; + } + if (encoded_host) |host| { if (std.mem.endsWith(u8, host, owned_domain[1..]) == false) { return error.InvalidDomain; @@ -1027,6 +1033,15 @@ test "Cookie: parse domain" { try expectError(error.InvalidDomain, "http://lightpanda.io/", "b;domain=other.lightpanda.io"); try expectError(error.InvalidDomain, "http://lightpanda.io/", "b;domain=other.lightpanda.com"); try expectError(error.InvalidDomain, "http://lightpanda.io/", "b;domain=other.example.com"); + + // Public suffixes should be rejected (test PSL entries: "gov.uk", "api.gov.uk") + try expectError(error.InvalidDomain, "http://example.gov.uk/", "b;domain=gov.uk"); + try expectError(error.InvalidDomain, "http://example.gov.uk/", "b;domain=.gov.uk"); + try expectError(error.InvalidDomain, "http://test.api.gov.uk/", "b;domain=api.gov.uk"); + + // Subdomains of public suffixes should still be accepted + try expectAttribute(.{ .domain = ".example.gov.uk" }, "http://example.gov.uk/", "b;domain=example.gov.uk"); + try expectAttribute(.{ .domain = ".example.gov.uk" }, "http://sub.example.gov.uk/", "b;domain=example.gov.uk"); } test "Cookie: parse limit" { From f813d34897128c1b840691f50025212006bb523d Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Mon, 6 Apr 2026 23:06:03 +0800 Subject: [PATCH 12/15] Don't release un-acquired reference on CDP input event. --- src/browser/Page.zig | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/browser/Page.zig b/src/browser/Page.zig index b265a45d..16a05806 100644 --- a/src/browser/Page.zig +++ b/src/browser/Page.zig @@ -3438,10 +3438,7 @@ pub fn handleClick(self: *Page, target: *Node) !void { pub fn triggerKeyboard(self: *Page, keyboard_event: *KeyboardEvent) !void { const event = keyboard_event.asEvent(); - const element = self.window._document._active_element orelse { - _ = event.releaseRef(self._session); - return; - }; + const element = self.window._document._active_element orelse return; if (comptime IS_DEBUG) { log.debug(.page, "page keydown", .{ From 7208934bda254fde89b1229716f1999b159a8884 Mon Sep 17 00:00:00 2001 From: Trevin Chow Date: Mon, 6 Apr 2026 11:08:27 -0700 Subject: [PATCH 13/15] fix: return CDP error from handleJavaScriptDialog instead of silent no-op Dialogs auto-dismiss in headless mode, so there is no pending dialog by the time the CDP client sends Page.handleJavaScriptDialog. Return an explicit error so the client knows the action had no effect. --- src/cdp/domains/page.zig | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cdp/domains/page.zig b/src/cdp/domains/page.zig index d5fb8c7b..c306ead7 100644 --- a/src/cdp/domains/page.zig +++ b/src/cdp/domains/page.zig @@ -646,14 +646,14 @@ fn sendPageLifecycle(bc: *CDP.BrowserContext, name: []const u8, timestamp: u64, // https://chromedevtools.github.io/devtools-protocol/tot/Page/#method-handleJavaScriptDialog fn handleJavaScriptDialog(cmd: *CDP.Command) !void { - // Dialogs auto-dismiss in headless mode, so this is an acknowledgement. - // accept and promptText params are parsed but not used since the dialog - // already returned by the time the CDP client sends this. + // Dialogs auto-dismiss in headless mode. By the time the CDP client + // sends this command, the dialog has already returned and there is + // no pending dialog to accept or dismiss. _ = try cmd.params(struct { accept: bool, promptText: ?[]const u8 = null, }); - try cmd.sendResult(null, .{}); + return cmd.sendError(-32000, "No dialog is showing", .{}); } // https://chromedevtools.github.io/devtools-protocol/tot/Page/#event-javascriptDialogOpening From 36d3be55341d85a6d1fb6a9f9037cb326291b5e7 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Mon, 6 Apr 2026 18:03:45 +0800 Subject: [PATCH 14/15] add assertion on RC (to catch release overflow) --- src/lightpanda.zig | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lightpanda.zig b/src/lightpanda.zig index 5859324d..464769e0 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -249,9 +249,7 @@ pub fn RC(comptime T: type) type { } pub fn release(self: *@This(), value: anytype, session: *Session) void { - if (comptime IS_DEBUG) { - std.debug.assert(self._refs > 0); - } + assert(self._refs > 0, "release overflow", .{.type = @typeName(@TypeOf(value))}); const refs = self._refs - 1; self._refs = refs; From de8a5eeec1b169bd757c93bf4ea8b7a9405284ee Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Mon, 6 Apr 2026 18:12:44 +0800 Subject: [PATCH 15/15] zig fmt --- src/lightpanda.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightpanda.zig b/src/lightpanda.zig index 464769e0..4d6c23fb 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -249,7 +249,7 @@ pub fn RC(comptime T: type) type { } pub fn release(self: *@This(), value: anytype, session: *Session) void { - assert(self._refs > 0, "release overflow", .{.type = @typeName(@TypeOf(value))}); + assert(self._refs > 0, "release overflow", .{ .type = @typeName(@TypeOf(value)) }); const refs = self._refs - 1; self._refs = refs;