Move ws framing to WsConnection

This commit is contained in:
Nikolay Govorov
2026-04-14 07:10:09 +01:00
parent ae8013f967
commit f1be6bf428
3 changed files with 199 additions and 174 deletions

View File

@@ -39,6 +39,7 @@ json_version_response: []const u8,
// Thread management
active_threads: std.atomic.Value(u32) = .init(0),
clients: std.ArrayList(*Client) = .{},
pending_handshakes: std.ArrayList(*Net.WsConnection) = .{},
client_mutex: std.Thread.Mutex = .{},
clients_pool: std.heap.MemoryPool(Client),
@@ -70,12 +71,16 @@ pub fn shutdown(self: *Server) void {
for (self.clients.items) |client| {
client.stop();
}
for (self.pending_handshakes.items) |ws| {
ws.shutdown();
}
}
pub fn deinit(self: *Server) void {
self.shutdown();
self.joinThreads();
self.clients.deinit(self.allocator);
self.pending_handshakes.deinit(self.allocator);
self.clients_pool.deinit();
self.allocator.free(self.json_version_response);
self.allocator.destroy(self);
@@ -124,6 +129,35 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void {
setTcpKeepalive(socket);
var ws = Net.WsConnection.init(socket, self.allocator, self.json_version_response) catch |err| {
log.err(.app, "CDP ws init", .{ .err = err });
return;
};
var ws_owned = true;
defer if (ws_owned) ws.deinit();
if (log.enabled(.app, .info)) {
const client_address = ws.getAddress() catch null;
log.info(.app, "client connected", .{ .ip = client_address });
}
const http = HttpClient.init(self.allocator, &self.app.network) catch |err| {
log.err(.app, "CDP http init", .{ .err = err });
return;
};
var http_owned = true;
defer if (http_owned) http.deinit();
self.registerHandshake(&ws);
const handshake_result = ws.handshake();
self.unregisterHandshake(&ws);
const upgraded = handshake_result catch |err| {
log.err(.app, "CDP handshake", .{ .err = err });
return;
};
if (!upgraded) return;
// Client is HUGE (> 512KB) because it has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const client = self.getClient() catch |err| {
@@ -132,16 +166,15 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void {
};
defer self.releaseClient(client);
client.* = Client.init(
socket,
self.allocator,
self.app,
self.json_version_response,
) catch |err| {
log.err(.app, "CDP client init", .{ .err = err });
client.* = Client.init(self.allocator, self.app, ws, http);
ws_owned = false;
http_owned = false;
defer client.deinit();
client.cdp = CDP.init(client) catch |err| {
log.err(.app, "CDP init", .{ .err = err });
return;
};
defer client.deinit();
self.registerClient(client);
defer self.unregisterClient(client);
@@ -185,6 +218,23 @@ fn unregisterClient(self: *Server, client: *Client) void {
}
}
fn registerHandshake(self: *Server, ws: *Net.WsConnection) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
self.pending_handshakes.append(self.allocator, ws) catch {};
}
fn unregisterHandshake(self: *Server, ws: *Net.WsConnection) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
for (self.pending_handshakes.items, 0..) |w, i| {
if (w == ws) {
_ = self.pending_handshakes.swapRemove(i);
break;
}
}
}
fn spawnWorker(self: *Server, socket: posix.socket_t) !void {
if (self.app.shutdown()) {
return error.ShuttingDown;
@@ -227,62 +277,33 @@ fn joinThreads(self: *Server) void {
}
}
// Handle exactly one TCP connection.
// Handle exactly one upgraded CDP websocket connection.
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: union(enum) {
http: void,
cdp: CDP,
},
allocator: Allocator,
app: *App,
http: *HttpClient,
ws: Net.WsConnection,
cdp: ?CDP = null,
pub fn init(
socket: posix.socket_t,
allocator: Allocator,
app: *App,
json_version_response: []const u8,
) !Client {
var ws = try Net.WsConnection.init(socket, allocator, json_version_response);
errdefer ws.deinit();
if (log.enabled(.app, .info)) {
const client_address = ws.getAddress() catch null;
log.info(.app, "client connected", .{ .ip = client_address });
}
const http = try HttpClient.init(allocator, &app.network);
errdefer http.deinit();
pub fn init(allocator: Allocator, app: *App, ws: Net.WsConnection, http: *HttpClient) Client {
return .{
.allocator = allocator,
.app = app,
.http = http,
.ws = ws,
.mode = .{ .http = {} },
};
}
fn stop(self: *Client) void {
switch (self.mode) {
.http => {},
.cdp => |*cdp| {
cdp.browser.env.terminate();
self.ws.sendClose();
},
if (self.cdp) |*cdp| {
cdp.browser.env.terminate();
self.ws.sendClose();
}
self.ws.shutdown();
}
pub fn deinit(self: *Client) void {
switch (self.mode) {
.cdp => |*cdp| cdp.deinit(),
.http => {},
}
if (self.cdp) |*cdp| cdp.deinit();
self.ws.deinit();
self.http.deinit();
}
@@ -298,13 +319,13 @@ pub const Client = struct {
};
defer http.cdp_client = null;
self.httpLoop(http) catch |err| {
self.wsLoop(http) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
};
}
fn httpLoop(self: *Client, http: *HttpClient) !void {
lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
fn wsLoop(self: *Client, http: *HttpClient) !void {
var cdp = &self.cdp.?;
// Liveness is enforced by TCP keepalive configured in
// Server.setTcpKeepalive; the kernel closes dead sockets, which
@@ -313,24 +334,6 @@ pub const Client = struct {
// i32-max because HttpClient.tick narrows to c_int.
const wait_ms: u32 = std.math.maxInt(i32);
while (true) {
const status = http.tick(wait_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) continue;
if (self.readSocket() == false) {
return;
}
if (self.mode == .cdp) {
break;
}
}
var cdp = &self.mode.cdp;
while (true) {
const result = cdp.pageWait(wait_ms) catch |wait_err| switch (wait_err) {
error.NoPage => {
@@ -392,106 +395,7 @@ pub const Client = struct {
return false;
}
return self.processData() catch false;
}
fn processData(self: *Client) !bool {
switch (self.mode) {
.cdp => |*cdp| return self.processWebsocketMessage(cdp),
.http => return self.processHTTPRequest(),
}
}
fn processHTTPRequest(self: *Client) !bool {
lp.assert(self.ws.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.ws.reader.pos });
const request = self.ws.reader.buf[0..self.ws.reader.len];
if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) {
self.writeHTTPErrorResponse(413, "Request too large");
return error.RequestTooLarge;
}
// we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here
return true;
}
// the next incoming data can go to the front of our buffer
defer self.ws.reader.len = 0;
return self.handleHTTPRequest(request) catch |err| {
switch (err) {
error.NotFound => self.writeHTTPErrorResponse(404, "Not found"),
error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"),
error.InvalidProtocol => self.writeHTTPErrorResponse(400, "Invalid HTTP protocol"),
error.MissingHeaders => self.writeHTTPErrorResponse(400, "Missing required header"),
error.InvalidUpgradeHeader => self.writeHTTPErrorResponse(400, "Unsupported upgrade type"),
error.InvalidVersionHeader => self.writeHTTPErrorResponse(400, "Invalid websocket version"),
error.InvalidConnectionHeader => self.writeHTTPErrorResponse(400, "Invalid connection header"),
else => {
log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] });
self.writeHTTPErrorResponse(500, "Internal Server Error");
},
}
return err;
};
}
fn handleHTTPRequest(self: *Client, request: []u8) !bool {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
}
if (std.mem.eql(u8, request[0..4], "GET ") == false) {
return error.NotFound;
}
const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse {
return error.InvalidRequest;
};
const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) {
try self.upgradeConnection(request);
return true;
}
if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) {
try self.ws.send(self.ws.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the
// websocket upgrade) blocks until the first one times out.
// We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too.
self.ws.shutdown();
return false;
}
if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or
std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/"))
{
try self.ws.send(empty_json_list_response);
self.ws.shutdown();
return false;
}
return error.NotFound;
}
fn upgradeConnection(self: *Client, request: []u8) !void {
try self.ws.upgrade(request);
self.mode = .{ .cdp = try CDP.init(self) };
}
fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
self.ws.sendHttpError(status, body);
}
fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool {
return self.ws.processMessages(cdp);
return self.ws.processMessages(&self.cdp.?) catch false;
}
pub fn sendAllocator(self: *Client) Allocator {
@@ -544,13 +448,6 @@ fn buildJSONVersionResponse(
return try std.fmt.allocPrint(app.allocator, response_format, .{ body_len, host, port });
}
const empty_json_list_response =
"HTTP/1.1 200 OK\r\n" ++
"Content-Length: 2\r\n" ++
"Connection: Close\r\n" ++
"Content-Type: application/json; charset=UTF-8\r\n\r\n" ++
"[]";
pub const timestamp = @import("datetime.zig").timestamp;
pub const milliTimestamp = @import("datetime.zig").milliTimestamp;

View File

@@ -22,6 +22,8 @@ const posix = std.posix;
const CDP = @import("CDP.zig");
const Server = @import("../Server.zig");
const Net = @import("../network/websocket.zig");
const HttpClient = @import("../browser/HttpClient.zig");
const base = @import("../testing.zig");
pub const allocator = base.allocator;
@@ -315,7 +317,9 @@ pub fn context() !TestContext {
try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.RCVBUF, &std.mem.toBytes(@as(c_int, 32_768)));
try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.SNDBUF, &std.mem.toBytes(@as(c_int, 32_768)));
const client = try Server.Client.init(pair[1], base.arena_allocator, base.test_app, "json-version");
const ws = try Net.WsConnection.init(pair[1], base.arena_allocator, "json-version");
const http = try HttpClient.init(base.arena_allocator, &base.test_app.network);
const client = Server.Client.init(base.arena_allocator, base.test_app, ws, http);
return .{
.client = client,

View File

@@ -24,7 +24,8 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("lightpanda").log;
const assert = @import("lightpanda").assert;
const CDP_MAX_MESSAGE_SIZE = @import("../Config.zig").CDP_MAX_MESSAGE_SIZE;
const Config = @import("../Config.zig");
const CDP_MAX_MESSAGE_SIZE = Config.CDP_MAX_MESSAGE_SIZE;
const Fragments = struct {
type: Message.Type,
@@ -427,12 +428,135 @@ pub const WsConnection = struct {
return self.send(framed);
}
pub const HttpResult = enum { more, upgraded, close };
pub fn handshake(self: *WsConnection) !bool {
// Liveness is enforced by TCP keepalive configured in
// Server.setTcpKeepalive; a dead peer surfaces as a poll error or
// EOF from read(). The poll blocks for ~24 days rather than tracking
// an app-level timeout. Capped at i32-max because posix.poll narrows
// to c_int.
const wait_ms: i32 = std.math.maxInt(i32);
while (true) {
var pfds = [_]posix.pollfd{.{
.fd = self.socket,
.events = posix.POLL.IN,
.revents = 0,
}};
const n = try posix.poll(&pfds, wait_ms);
if (n == 0) {
log.info(.app, "CDP timeout", .{});
return false;
}
const read_bytes = self.read() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
if (read_bytes == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
}
const result = self.processHttpRequest() catch return false;
switch (result) {
.more => continue,
.upgraded => return true,
.close => return false,
}
}
}
pub fn read(self: *WsConnection) !usize {
const n = try posix.read(self.socket, self.reader.readBuf());
self.reader.len += n;
return n;
}
fn processHttpRequest(self: *WsConnection) !HttpResult {
assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos });
const request = self.reader.buf[0..self.reader.len];
if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) {
self.sendHttpError(413, "Request too large");
return error.RequestTooLarge;
}
// we're only expecting [body-less] GET requests.
if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
// we need more data, put any more data here
return .more;
}
// the next incoming data can go to the front of our buffer
defer self.reader.len = 0;
return self.handleHttpRequest(request) catch |err| {
switch (err) {
error.NotFound => self.sendHttpError(404, "Not found"),
error.InvalidRequest => self.sendHttpError(400, "Invalid request"),
error.InvalidProtocol => self.sendHttpError(400, "Invalid HTTP protocol"),
error.MissingHeaders => self.sendHttpError(400, "Missing required header"),
error.InvalidUpgradeHeader => self.sendHttpError(400, "Unsupported upgrade type"),
error.InvalidVersionHeader => self.sendHttpError(400, "Invalid websocket version"),
error.InvalidConnectionHeader => self.sendHttpError(400, "Invalid connection header"),
else => {
log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] });
self.sendHttpError(500, "Internal Server Error");
},
}
return err;
};
}
fn handleHttpRequest(self: *WsConnection, request: []u8) !HttpResult {
if (request.len < 18) {
// 18 is [generously] the smallest acceptable HTTP request
return error.InvalidRequest;
}
if (std.mem.eql(u8, request[0..4], "GET ") == false) {
return error.NotFound;
}
const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse {
return error.InvalidRequest;
};
const url = request[4..url_end];
if (std.mem.eql(u8, url, "/")) {
try self.upgrade(request);
return .upgraded;
}
if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) {
try self.send(self.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the
// websocket upgrade) blocks until the first one times out.
// We can avoid that by closing the connection. json_version_response
// has a Connection: Close header too.
self.shutdown();
return .close;
}
if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or
std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/"))
{
try self.send(empty_json_list_response);
self.shutdown();
return .close;
}
return error.NotFound;
}
const empty_json_list_response =
"HTTP/1.1 200 OK\r\n" ++
"Content-Length: 2\r\n" ++
"Connection: Close\r\n" ++
"Content-Type: application/json; charset=UTF-8\r\n\r\n" ++
"[]";
pub fn processMessages(self: *WsConnection, handler: anytype) !bool {
var reader = &self.reader;
while (true) {