Merge pull request #2359 from lightpanda-io/wp/mrdimidium/spli-cdp-http

Reorganize Server, Client, CDP and HttpClient
This commit is contained in:
Nikolay Govorov
2026-05-04 17:03:19 +01:00
committed by GitHub
21 changed files with 724 additions and 797 deletions

View File

@@ -20,10 +20,10 @@ const std = @import("std");
const lp = @import("lightpanda");
const App = @import("App.zig");
const Config = @import("Config.zig");
const CDP = @import("cdp/CDP.zig");
const Net = @import("network/websocket.zig");
const HttpClient = @import("browser/HttpClient.zig");
const Config = @import("Config.zig");
const CDPClient = @import("./browser/HttpClient.zig").CDPClient;
const WsConnection = @import("network/WsConnection.zig");
const log = lp.log;
const net = std.net;
@@ -33,29 +33,29 @@ const Allocator = std.mem.Allocator;
const Server = @This();
app: *App,
allocator: Allocator,
json_version_response: []const u8,
// Thread management
active_threads: std.atomic.Value(u32) = .init(0),
clients: std.ArrayList(*Client) = .{},
client_mutex: std.Thread.Mutex = .{},
clients_pool: std.heap.MemoryPool(Client),
pending: std.ArrayList(*CDP) = .{},
conns: std.ArrayList(*CDP) = .{},
conns_mutex: std.Thread.Mutex = .{},
conns_pool: std.heap.MemoryPool(CDP),
pub fn init(app: *App, address: net.Address) !*Server {
const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(app);
errdefer allocator.free(json_version_response);
errdefer app.allocator.free(json_version_response);
const self = try allocator.create(Server);
errdefer allocator.destroy(self);
const self = try app.allocator.create(Server);
errdefer app.allocator.destroy(self);
self.* = .{
.app = app,
.allocator = allocator,
.conns_pool = .init(app.allocator),
.json_version_response = json_version_response,
.clients_pool = std.heap.MemoryPool(Client).init(allocator),
};
errdefer self.conns_pool.deinit();
var bound_address = address;
try self.app.network.bind(&bound_address, self, onAccept);
@@ -65,21 +65,34 @@ pub fn init(app: *App, address: net.Address) !*Server {
}
pub fn shutdown(self: *Server) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
for (self.clients.items) |client| {
client.stop();
self.app.network.unbind();
for (self.conns.items) |cdp| {
cdp.browser.env.terminate();
cdp.ws.sendClose();
cdp.ws.shutdown();
}
for (self.pending.items) |conn| {
conn.ws.shutdown();
}
}
pub fn deinit(self: *Server) void {
self.shutdown();
self.joinThreads();
self.clients.deinit(self.allocator);
self.clients_pool.deinit();
self.allocator.free(self.json_version_response);
self.allocator.destroy(self);
while (self.active_threads.load(.monotonic) > 0) {
std.Thread.sleep(10 * std.time.ns_per_ms);
}
self.conns.deinit(self.app.allocator);
self.pending.deinit(self.app.allocator);
self.conns_pool.deinit();
self.app.allocator.free(self.json_version_response);
self.app.allocator.destroy(self);
}
fn onAccept(ctx: *anyopaque, socket: posix.socket_t) void {
@@ -90,102 +103,6 @@ fn onAccept(ctx: *anyopaque, socket: posix.socket_t) void {
};
}
// Liveness is enforced at the TCP layer via keepalive probes sent by the
// kernel. This is transparent to CDP clients — unlike a WebSocket ping, which
// go-rod panics on and chromedp logs as "malformed". Tunables in Config.zig.
fn setTcpKeepalive(socket: posix.socket_t) void {
posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(c_int, 1))) catch |err| {
log.warn(.app, "SO_KEEPALIVE", .{ .err = err });
return;
};
const option = switch (@import("builtin").os.tag) {
.macos, .ios => posix.TCP.KEEPALIVE,
else => posix.TCP.KEEPIDLE,
};
posix.setsockopt(socket, posix.IPPROTO.TCP, option, &std.mem.toBytes(Config.CDP_KEEPALIVE_IDLE_S)) catch |err| {
log.warn(.app, "TCP_KEEPIDLE", .{ .err = err });
};
if (@hasDecl(posix.TCP, "KEEPINTVL")) {
posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPINTVL, &std.mem.toBytes(Config.CDP_KEEPALIVE_INTVL_S)) catch |err| {
log.warn(.app, "TCP_KEEPINTVL", .{ .err = err });
};
}
if (@hasDecl(posix.TCP, "KEEPCNT")) {
posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPCNT, &std.mem.toBytes(Config.CDP_KEEPALIVE_CNT)) catch |err| {
log.warn(.app, "TCP_KEEPCNT", .{ .err = err });
};
}
}
fn handleConnection(self: *Server, socket: posix.socket_t) void {
defer posix.close(socket);
setTcpKeepalive(socket);
// Client is HUGE (> 512KB) because it has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const client = self.getClient() catch |err| {
log.err(.app, "CDP client create", .{ .err = err });
return;
};
defer self.releaseClient(client);
client.* = Client.init(
socket,
self.allocator,
self.app,
self.json_version_response,
) catch |err| {
log.err(.app, "CDP client init", .{ .err = err });
return;
};
defer client.deinit();
self.registerClient(client);
defer self.unregisterClient(client);
// Check shutdown after registering to avoid missing the stop signal.
// If deinit() already iterated over clients, this client won't receive stop()
// and would block joinThreads() indefinitely.
if (self.app.shutdown()) {
return;
}
client.start();
}
fn getClient(self: *Server) !*Client {
self.client_mutex.lock();
defer self.client_mutex.unlock();
return self.clients_pool.create();
}
fn releaseClient(self: *Server, client: *Client) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
self.clients_pool.destroy(client);
}
fn registerClient(self: *Server, client: *Client) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
self.clients.append(self.allocator, client) catch {};
}
fn unregisterClient(self: *Server, client: *Client) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
for (self.clients.items, 0..) |c, i| {
if (c == client) {
_ = self.clients.swapRemove(i);
break;
}
}
}
fn spawnWorker(self: *Server, socket: posix.socket_t) !void {
if (self.app.shutdown()) {
return error.ShuttingDown;
@@ -213,300 +130,109 @@ fn spawnWorker(self: *Server, socket: posix.socket_t) !void {
}
errdefer _ = self.active_threads.fetchSub(1, .monotonic);
const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket });
const thread = try std.Thread.spawn(.{}, handleConnection, .{ self, socket });
thread.detach();
}
fn runWorker(self: *Server, socket: posix.socket_t) void {
fn handleConnection(self: *Server, socket: posix.socket_t) void {
defer _ = self.active_threads.fetchSub(1, .monotonic);
handleConnection(self, socket);
}
defer posix.close(socket);
fn joinThreads(self: *Server) void {
while (self.active_threads.load(.monotonic) > 0) {
std.Thread.sleep(10 * std.time.ns_per_ms);
// CDP is HUGE (> 512KB) because WsConnection has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const cdp = self.allocConn() catch |err| {
log.err(.app, "CDP alloc", .{ .err = err });
return;
};
defer self.releaseConn(cdp);
cdp.init(self.app, socket, self.json_version_response) catch |err| {
log.err(.app, "CDP init", .{ .err = err });
return;
};
defer cdp.deinit();
if (log.enabled(.app, .info)) {
const client_address = cdp.ws.getAddress() catch null;
log.info(.app, "client connected", .{ .ip = client_address });
}
self.registerHandshake(cdp);
const handshake_result = cdp.ws.handshake();
self.unregisterHandshake(cdp);
const upgraded = handshake_result catch |err| {
log.err(.app, "CDP handshake", .{ .err = err });
return;
};
if (!upgraded) return;
self.registerConn(cdp);
defer self.unregisterConn(cdp);
// Check shutdown after registering to avoid missing the stop signal.
// If shutdown() already iterated over conns, this conn won't be terminated
// and would block deinit() indefinitely.
if (self.app.shutdown()) {
return;
}
while (true) {
const next = cdp.tick() catch |err| {
log.err(.app, "cdp tick", .{ .err = err });
return;
};
if (!next) break;
}
}
// Handle exactly one TCP connection.
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: union(enum) {
http: void,
cdp: CDP,
},
fn registerHandshake(self: *Server, conn: *CDP) void {
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
allocator: Allocator,
app: *App,
http: *HttpClient,
ws: Net.WsConnection,
self.pending.append(self.app.allocator, conn) catch {};
}
pub fn init(
socket: posix.socket_t,
allocator: Allocator,
app: *App,
json_version_response: []const u8,
) !Client {
var ws = try Net.WsConnection.init(socket, allocator, json_version_response);
errdefer ws.deinit();
fn unregisterHandshake(self: *Server, conn: *CDP) void {
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
if (log.enabled(.app, .info)) {
const client_address = ws.getAddress() catch null;
log.info(.app, "client connected", .{ .ip = client_address });
}
const http = try HttpClient.init(allocator, &app.network);
errdefer http.deinit();
return .{
.allocator = allocator,
.app = app,
.http = http,
.ws = ws,
.mode = .{ .http = {} },
};
}
fn stop(self: *Client) void {
switch (self.mode) {
.http => {},
.cdp => |*cdp| {
cdp.browser.env.terminate();
self.ws.sendClose();
},
}
self.ws.shutdown();
}
pub fn deinit(self: *Client) void {
switch (self.mode) {
.cdp => |*cdp| cdp.deinit(),
.http => {},
}
self.ws.deinit();
self.http.deinit();
}
fn start(self: *Client) void {
const http = self.http;
http.cdp_client = .{
.socket = self.ws.socket,
.ctx = self,
.blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop,
};
defer http.cdp_client = null;
self.httpLoop(http) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
};
}
fn httpLoop(self: *Client, http: *HttpClient) !void {
lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
// Liveness is enforced by TCP keepalive configured in
// Server.setTcpKeepalive; the kernel closes dead sockets, which
// surfaces as EOF/error from readSocket. The loop blocks for ~24 days
// on each poll rather than tracking app-level timeouts. Capped at
// i32-max because HttpClient.tick narrows to c_int.
const wait_ms: u32 = std.math.maxInt(i32);
while (true) {
const status = http.tick(wait_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) continue;
if (self.readSocket() == false) {
return;
}
if (self.mode == .cdp) {
break;
}
}
var cdp = &self.mode.cdp;
while (true) {
const result = cdp.pageWait(wait_ms) catch |wait_err| switch (wait_err) {
error.NoPage => {
const status = http.tick(wait_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) continue;
if (self.readSocket() == false) {
return;
}
continue;
},
else => return wait_err,
};
switch (result) {
.cdp_socket => {
if (self.readSocket() == false) {
return;
}
},
.done => {},
}
for (self.pending.items, 0..) |w, i| {
if (w == conn) {
_ = self.pending.swapRemove(i);
break;
}
}
}
fn blockingReadStart(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx));
self.ws.setBlocking(true) catch |err| {
log.warn(.app, "CDP blockingReadStart", .{ .err = err });
return false;
};
return true;
}
fn allocConn(self: *Server) !*CDP {
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
return self.conns_pool.create();
}
fn blockingRead(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx));
return self.readSocket();
}
fn releaseConn(self: *Server, conn: *CDP) void {
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
self.conns_pool.destroy(conn);
}
fn blockingReadStop(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx));
self.ws.setBlocking(false) catch |err| {
log.warn(.app, "CDP blockingReadStop", .{ .err = err });
return false;
};
return true;
}
fn registerConn(self: *Server, conn: *CDP) void {
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
self.conns.append(self.app.allocator, conn) catch {};
}
fn readSocket(self: *Client) bool {
const n = self.ws.read() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
}
return self.processData() catch false;
}
fn processData(self: *Client) !bool {
switch (self.mode) {
.cdp => |*cdp| return self.processWebsocketMessage(cdp),
.http => return self.processHTTPRequest(),
fn unregisterConn(self: *Server, conn: *CDP) void {
self.conns_mutex.lock();
defer self.conns_mutex.unlock();
for (self.conns.items, 0..) |c, i| {
if (c == conn) {
_ = self.conns.swapRemove(i);
break;
}
}
fn processHTTPRequest(self: *Client) !bool {
lp.assert(self.ws.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.ws.reader.pos });
const request = self.ws.reader.buf[0..self.ws.reader.len];
if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) {
self.writeHTTPErrorResponse(413, "Request too large");
return error.RequestTooLarge;
}
// we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here
return true;
}
// the next incoming data can go to the front of our buffer
defer self.ws.reader.len = 0;
return self.handleHTTPRequest(request) catch |err| {
switch (err) {
error.NotFound => self.writeHTTPErrorResponse(404, "Not found"),
error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"),
error.InvalidProtocol => self.writeHTTPErrorResponse(400, "Invalid HTTP protocol"),
error.MissingHeaders => self.writeHTTPErrorResponse(400, "Missing required header"),
error.InvalidUpgradeHeader => self.writeHTTPErrorResponse(400, "Unsupported upgrade type"),
error.InvalidVersionHeader => self.writeHTTPErrorResponse(400, "Invalid websocket version"),
error.InvalidConnectionHeader => self.writeHTTPErrorResponse(400, "Invalid connection header"),
else => {
log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] });
self.writeHTTPErrorResponse(500, "Internal Server Error");
},
}
return err;
};
}
fn handleHTTPRequest(self: *Client, request: []u8) !bool {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
}
if (std.mem.eql(u8, request[0..4], "GET ") == false) {
return error.NotFound;
}
const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse {
return error.InvalidRequest;
};
const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) {
try self.upgradeConnection(request);
return true;
}
if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) {
try self.ws.send(self.ws.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the
// websocket upgrade) blocks until the first one times out.
// We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too.
self.ws.shutdown();
return false;
}
if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or
std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/"))
{
try self.ws.send(empty_json_list_response);
self.ws.shutdown();
return false;
}
return error.NotFound;
}
fn upgradeConnection(self: *Client, request: []u8) !void {
try self.ws.upgrade(request);
self.mode = .{ .cdp = try CDP.init(self) };
}
fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
self.ws.sendHttpError(status, body);
}
fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool {
return self.ws.processMessages(cdp);
}
pub fn sendAllocator(self: *Client) Allocator {
return self.ws.send_arena.allocator();
}
pub fn sendJSON(self: *Client, message: anytype, opts: std.json.Stringify.Options) !void {
return self.ws.sendJSON(message, opts);
}
pub fn sendJSONRaw(self: *Client, buf: std.ArrayList(u8)) !void {
return self.ws.sendJSONRaw(buf);
}
};
}
// Utils
// --------
@@ -545,13 +271,6 @@ fn buildJSONVersionResponse(
return try std.fmt.allocPrint(app.allocator, response_format, .{ body_len, host, port });
}
const empty_json_list_response =
"HTTP/1.1 200 OK\r\n" ++
"Content-Length: 2\r\n" ++
"Connection: Close\r\n" ++
"Content-Type: application/json; charset=UTF-8\r\n\r\n" ++
"[]";
pub const timestamp = @import("datetime.zig").timestamp;
pub const milliTimestamp = @import("datetime.zig").milliTimestamp;
@@ -901,7 +620,7 @@ fn createTestClient() !TestClient {
const TestClient = struct {
stream: std.net.Stream,
buf: [1024]u8 = undefined,
reader: Net.Reader(false),
reader: WsConnection.Reader(false),
fn deinit(self: *TestClient) void {
self.stream.close();
@@ -968,7 +687,7 @@ const TestClient = struct {
"Sec-Websocket-Accept: flzHu2DevQ2dSCSVqKSii5e9C2o=\r\n\r\n", res);
}
fn readWebsocketMessage(self: *TestClient) !?Net.Message {
fn readWebsocketMessage(self: *TestClient) !?WsConnection.Message {
while (true) {
const n = try self.stream.read(self.reader.readBuf());
if (n == 0) {

View File

@@ -40,37 +40,38 @@ app: *App,
session: ?Session,
allocator: Allocator,
arena_pool: *ArenaPool,
http_client: *HttpClient,
http_client: HttpClient,
// used by sessions to allocate pages.
page_pool: std.heap.MemoryPool(Page),
const InitOpts = struct {
env: js.Env.InitOpts = .{},
http_client: *HttpClient,
};
pub fn init(app: *App, opts: InitOpts) !Browser {
pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp_client: ?HttpClient.CDPClient) !void {
const allocator = app.allocator;
var env = try js.Env.init(app, opts.env);
errdefer env.deinit();
return .{
self.* = .{
.app = app,
.env = env,
.session = null,
.allocator = allocator,
.arena_pool = &app.arena_pool,
.http_client = opts.http_client,
.http_client = undefined,
.page_pool = std.heap.MemoryPool(Page).init(allocator),
};
try self.http_client.init(allocator, &app.network, cdp_client);
}
pub fn deinit(self: *Browser) void {
self.closeSession();
self.env.deinit();
self.page_pool.deinit();
self.http_client.deinit();
}
pub fn newSession(self: *Browser, notification: *Notification) !*Session {

View File

@@ -325,7 +325,7 @@ pub fn init(self: *Frame, frame_id: u32, page: *Page, parent: ?*Frame) !void {
errdefer self._style_manager.deinit();
const browser = session.browser;
self._script_manager = ScriptManager.init(browser.allocator, browser.http_client, self);
self._script_manager = ScriptManager.init(browser.allocator, &browser.http_client, self);
errdefer self._script_manager.deinit();
self.js = try browser.env.createContext(self, .{
@@ -605,7 +605,7 @@ pub fn navigate(self: *Frame, request_url: [:0]const u8, opts: NavigateOpts) !vo
return;
}
var http_client = session.browser.http_client;
const http_client = &session.browser.http_client;
self.url = try self.arena.dupeZ(u8, request_url);
self.origin = try URL.getOrigin(self.arena, self.url);

View File

@@ -166,23 +166,21 @@ pub const CDPClient = struct {
blocking_read_end: *const fn (*anyopaque) bool,
};
pub fn init(allocator: Allocator, network: *Network) !*Client {
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void {
var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator);
errdefer transfer_pool.deinit();
const client = try allocator.create(Client);
errdefer allocator.destroy(client);
var handles = try http.Handles.init(network.config);
errdefer handles.deinit();
const http_proxy = network.config.httpProxy();
client.* = .{
self.* = Client{
.handles = handles,
.network = network,
.allocator = allocator,
.transfer_pool = transfer_pool,
.cdp_client = cdp_client,
.use_proxy = http_proxy != null,
.http_proxy = http_proxy,
@@ -197,25 +195,23 @@ pub fn init(allocator: Allocator, network: *Network) !*Client {
.entry_layer = undefined,
};
var next = client.layer();
var next = self.layer();
if (network.config.obeyRobots()) {
next = layerWith(&client.robots_layer, next);
next = layerWith(&self.robots_layer, next);
}
if (network.config.httpCacheDir() != null) {
next = layerWith(&client.cache_layer, next);
next = layerWith(&self.cache_layer, next);
}
next = layerWith(&client.interception_layer, next);
next = layerWith(&self.interception_layer, next);
if (network.config.webBotAuth() != null) {
next = layerWith(&client.web_bot_auth_layer, next);
next = layerWith(&self.web_bot_auth_layer, next);
}
client.entry_layer = next;
return client;
self.entry_layer = next;
}
pub fn deinit(self: *Client) void {
@@ -226,8 +222,6 @@ pub fn deinit(self: *Client) void {
self.clearUserAgentOverride();
self.robots_layer.deinit(self.allocator);
self.allocator.destroy(self);
}
pub fn layer(self: *Client) Layer {

View File

@@ -45,7 +45,7 @@ pub fn init(session: *Session, _: Opts) !Runner {
return .{
.frame = frame,
.session = session,
.http_client = session.browser.http_client,
.http_client = &session.browser.http_client,
};
}

View File

@@ -92,7 +92,7 @@ pub fn init(url: []const u8, exec: *Execution) !*Worker {
return self;
}
const http_client = session.browser.http_client;
const http_client = &session.browser.http_client;
http_client.request(.{
.ctx = self,
.params = .{

View File

@@ -130,7 +130,7 @@ pub fn init(worker: *Worker, url: [:0]const u8) !*WorkerGlobalScope {
self._script_manager = ScriptManagerBase.init(
arena,
session.browser.http_client,
&session.browser.http_client,
.{ .worker = self },
);
@@ -361,7 +361,7 @@ fn importScript(self: *WorkerGlobalScope, arena: Allocator, url: [:0]const u8) !
const resolved_url = try URL.resolve(arena, self.url, url, .{});
const http_client = session.browser.http_client;
const http_client = &session.browser.http_client;
var headers = try http_client.newHeaders();
try self.headersForRequest(&headers);

View File

@@ -77,7 +77,7 @@ pub fn init(input: Input, options: ?InitOpts, exec: *const Execution) !js.Promis
};
const session = exec.context.page.session;
const http_client = session.browser.http_client;
const http_client = &session.browser.http_client;
var headers = try http_client.newHeaders();
if (request._headers) |h| {
try h.populateHttpHeader(exec.call_arena, &headers);

View File

@@ -115,7 +115,7 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
const resolved_url = try URL.resolve(arena, frame.base(), url, .{ .always_dupe = true, .encoding = frame.charset });
const http_client = frame._session.browser.http_client;
const http_client = &frame._session.browser.http_client;
const conn = http_client.network.newConnection() orelse {
return error.NoFreeConnection;
};

View File

@@ -242,7 +242,7 @@ pub fn send(self: *XMLHttpRequest, body_: ?[]const u8) !void {
}
const session = exec.context.page.session;
const http_client = session.browser.http_client;
const http_client = &session.browser.http_client;
var headers = try http_client.newHeaders();
// Only add cookies for same-origin or when withCredentials is true

View File

@@ -19,8 +19,8 @@
const std = @import("std");
const lp = @import("lightpanda");
const App = @import("../App.zig");
const Notification = @import("../Notification.zig");
const Client = @import("../Server.zig").Client;
const js = @import("../browser/js/js.zig");
const Browser = @import("../browser/Browser.zig");
const Session = @import("../browser/Session.zig");
@@ -29,12 +29,15 @@ const Mime = @import("../browser/Mime.zig");
const Element = @import("../browser/webapi/Element.zig");
const Label = @import("../browser/webapi/element/html/Label.zig");
const Request = @import("../browser/HttpClient.zig").Request;
const CDPClient = @import("../browser/HttpClient.zig").CDPClient;
const WsConnection = @import("../network/WsConnection.zig");
const Incrementing = @import("id.zig").Incrementing;
const InterceptState = @import("domains/fetch.zig").InterceptState;
const log = lp.log;
const json = std.json;
const posix = std.posix;
const Allocator = std.mem.Allocator;
pub const URL_BASE = "chrome://newtab/";
@@ -48,10 +51,10 @@ const BrowserContextIdGen = Incrementing(u32, "BID");
// Generic so that we can inject mocks into it.
const CDP = @This();
// Used for sending message to the client and closing on error
client: *Client,
allocator: Allocator,
app: *App,
ws: WsConnection,
// The active browser
browser: Browser,
@@ -79,18 +82,18 @@ frame_arena: std.heap.ArenaAllocator,
// (or altogether eliminate) our use of this.
browser_context_arena: std.heap.ArenaAllocator,
pub fn init(client: *Client) !CDP {
const app = client.app;
pub fn init(
self: *CDP,
app: *App,
socket: posix.socket_t,
json_version_response: []const u8,
) !void {
const allocator = app.allocator;
const browser = try Browser.init(app, .{
.env = .{ .with_inspector = true },
.http_client = client.http,
});
errdefer browser.deinit();
return .{
.client = client,
.browser = browser,
self.* = .{
.app = app,
.ws = undefined,
.browser = undefined,
.allocator = allocator,
.browser_context = null,
.frame_arena = std.heap.ArenaAllocator.init(allocator),
@@ -98,6 +101,17 @@ pub fn init(client: *Client) !CDP {
.notification_arena = std.heap.ArenaAllocator.init(allocator),
.browser_context_arena = std.heap.ArenaAllocator.init(allocator),
};
try self.ws.init(socket, self.app.allocator, json_version_response);
errdefer self.ws.deinit();
try self.browser.init(app, .{ .env = .{ .with_inspector = true } }, .{
.ctx = self,
.socket = socket,
.blocking_read_start = CDP.blockingReadStart,
.blocking_read = CDP.blockingRead,
.blocking_read_end = CDP.blockingReadStop,
});
}
pub fn deinit(self: *CDP) void {
@@ -109,6 +123,48 @@ pub fn deinit(self: *CDP) void {
self.message_arena.deinit();
self.notification_arena.deinit();
self.browser_context_arena.deinit();
self.ws.deinit();
}
pub fn blockingReadStart(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
self.ws.setBlocking(true) catch |err| {
log.warn(.app, "CDP blockingReadStart", .{ .err = err });
return false;
};
return true;
}
pub fn blockingRead(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
return self.readSocket();
}
pub fn blockingReadStop(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
self.ws.setBlocking(false) catch |err| {
log.warn(.app, "CDP blockingReadStop", .{ .err = err });
return false;
};
return true;
}
pub fn readSocket(self: *CDP) bool {
const n = self.ws.read() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
}
return self.ws.processMessages(self) catch false;
}
pub fn sendJSON(self: *CDP, message: anytype) !void {
try self.ws.sendJSON(message, .{ .emit_null_optional_fields = false });
}
pub fn handleMessage(self: *CDP, msg: []const u8) bool {
@@ -133,6 +189,29 @@ pub fn pageWait(self: *CDP, ms: u32) !Session.Runner.CDPWaitResult {
return runner.waitCDP(.{ .ms = ms });
}
pub fn tick(self: *CDP) !bool {
// Liveness is enforced by TCP keepalive configured in
// Network.acceptConnections; the wakeup lets V8 run or terminate.
const wait_ms: u32 = 1000; // 1s
const result = self.pageWait(wait_ms) catch |wait_err| switch (wait_err) {
error.NoPage => {
const status = self.browser.http_client.tick(wait_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return false;
};
return status != .cdp_socket or self.readSocket();
},
else => return wait_err,
};
if (result == .cdp_socket) {
return self.readSocket();
}
return true;
}
// Called from above, in processMessage which handles client messages
// but can also be called internally. For example, Target.sendMessageToTarget
// calls back into dispatch to capture the response.
@@ -303,12 +382,6 @@ pub fn sendEvent(self: *CDP, method: []const u8, p: anytype, opts: SendEventOpts
});
}
pub fn sendJSON(self: *CDP, message: anytype) !void {
return self.client.sendJSON(message, .{
.emit_null_optional_fields = false,
});
}
pub const BrowserContext = struct {
const Node = @import("Node.zig");
const AXNode = @import("AXNode.zig");
@@ -414,7 +487,7 @@ pub const BrowserContext = struct {
errdefer notification.deinit();
const session = try cdp.browser.newSession(notification);
if (cdp.client.app.config.cookieFile()) |cookie_path| {
if (cdp.app.config.cookieFile()) |cookie_path| {
lp.cookies.loadFromFile(session, cookie_path);
}
@@ -470,7 +543,7 @@ pub const BrowserContext = struct {
// abort all intercepted requests before closing the session/page
// since some of these might callback into the page/scriptmanager
const http_client = browser.http_client;
const http_client = &browser.http_client;
for (self.intercept_state.pendingIntercepts()) |intercept| {
defer {
lp.assert(
@@ -804,7 +877,7 @@ pub const BrowserContext = struct {
};
const cdp = self.cdp;
const allocator = cdp.client.sendAllocator();
const allocator = cdp.ws.send_arena.allocator();
const field = ",\"sessionId\":\"";
@@ -830,7 +903,7 @@ pub const BrowserContext = struct {
std.debug.assert(buf.items.len == message_len);
}
try cdp.client.sendJSONRaw(buf);
try cdp.ws.sendJSONRaw(buf);
}
};

View File

@@ -98,7 +98,7 @@ pub fn setUserAgentOverride(cmd: *CDP.Command) !void {
};
const bc = cmd.browser_context orelse return error.BrowserContextNotLoaded;
const http_client = cmd.cdp.browser.http_client;
const http_client = &cmd.cdp.browser.http_client;
try http_client.setUserAgentOverride(ua);
bc.user_agent_changed = true;

View File

@@ -286,7 +286,7 @@ fn continueRequest(cmd: *CDP.Command) !void {
}
// todo: replace.
const client = bc.cdp.browser.http_client;
const client = &bc.cdp.browser.http_client;
try client.interception_layer.continueRequest(client, request);
return cmd.sendResult(null, .{});
}
@@ -321,7 +321,7 @@ fn continueWithAuth(cmd: *CDP.Command) !void {
.response = params.authChallengeResponse.response,
});
const client = bc.cdp.browser.http_client;
const client = &bc.cdp.browser.http_client;
if (params.authChallengeResponse.response != .ProvideCredentials) {
transfer.abortAuthChallenge();
@@ -385,7 +385,7 @@ fn fulfillRequest(cmd: *CDP.Command) !void {
body = buf;
}
const client = bc.cdp.browser.http_client;
const client = &bc.cdp.browser.http_client;
try client.interception_layer.fulfillRequest(client, request, params.responseCode, params.responseHeaders orelse &.{}, body);
return cmd.sendResult(null, .{});
}
@@ -403,7 +403,7 @@ fn failRequest(cmd: *CDP.Command) !void {
const pending = intercept_state.remove(request_id) orelse return error.RequestNotFound;
const request = pending.request;
const client = bc.cdp.browser.http_client;
const client = &bc.cdp.browser.http_client;
defer client.interception_layer.abortRequest(client, request);
log.info(.cdp, "request intercept", .{

View File

@@ -22,6 +22,8 @@ const posix = std.posix;
const CDP = @import("CDP.zig");
const Server = @import("../Server.zig");
const Net = @import("../network/WsConnection.zig");
const HttpClient = @import("../browser/HttpClient.zig");
const base = @import("../testing.zig");
pub const allocator = base.allocator;
@@ -37,26 +39,25 @@ pub const LogFilter = base.LogFilter;
const TestContext = struct {
read_at: usize = 0,
read_buf: [1024 * 32]u8 = undefined,
cdp_: ?CDP = null,
client: Server.Client,
cdp_: CDP = undefined,
cdp_initialized: bool = false,
cdp_socket: posix.socket_t,
socket: posix.socket_t,
received: std.ArrayList(json.Value) = .empty,
received_raw: std.ArrayList([]const u8) = .empty,
pub fn deinit(self: *TestContext) void {
if (self.cdp_) |*c| {
c.deinit();
}
self.client.deinit();
if (self.cdp_initialized) self.cdp_.deinit();
posix.close(self.socket);
base.reset();
}
pub fn cdp(self: *TestContext) *CDP {
if (self.cdp_ == null) {
self.cdp_ = CDP.init(&self.client) catch |err| @panic(@errorName(err));
if (!self.cdp_initialized) {
self.cdp_.init(base.test_app, self.cdp_socket, "json-version") catch |err| @panic(@errorName(err));
self.cdp_initialized = true;
}
return &self.cdp_.?;
return &self.cdp_;
}
const BrowserContextOpts = struct {
@@ -202,12 +203,10 @@ const TestContext = struct {
return;
}
if (self.cdp_) |*cdp__| {
if (cdp__.browser_context) |*bc| {
if (bc.session.hasPage()) {
var runner = try bc.session.runner(.{});
_ = try runner.tick(.{ .ms = 1000 });
}
if (self.cdp_.browser_context) |*bc| {
if (bc.session.hasPage()) {
var runner = try bc.session.runner(.{});
_ = try runner.tick(.{ .ms = 1000 });
}
}
std.Thread.sleep(5 * std.time.ns_per_ms);
@@ -315,10 +314,8 @@ pub fn context() !TestContext {
try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.RCVBUF, &std.mem.toBytes(@as(c_int, 32_768)));
try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.SNDBUF, &std.mem.toBytes(@as(c_int, 32_768)));
const client = try Server.Client.init(pair[1], base.arena_allocator, base.test_app, "json-version");
return .{
.client = client,
.cdp_socket = pair[1],
.socket = pair[0],
};
}

View File

@@ -248,29 +248,26 @@ noinline fn assertionFailure(comptime ctx: []const u8, args: anytype) noreturn {
// Reference counting helper
pub fn RC(comptime T: type) type {
return struct {
_refs: T = 0,
_refs: std.atomic.Value(T) = .init(0),
pub fn init(refs: T) @This() {
return .{ ._refs = refs };
return .{ ._refs = .init(refs) };
}
pub fn acquire(self: *@This()) void {
self._refs += 1;
_ = self._refs.fetchAdd(1, .monotonic);
}
pub fn release(self: *@This(), value: anytype, page: *Page) void {
assert(self._refs > 0, "release overflow", .{ .type = @typeName(@TypeOf(value)) });
const refs = self._refs - 1;
self._refs = refs;
if (refs > 0) {
return;
const prev = self._refs.fetchSub(1, .acq_rel);
assert(prev > 0, "release overflow", .{ .type = @typeName(@TypeOf(value)) });
if (prev == 1) {
value.deinit(page);
}
value.deinit(page);
}
pub fn format(self: @This(), writer: *std.Io.Writer) !void {
return writer.print("{d}", .{self._refs});
return writer.print("{d}", .{self._refs.load(.monotonic)});
}
};
}

View File

@@ -210,13 +210,8 @@ const FetchTerminator = struct {
fn fetchThread(app: *App, ft: *FetchTerminator, url: [:0]const u8, fetch_opts: lp.FetchOpts) void {
defer app.network.stop();
const http_client = lp.HttpClient.init(app.allocator, &app.network) catch |err| {
log.fatal(.app, "http client init error", .{ .err = err });
return;
};
defer http_client.deinit();
var browser = lp.Browser.init(app, .{ .http_client = http_client }) catch |err| {
var browser: lp.Browser = undefined;
browser.init(app, .{}, null) catch |err| {
log.fatal(.app, "browser init error", .{ .err = err });
return;
};

View File

@@ -44,10 +44,8 @@ pub fn main() !void {
var test_arena = std.heap.ArenaAllocator.init(allocator);
defer test_arena.deinit();
const http_client = try lp.HttpClient.init(allocator, &app.network);
defer http_client.deinit();
var browser = try lp.Browser.init(app, .{ .http_client = http_client });
var browser: lp.Browser = undefined;
try browser.init(app, .{}, null);
defer browser.deinit();
const notification = try lp.Notification.init(allocator);

View File

@@ -3,7 +3,6 @@ const std = @import("std");
const lp = @import("lightpanda");
const App = @import("../App.zig");
const HttpClient = @import("../browser/HttpClient.zig");
const testing = @import("../testing.zig");
const protocol = @import("protocol.zig");
const router = @import("router.zig");
@@ -14,7 +13,6 @@ const Self = @This();
allocator: std.mem.Allocator,
app: *App,
http_client: *HttpClient,
notification: *lp.Notification,
browser: lp.Browser,
session: *lp.Session,
@@ -25,30 +23,26 @@ mutex: std.Thread.Mutex = .{},
aw: std.io.Writer.Allocating,
pub fn init(allocator: std.mem.Allocator, app: *App, writer: *std.io.Writer) !*Self {
const http_client = try HttpClient.init(allocator, &app.network);
errdefer http_client.deinit();
const notification = try lp.Notification.init(allocator);
errdefer notification.deinit();
const self = try allocator.create(Self);
errdefer allocator.destroy(self);
var browser = try lp.Browser.init(app, .{ .http_client = http_client });
errdefer browser.deinit();
self.* = .{
.allocator = allocator,
.app = app,
.writer = writer,
.browser = browser,
.browser = undefined,
.aw = .init(allocator),
.http_client = http_client,
.notification = notification,
.session = undefined,
.node_registry = CDPNode.Registry.init(allocator),
};
try self.browser.init(app, .{}, null);
errdefer self.browser.deinit();
self.session = try self.browser.newSession(self.notification);
if (app.config.cookieFile()) |cookie_path| {
@@ -67,7 +61,6 @@ pub fn deinit(self: *Self) void {
self.aw.deinit();
self.browser.deinit();
self.notification.deinit();
self.http_client.deinit();
self.allocator.destroy(self);
}

View File

@@ -70,6 +70,7 @@ ws_mutex: std.Thread.Mutex = .{},
pollfds: []posix.pollfd,
listener: ?Listener = null,
accept: std.atomic.Value(bool) = .init(true),
// Wakeup pipe: workers write to [1], main thread polls [0]
wakeup_pipe: [2]posix.fd_t = .{ -1, -1 },
@@ -355,6 +356,10 @@ pub fn bind(
ctx: *anyopaque,
on_accept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void,
) !void {
if (self.listener != null) return error.TooManyListeners;
self.accept.store(true, .release);
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
errdefer posix.close(listener);
@@ -374,8 +379,6 @@ pub fn bind(
try posix.getsockname(listener, @ptrCast(&bound), &bound_len);
address.* = net.Address.initPosix(@ptrCast(@alignCast(&bound)));
if (self.listener != null) return error.TooManyListeners;
self.listener = .{
.socket = listener,
.ctx = ctx,
@@ -388,6 +391,11 @@ pub fn bind(
};
}
pub fn unbind(self: *Network) void {
self.accept.store(false, .release);
self.wakeupPoll();
}
pub fn onTick(self: *Network, ctx: *anyopaque, callback: *const fn (*anyopaque) void) void {
self.callbacks_mutex.lock();
defer self.callbacks_mutex.unlock();
@@ -424,6 +432,12 @@ pub fn run(self: *Network) void {
// telemetry, but we stop accepting new connections. It is the responsibility
// of external code to terminate its requests upon shutdown.
while (true) {
if (self.listener != null and !self.accept.load(.acquire)) {
posix.close(self.listener.?.socket);
self.listener = null;
self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 };
}
self.drainQueue();
if (self.multi) |multi| {
@@ -574,6 +588,28 @@ fn acceptConnections(self: *Network) void {
}
};
// Liveness is enforced at the TCP layer via keepalive probes sent by the
// kernel. This is transparent to CDP clients — unlike a WebSocket ping, which
// go-rod panics on and chromedp logs as "malformed". Tunables in Config.zig.
posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(c_int, 1))) catch |err| {
log.warn(.app, "SO_KEEPALIVE", .{ .err = err });
return;
};
const option = switch (@import("builtin").os.tag) {
.macos, .ios => posix.TCP.KEEPALIVE,
else => posix.TCP.KEEPIDLE,
};
posix.setsockopt(socket, posix.IPPROTO.TCP, option, &std.mem.toBytes(Config.CDP_KEEPALIVE_IDLE_S)) catch |err| {
log.warn(.app, "TCP_KEEPIDLE", .{ .err = err });
};
posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPINTVL, &std.mem.toBytes(Config.CDP_KEEPALIVE_INTVL_S)) catch |err| {
log.warn(.app, "TCP_KEEPINTVL", .{ .err = err });
};
posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPCNT, &std.mem.toBytes(Config.CDP_KEEPALIVE_CNT)) catch |err| {
log.warn(.app, "TCP_KEEPCNT", .{ .err = err });
};
listener.onAccept(listener.ctx, socket);
}
}

View File

@@ -24,7 +24,8 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("lightpanda").log;
const assert = @import("lightpanda").assert;
const CDP_MAX_MESSAGE_SIZE = @import("../Config.zig").CDP_MAX_MESSAGE_SIZE;
const Config = @import("../Config.zig");
const CDP_MAX_MESSAGE_SIZE = Config.CDP_MAX_MESSAGE_SIZE;
const Fragments = struct {
type: Message.Type,
@@ -305,301 +306,429 @@ pub fn Reader(comptime EXPECT_MASK: bool) type {
};
}
pub const WsConnection = struct {
// CLOSE, 2 length, code
const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000
const CLOSE_GOING_AWAY = [_]u8{ 136, 2, 3, 233 }; // code: 1001
const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009
const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
pub const WsConnection = @This();
// CLOSE, 2 length, code
const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000
const CLOSE_GOING_AWAY = [_]u8{ 136, 2, 3, 233 }; // code: 1001
const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009
const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
socket: posix.socket_t,
socket_flags: usize,
reader: Reader(true),
send_arena: ArenaAllocator,
json_version_response: []const u8,
pub fn init(
self: *WsConnection,
socket: posix.socket_t,
socket_flags: usize,
reader: Reader(true),
send_arena: ArenaAllocator,
allocator: Allocator,
json_version_response: []const u8,
) !void {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
if (builtin.is_test == false) {
assert(socket_flags & nonblocking == nonblocking, "WsConnection.init blocking", .{});
}
pub fn init(socket: posix.socket_t, allocator: Allocator, json_version_response: []const u8) !WsConnection {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
if (builtin.is_test == false) {
assert(socket_flags & nonblocking == nonblocking, "WsConnection.init blocking", .{});
}
var reader = try Reader(true).init(allocator);
errdefer reader.deinit();
var reader = try Reader(true).init(allocator);
errdefer reader.deinit();
self.* = .{
.socket = socket,
.socket_flags = socket_flags,
.reader = reader,
.send_arena = ArenaAllocator.init(allocator),
.json_version_response = json_version_response,
};
}
return .{
.socket = socket,
.socket_flags = socket_flags,
.reader = reader,
.send_arena = ArenaAllocator.init(allocator),
.json_version_response = json_version_response,
pub fn deinit(self: *WsConnection) void {
self.reader.deinit();
self.send_arena.deinit();
}
pub fn send(self: *WsConnection, data: []const u8) !void {
var pos: usize = 0;
var changed_to_blocking: bool = false;
defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
defer if (changed_to_blocking) {
// We had to change our socket to blocking me to get our write out
// We need to change it back to non-blocking.
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
log.err(.app, "ws restore nonblocking", .{ .err = err });
};
}
};
pub fn deinit(self: *WsConnection) void {
self.reader.deinit();
self.send_arena.deinit();
}
pub fn send(self: *WsConnection, data: []const u8) !void {
var pos: usize = 0;
var changed_to_blocking: bool = false;
defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
defer if (changed_to_blocking) {
// We had to change our socket to blocking me to get our write out
// We need to change it back to non-blocking.
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
log.err(.app, "ws restore nonblocking", .{ .err = err });
};
LOOP: while (pos < data.len) {
const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
error.WouldBlock => {
// self.socket is nonblocking, because we don't want to block
// reads. But our life is a lot easier if we block writes,
// largely, because we don't have to maintain a queue of pending
// writes (which would each need their own allocations). So
// if we get a WouldBlock error, we'll switch the socket to
// blocking and switch it back to non-blocking after the write
// is complete. Doesn't seem particularly efficiently, but
// this should virtually never happen.
assert(changed_to_blocking == false, "WsConnection.double block", .{});
changed_to_blocking = true;
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
continue :LOOP;
},
else => return err,
};
LOOP: while (pos < data.len) {
const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
error.WouldBlock => {
// self.socket is nonblocking, because we don't want to block
// reads. But our life is a lot easier if we block writes,
// largely, because we don't have to maintain a queue of pending
// writes (which would each need their own allocations). So
// if we get a WouldBlock error, we'll switch the socket to
// blocking and switch it back to non-blocking after the write
// is complete. Doesn't seem particularly efficiently, but
// this should virtually never happen.
assert(changed_to_blocking == false, "WsConnection.double block", .{});
changed_to_blocking = true;
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
continue :LOOP;
},
else => return err,
};
if (written == 0) {
return error.Closed;
}
pos += written;
if (written == 0) {
return error.Closed;
}
pos += written;
}
}
const EMPTY_PONG = [_]u8{ 138, 0 };
const EMPTY_PONG = [_]u8{ 138, 0 };
fn sendPong(self: *WsConnection, data: []const u8) !void {
if (data.len == 0) {
return self.send(&EMPTY_PONG);
fn sendPong(self: *WsConnection, data: []const u8) !void {
if (data.len == 0) {
return self.send(&EMPTY_PONG);
}
var header_buf: [10]u8 = undefined;
const header = websocketHeader(&header_buf, .pong, data.len);
const allocator = self.send_arena.allocator();
const framed = try allocator.alloc(u8, header.len + data.len);
@memcpy(framed[0..header.len], header);
@memcpy(framed[header.len..], data);
return self.send(framed);
}
// called by CDP
// Websocket frames have a variable length header. For server-client,
// it could be anywhere from 2 to 10 bytes. Our IO.Loop doesn't have
// writev, so we need to get creative. We'll JSON serialize to a
// buffer, where the first 10 bytes are reserved. We can then backfill
// the header and send the slice.
pub fn sendJSON(self: *WsConnection, message: anytype, opts: std.json.Stringify.Options) !void {
const allocator = self.send_arena.allocator();
var aw = try std.Io.Writer.Allocating.initCapacity(allocator, 512);
// reserve space for the maximum possible header
try aw.writer.writeAll(&[_]u8{0} ** 10);
try std.json.Stringify.value(message, opts, &aw.writer);
const framed = fillWebsocketHeader(aw.toArrayList());
return self.send(framed);
}
pub fn sendJSONRaw(
self: *WsConnection,
buf: std.ArrayList(u8),
) !void {
// Dangerous API!. We assume the caller has reserved the first 10
// bytes in `buf`.
const framed = fillWebsocketHeader(buf);
return self.send(framed);
}
pub const HttpResult = enum { more, upgraded, close };
pub fn handshake(self: *WsConnection) !bool {
// Liveness is enforced by TCP keepalive configured in
// Server.setTcpKeepalive; a dead peer surfaces as a poll error or
// EOF from read(). The poll blocks for ~24 days rather than tracking
// an app-level timeout. Capped at i32-max because posix.poll narrows
// to c_int.
const wait_ms: i32 = std.math.maxInt(i32);
while (true) {
var pfds = [_]posix.pollfd{.{
.fd = self.socket,
.events = posix.POLL.IN,
.revents = 0,
}};
const n = try posix.poll(&pfds, wait_ms);
if (n == 0) {
log.info(.app, "CDP timeout", .{});
return false;
}
var header_buf: [10]u8 = undefined;
const header = websocketHeader(&header_buf, .pong, data.len);
const allocator = self.send_arena.allocator();
const framed = try allocator.alloc(u8, header.len + data.len);
@memcpy(framed[0..header.len], header);
@memcpy(framed[header.len..], data);
return self.send(framed);
}
// called by CDP
// Websocket frames have a variable length header. For server-client,
// it could be anywhere from 2 to 10 bytes. Our IO.Loop doesn't have
// writev, so we need to get creative. We'll JSON serialize to a
// buffer, where the first 10 bytes are reserved. We can then backfill
// the header and send the slice.
pub fn sendJSON(self: *WsConnection, message: anytype, opts: std.json.Stringify.Options) !void {
const allocator = self.send_arena.allocator();
var aw = try std.Io.Writer.Allocating.initCapacity(allocator, 512);
// reserve space for the maximum possible header
try aw.writer.writeAll(&.{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 });
try std.json.Stringify.value(message, opts, &aw.writer);
const framed = fillWebsocketHeader(aw.toArrayList());
return self.send(framed);
}
pub fn sendJSONRaw(
self: *WsConnection,
buf: std.ArrayList(u8),
) !void {
// Dangerous API!. We assume the caller has reserved the first 10
// bytes in `buf`.
const framed = fillWebsocketHeader(buf);
return self.send(framed);
}
pub fn read(self: *WsConnection) !usize {
const n = try posix.read(self.socket, self.reader.readBuf());
self.reader.len += n;
return n;
}
pub fn processMessages(self: *WsConnection, handler: anytype) !bool {
var reader = &self.reader;
while (true) {
const msg = reader.next() catch |err| {
switch (err) {
error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {},
error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.NestedFragmentation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.OutOfMemory => {}, // don't borther trying to send an error in this case
}
return err;
} orelse break;
switch (msg.type) {
.pong => {},
.ping => try self.sendPong(msg.data),
.close => {
self.send(&CLOSE_NORMAL) catch {};
return false;
},
.text, .binary => if (handler.handleMessage(msg.data) == false) {
return false;
},
}
if (msg.cleanup_fragment) {
reader.cleanup();
}
}
// We might have read part of the next message. Our reader potentially
// has to move data around in its buffer to make space.
reader.compact();
return true;
}
pub fn upgrade(self: *WsConnection, request: []u8) !void {
// our caller already confirmed that we have a trailing \r\n\r\n
const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable;
const request_line = request[0..request_line_end];
if (!std.ascii.endsWithIgnoreCase(request_line, "http/1.1")) {
return error.InvalidProtocol;
}
// we need to extract the sec-websocket-key value
var key: []const u8 = "";
// we need to make sure that we got all the necessary headers + values
var required_headers: u8 = 0;
// can't std.mem.split because it forces the iterated value to be const
// (we could @constCast...)
var buf = request[request_line_end + 2 ..];
while (buf.len > 4) {
const index = std.mem.indexOfScalar(u8, buf, '\r') orelse unreachable;
const separator = std.mem.indexOfScalar(u8, buf[0..index], ':') orelse return error.InvalidRequest;
const name = std.mem.trim(u8, toLower(buf[0..separator]), &std.ascii.whitespace);
const value = std.mem.trim(u8, buf[(separator + 1)..index], &std.ascii.whitespace);
if (std.mem.eql(u8, name, "upgrade")) {
if (!std.ascii.eqlIgnoreCase("websocket", value)) {
return error.InvalidUpgradeHeader;
}
required_headers |= 1;
} else if (std.mem.eql(u8, name, "sec-websocket-version")) {
if (value.len != 2 or value[0] != '1' or value[1] != '3') {
return error.InvalidVersionHeader;
}
required_headers |= 2;
} else if (std.mem.eql(u8, name, "connection")) {
// find if connection header has upgrade in it, example header:
// Connection: keep-alive, Upgrade
if (std.ascii.indexOfIgnoreCase(value, "upgrade") == null) {
return error.InvalidConnectionHeader;
}
required_headers |= 4;
} else if (std.mem.eql(u8, name, "sec-websocket-key")) {
key = value;
required_headers |= 8;
}
const next = index + 2;
buf = buf[next..];
}
if (required_headers != 15) {
return error.MissingHeaders;
}
// our caller has already made sure this request ended in \r\n\r\n
// so it isn't something we need to check again
const alloc = self.send_arena.allocator();
const response = blk: {
// Response to an upgrade request is always this, with
// the Sec-Websocket-Accept value a spacial sha1 hash of the
// request "sec-websocket-version" and a magic value.
const template =
"HTTP/1.1 101 Switching Protocols\r\n" ++
"Upgrade: websocket\r\n" ++
"Connection: upgrade\r\n" ++
"Sec-Websocket-Accept: 0000000000000000000000000000\r\n\r\n";
// The response will be sent via the IO Loop and thus has to have its
// own lifetime.
const res = try alloc.dupe(u8, template);
// magic response
const key_pos = res.len - 32;
var h: [20]u8 = undefined;
var hasher = std.crypto.hash.Sha1.init(.{});
hasher.update(key);
// websocket spec always used this value
hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
hasher.final(&h);
_ = std.base64.standard.Encoder.encode(res[key_pos .. key_pos + 28], h[0..]);
break :blk res;
const read_bytes = self.read() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
return self.send(response);
}
pub fn sendHttpError(self: *WsConnection, comptime status: u16, comptime body: []const u8) void {
const response = std.fmt.comptimePrint(
"HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}",
.{ status, body.len, body },
);
// we're going to close this connection anyways, swallowing any
// error seems safe
self.send(response) catch {};
}
pub fn getAddress(self: *WsConnection) !std.net.Address {
var address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(std.net.Address);
try posix.getpeername(self.socket, &address.any, &socklen);
return address;
}
pub fn sendClose(self: *WsConnection) void {
self.send(&CLOSE_GOING_AWAY) catch {};
}
pub fn shutdown(self: *WsConnection) void {
posix.shutdown(self.socket, .recv) catch {};
}
pub fn setBlocking(self: *WsConnection, blocking: bool) !void {
if (blocking) {
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
} else {
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags);
if (read_bytes == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
}
const result = self.processHttpRequest() catch return false;
switch (result) {
.more => continue,
.upgraded => return true,
.close => return false,
}
}
};
}
pub fn read(self: *WsConnection) !usize {
const n = try posix.read(self.socket, self.reader.readBuf());
self.reader.len += n;
return n;
}
fn processHttpRequest(self: *WsConnection) !HttpResult {
assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos });
const request = self.reader.buf[0..self.reader.len];
if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) {
self.sendHttpError(413, "Request too large");
return error.RequestTooLarge;
}
// we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here
return .more;
}
// the next incoming data can go to the front of our buffer
defer self.reader.len = 0;
return self.handleHttpRequest(request) catch |err| {
switch (err) {
error.NotFound => self.sendHttpError(404, "Not found"),
error.InvalidRequest => self.sendHttpError(400, "Invalid request"),
error.InvalidProtocol => self.sendHttpError(400, "Invalid HTTP protocol"),
error.MissingHeaders => self.sendHttpError(400, "Missing required header"),
error.InvalidUpgradeHeader => self.sendHttpError(400, "Unsupported upgrade type"),
error.InvalidVersionHeader => self.sendHttpError(400, "Invalid websocket version"),
error.InvalidConnectionHeader => self.sendHttpError(400, "Invalid connection header"),
else => {
log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] });
self.sendHttpError(500, "Internal Server Error");
},
}
return err;
};
}
fn handleHttpRequest(self: *WsConnection, request: []u8) !HttpResult {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
}
if (std.mem.eql(u8, request[0..4], "GET ") == false) {
return error.NotFound;
}
const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse {
return error.InvalidRequest;
};
const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) {
try self.upgrade(request);
return .upgraded;
}
if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) {
try self.send(self.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the
// websocket upgrade) blocks until the first one times out.
// We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too.
self.shutdown();
return .close;
}
if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or
std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/"))
{
try self.send(empty_json_list_response);
self.shutdown();
return .close;
}
return error.NotFound;
}
const empty_json_list_response =
"HTTP/1.1 200 OK\r\n" ++
"Content-Length: 2\r\n" ++
"Connection: Close\r\n" ++
"Content-Type: application/json; charset=UTF-8\r\n\r\n" ++
"[]";
pub fn processMessages(self: *WsConnection, handler: anytype) !bool {
var reader = &self.reader;
while (true) {
const msg = reader.next() catch |err| {
switch (err) {
error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {},
error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.NestedFragmentation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
error.OutOfMemory => {}, // don't borther trying to send an error in this case
}
return err;
} orelse break;
switch (msg.type) {
.pong => {},
.ping => try self.sendPong(msg.data),
.close => {
self.send(&CLOSE_NORMAL) catch {};
return false;
},
.text, .binary => if (handler.handleMessage(msg.data) == false) {
return false;
},
}
if (msg.cleanup_fragment) {
reader.cleanup();
}
}
// We might have read part of the next message. Our reader potentially
// has to move data around in its buffer to make space.
reader.compact();
return true;
}
pub fn upgrade(self: *WsConnection, request: []u8) !void {
// our caller already confirmed that we have a trailing \r\n\r\n
const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable;
const request_line = request[0..request_line_end];
if (!std.ascii.endsWithIgnoreCase(request_line, "http/1.1")) {
return error.InvalidProtocol;
}
// we need to extract the sec-websocket-key value
var key: []const u8 = "";
// we need to make sure that we got all the necessary headers + values
var required_headers: u8 = 0;
// can't std.mem.split because it forces the iterated value to be const
// (we could @constCast...)
var buf = request[request_line_end + 2 ..];
while (buf.len > 4) {
const index = std.mem.indexOfScalar(u8, buf, '\r') orelse unreachable;
const separator = std.mem.indexOfScalar(u8, buf[0..index], ':') orelse return error.InvalidRequest;
const name = std.mem.trim(u8, toLower(buf[0..separator]), &std.ascii.whitespace);
const value = std.mem.trim(u8, buf[(separator + 1)..index], &std.ascii.whitespace);
if (std.mem.eql(u8, name, "upgrade")) {
if (!std.ascii.eqlIgnoreCase("websocket", value)) {
return error.InvalidUpgradeHeader;
}
required_headers |= 1;
} else if (std.mem.eql(u8, name, "sec-websocket-version")) {
if (value.len != 2 or value[0] != '1' or value[1] != '3') {
return error.InvalidVersionHeader;
}
required_headers |= 2;
} else if (std.mem.eql(u8, name, "connection")) {
// find if connection header has upgrade in it, example header:
// Connection: keep-alive, Upgrade
if (std.ascii.indexOfIgnoreCase(value, "upgrade") == null) {
return error.InvalidConnectionHeader;
}
required_headers |= 4;
} else if (std.mem.eql(u8, name, "sec-websocket-key")) {
key = value;
required_headers |= 8;
}
const next = index + 2;
buf = buf[next..];
}
if (required_headers != 15) {
return error.MissingHeaders;
}
// our caller has already made sure this request ended in \r\n\r\n
// so it isn't something we need to check again
const alloc = self.send_arena.allocator();
const response = blk: {
// Response to an upgrade request is always this, with
// the Sec-Websocket-Accept value a spacial sha1 hash of the
// request "sec-websocket-version" and a magic value.
const template =
"HTTP/1.1 101 Switching Protocols\r\n" ++
"Upgrade: websocket\r\n" ++
"Connection: upgrade\r\n" ++
"Sec-Websocket-Accept: 0000000000000000000000000000\r\n\r\n";
// The response will be sent via the IO Loop and thus has to have its
// own lifetime.
const res = try alloc.dupe(u8, template);
// magic response
const key_pos = res.len - 32;
var h: [20]u8 = undefined;
var hasher = std.crypto.hash.Sha1.init(.{});
hasher.update(key);
// websocket spec always used this value
hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
hasher.final(&h);
_ = std.base64.standard.Encoder.encode(res[key_pos .. key_pos + 28], h[0..]);
break :blk res;
};
return self.send(response);
}
pub fn sendHttpError(self: *WsConnection, comptime status: u16, comptime body: []const u8) void {
const response = std.fmt.comptimePrint(
"HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}",
.{ status, body.len, body },
);
// we're going to close this connection anyways, swallowing any
// error seems safe
self.send(response) catch {};
}
pub fn getAddress(self: *WsConnection) !std.net.Address {
var address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(std.net.Address);
try posix.getpeername(self.socket, &address.any, &socklen);
return address;
}
pub fn sendClose(self: *WsConnection) void {
self.send(&CLOSE_GOING_AWAY) catch {};
}
pub fn shutdown(self: *WsConnection) void {
posix.shutdown(self.socket, .recv) catch {};
}
pub fn setBlocking(self: *WsConnection, blocking: bool) !void {
if (blocking) {
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
} else {
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags);
}
}
fn fillWebsocketHeader(buf: std.ArrayList(u8)) []const u8 {
// can't use buf[0..10] here, because the header length

View File

@@ -333,7 +333,6 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool {
}
pub var test_app: *App = undefined;
pub var test_http: *HttpClient = undefined;
pub var test_browser: Browser = undefined;
pub var test_notification: *Notification = undefined;
pub var test_session: *Session = undefined;
@@ -499,10 +498,7 @@ test "tests:beforeAll" {
test_app = try App.init(test_allocator, &test_config);
errdefer test_app.deinit();
test_http = try HttpClient.init(test_allocator, &test_app.network);
errdefer test_http.deinit();
test_browser = try Browser.init(test_app, .{ .http_client = test_http });
try test_browser.init(test_app, .{}, null);
errdefer test_browser.deinit();
// Create notification for testing
@@ -557,7 +553,6 @@ test "tests:afterAll" {
test_notification.deinit();
test_browser.deinit();
test_http.deinit();
test_app.deinit();
test_config.deinit(@import("root").tracking_allocator);
}