add optional Abort handler to runNextTick

This commit is contained in:
Muki Kiboigo
2026-05-25 10:51:20 -07:00
parent aa90575f82
commit a4c535370b
2 changed files with 47 additions and 13 deletions

View File

@@ -186,10 +186,12 @@ fn layerWith(self: anytype, next: Layer) Layer {
pub const NextTickNode = struct {
pub const Run =
*const fn (*Transfer, *anyopaque) void;
pub const Abort = *const fn (*anyopaque) void;
node: std.DoublyLinkedList.Node = .{},
ctx: *anyopaque,
run: Run,
abort: ?Abort = null,
};
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) !void {
@@ -243,6 +245,15 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP)
pub fn deinit(self: *Client) void {
self.abort();
// Drain Next Tick Queue
while (self.next_tick_queue.popFirst()) |node| {
const n: *NextTickNode = @fieldParentPtr("node", node);
if (n.abort) |abort_cb| {
abort_cb(n.ctx);
}
}
self.next_tick_count = 0;
self.handles.deinit();
self.clearUserAgentOverride();
@@ -431,8 +442,13 @@ pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
try self.drainInbox(mode);
}
pub fn runNextTick(self: *Client, transfer: *Transfer, ctx: *anyopaque, run: NextTickNode.Run) !void {
transfer._next_tick_node = .{ .ctx = ctx, .run = run };
pub fn runNextTick(
self: *Client,
transfer: *Transfer,
ctx: *anyopaque,
params: struct { run: NextTickNode.Run, abort: ?NextTickNode.Abort = null },
) !void {
transfer._next_tick_node = .{ .ctx = ctx, .run = params.run, .abort = params.abort };
self.next_tick_count += 1;
self.next_tick_queue.append(&transfer._next_tick_node.?.node);
@@ -441,6 +457,9 @@ pub fn runNextTick(self: *Client, transfer: *Transfer, ctx: *anyopaque, run: Nex
fn cancelNextTick(self: *Client, transfer: *Transfer) void {
if (transfer._next_tick_node) |*ntn| {
self.next_tick_queue.remove(&ntn.node);
if (ntn.abort) |abort_cb| {
abort_cb(ntn.ctx);
}
}
}
@@ -449,15 +468,19 @@ fn drainNextTickQueue(self: *Client) !void {
self.next_tick_queue = .{};
while (current) |node| {
defer current = node.next;
defer self.next_tick_count -= 1;
const n: *NextTickNode = @fieldParentPtr("node", node);
current = node.next;
const transfer: *Transfer = @fieldParentPtr(
"_next_tick_node",
@as(*?NextTickNode, @ptrCast(n)),
);
n.run(transfer, n.ctx);
const ntn = n.*;
transfer._next_tick_node = null;
ntn.run(transfer, ntn.ctx);
}
}

View File

@@ -76,16 +76,27 @@ fn request(ptr: *anyopaque, transfer: *Transfer) anyerror!void {
const ctx = try arena.create(CachedResponse);
ctx.* = cached;
try transfer.client.runNextTick(transfer, ctx, struct {
fn run(t: *Transfer, ctx_ptr: *anyopaque) void {
defer t.deinit();
try transfer.client.runNextTick(transfer, ctx, .{
.run = struct {
fn run(t: *Transfer, ctx_ptr: *anyopaque) void {
defer t.deinit();
const c: *CachedResponse = @ptrCast(@alignCast(ctx_ptr));
serveFromCache(&t.req, c) catch |err| {
t.req.error_callback(t.req.ctx, err);
};
}
}.run);
const c: *CachedResponse = @ptrCast(@alignCast(ctx_ptr));
serveFromCache(&t.req, c) catch |err| {
t.req.error_callback(t.req.ctx, err);
};
}
}.run,
.abort = struct {
fn abort(ctx_ptr: *anyopaque) void {
const c: *CachedResponse = @ptrCast(@alignCast(ctx_ptr));
switch (c.data) {
.buffer => |_| {},
.file => |f| f.file.close(),
}
}
}.abort,
});
return;
}