From 3b76b8eaedcefcbd9c8bffcb85745029d19e8c74 Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Thu, 4 Jun 2026 09:32:28 +0200 Subject: [PATCH] fix(pnpr): access-gate install-accelerator files and remove unauthenticated /v1/files (#12181) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(pnpr): authorize served packages against pnpr's policy in /v1/install A content-addressed digest in the install-accelerator store is shared across packages and says nothing about access, so the store's possession of a package's bytes is not a capability to receive them. `/v1/install` served files for any package found in the store, including ones reached only on the cache-hit / frozen-lockfile path where no access check happened — letting a caller who knows a private package's digest pull bytes the registry routes would 401 on. Check every served package against pnpr's own `packages:` policy before serving — the same decision `serve_packument` / `serve_tarball` make, in process, with no network round trip (so a warm shared server keeps its resolution advantage). `serve_install` resolves the caller's identity from `Authorization`; `deny_unauthorized_packages` denies the install (401 anonymous / 403 authenticated-but-outside-the-allowed-set) when any served package is not readable by the caller. This authorizes against pnpr's own surface, the authority for everything the store can hold today (pnpr fetches anonymously, so cached content is pnpr-hosted or publicly fetchable). When credential forwarding lands, packages the client resolved from external registries under its own token carry no pnpr policy and will need per-caller re-verification against the owning registry (TTL-cached) — noted at the check and tracked in #12184. The raw `/v1/files` endpoint is still unauthenticated; removing it (it is superseded by the inline single-response path) is a follow-up (#12184) that also ports the TS `@pnpm/pnpr.client` + worker off the two-trip path. --- Written by an agent (Claude Code, claude-opus-4-8). * fix(pnpr): remove the unauthenticated /v1/files endpoint `POST /v1/files` served any CAFS file by digest with no authentication and no package identity, so the access gate on `/v1/install` (which is per package) couldn't cover it — it had to be removed, not gated. It was already superseded by the single-response inline path (#12178). * Server: `/v1/install` always answers with the inline gzipped body (lockfile + stats + store-index entries + the missing files' contents); the NDJSON two-trip path, the `/v1/files` route, `handle_files`, and the `FilesRequest`/`is_valid_sha512_hex` helpers are gone. * TS client + worker: `@pnpm/pnpr.client` now does the one inline request and hands the file frames to `@pnpm/worker`'s `writeCafsFiles`, which writes them to the CAFS; the `fetchAndWriteCafsFiles` /v1/files fetcher is replaced. Error bodies are decompressed before being surfaced, since the server also gzips its JSON error responses (e.g. an access denial). Verified end to end by `pnpm/test/install/pnpmRegistry.ts` (11 tests: install / add / remove / workspace through a real pnpr server). Closes the second half of the install-accelerator access work (#12184); file-bearing responses are now both inline-only and access-gated. --- .changeset/pnpr-inline-only-access.md | 7 + pnpm/test/install/pnpmRegistry.ts | 4 +- pnpr/client/src/fetchFromPnpmRegistry.ts | 229 ++++++---------- pnpr/crates/pnpr/src/install_accelerator.rs | 242 ++++++++--------- .../pnpr/src/install_accelerator/diff.rs | 7 +- .../pnpr/src/install_accelerator/protocol.rs | 32 --- .../pnpr/src/install_accelerator/tests.rs | 73 +++++ pnpr/crates/pnpr/src/server.rs | 48 ++-- pnpr/crates/pnpr/tests/install_accelerator.rs | 117 -------- worker/src/index.ts | 16 +- worker/src/start.ts | 249 +++++------------- worker/src/types.ts | 13 +- 12 files changed, 389 insertions(+), 648 deletions(-) create mode 100644 .changeset/pnpr-inline-only-access.md create mode 100644 pnpr/crates/pnpr/src/install_accelerator/tests.rs delete mode 100644 pnpr/crates/pnpr/tests/install_accelerator.rs diff --git a/.changeset/pnpr-inline-only-access.md b/.changeset/pnpr-inline-only-access.md new file mode 100644 index 0000000000..c1426d0f38 --- /dev/null +++ b/.changeset/pnpr-inline-only-access.md @@ -0,0 +1,7 @@ +--- +"@pnpm/pnpr.client": patch +"@pnpm/worker": patch +"pnpm": patch +--- + +The pnpr install accelerator now serves resolved files only in the single gzipped `POST /v1/install` response and authorizes every package whose bytes it serves against the server's access policy. The separate unauthenticated `POST /v1/files` endpoint has been removed: the client materializes the inlined files straight into its content-addressable store, and a content-addressed digest is no longer a bearer capability for a package the caller cannot read. diff --git a/pnpm/test/install/pnpmRegistry.ts b/pnpm/test/install/pnpmRegistry.ts index c45b86d3d4..0a0393f271 100644 --- a/pnpm/test/install/pnpmRegistry.ts +++ b/pnpm/test/install/pnpmRegistry.ts @@ -11,8 +11,8 @@ import { writeYamlFileSync } from 'write-yaml-file' import { execPnpm } from '../utils/index.js' // The pnpr server started by the test harness (see the with-registry jest -// preset) serves the install-accelerator endpoints (/v1/install, /v1/files) -// on the registry-mock port, so it doubles as the pnpr server under test. +// preset) serves the install-accelerator endpoint (/v1/install) on the +// registry-mock port, so it doubles as the pnpr server under test. const PNPR = `http://localhost:${REGISTRY_MOCK_PORT}` let server: http.Server diff --git a/pnpr/client/src/fetchFromPnpmRegistry.ts b/pnpr/client/src/fetchFromPnpmRegistry.ts index bf8b34bf7a..02723926c1 100644 --- a/pnpr/client/src/fetchFromPnpmRegistry.ts +++ b/pnpr/client/src/fetchFromPnpmRegistry.ts @@ -1,11 +1,12 @@ import http from 'node:http' import https from 'node:https' import { URL } from 'node:url' +import { gunzip } from 'node:zlib' import { convertToLockfileObject } from '@pnpm/lockfile.fs' import type { LockfileFile, LockfileObject } from '@pnpm/lockfile.types' import { StoreIndex } from '@pnpm/store.index' -import { fetchAndWriteCafsFiles } from '@pnpm/worker' +import { writeCafsFiles } from '@pnpm/worker' import type { ResponseMetadata } from './protocol.js' @@ -47,10 +48,9 @@ export interface FetchFromPnpmRegistryOptions { /** * `--lockfile-only`: resolve and return only the lockfile — fetch no * files into the local store. Forwarded to the server (which skips the - * file diff when it understands the flag); the client also ignores any - * `D`/`I` lines so the store stays untouched even against an older - * server. Mirrors pnpm's resolve + write, fetch nothing, link nothing. - * See https://github.com/pnpm/pnpm/issues/12146. + * file diff); the client ignores the (empty) file payload so the store + * stays untouched. Mirrors pnpm's resolve + write, fetch nothing, link + * nothing. See https://github.com/pnpm/pnpm/issues/12146. */ lockfileOnly?: boolean } @@ -64,16 +64,21 @@ export interface FetchFromPnpmRegistryResult { indexEntries: Array<{ key: string, buffer: Uint8Array }> } +interface InstallResponseHeader { + lockfile: LockfileFile + stats: ResponseMetadata['stats'] + indexEntries?: Array<{ key: string, b64: string }> + violations?: Array<{ name: string, version: string, code: string, reason: string }> +} + /** - * Fetch resolved dependencies from a pnpr server. + * Fetch resolved dependencies from a pnpr server in a single round trip. * - * The response is a streaming NDJSON where each line is one message: - * - `D\t{digest}\t{size}\t{executable}\n` — file digest (streamed as packages resolve) - * - `L\t{json}\n` — final lockfile (on-disk format) + stats (after resolution) - * - `I\t{key}\t{base64}\n` — pre-packed msgpack index entry - * - * As digest lines arrive, we batch them and dispatch workers to /v1/files. - * File downloads happen IN PARALLEL with server-side resolution. + * `POST /v1/install` (with `inlineFiles`) answers with one gzipped binary + * body: a length-prefixed JSON header (lockfile, stats, store-index + * entries, or verification violations) followed by the missing files' + * contents as binary frames. We parse the header here and hand the file + * frames to a worker that writes them straight into the CAFS. */ export async function fetchFromPnpmRegistry ( opts: FetchFromPnpmRegistryOptions @@ -100,103 +105,48 @@ export async function fetchFromPnpmRegistry ( lockfile: opts.lockfile, lockfileOnly: opts.lockfileOnly, storeIntegrities, + inlineFiles: true, }) - const indexEntries: Array<{ key: string, buffer: Uint8Array }> = [] - const workerPromises: Array> = [] - let currentBatch: Array<{ digest: string, size: number, executable: boolean }> = [] + const body = await postInstall(opts.registryUrl, requestBody) - const dispatchBatch = () => { - if (currentBatch.length === 0) return - const digests = currentBatch - currentBatch = [] - workerPromises.push(fetchAndWriteCafsFiles({ - registryUrl: opts.registryUrl, - storeDir: opts.storeDir, - digests, - })) + // The combined response is `[u32 header length][header JSON][file frames]`. + if (body.length < 4) { + throw new Error('pnpr server returned a truncated /v1/install response') + } + const headerLength = body.readUInt32BE(0) + const header = JSON.parse(body.subarray(4, 4 + headerLength).toString('utf-8')) as InstallResponseHeader + + if (header.violations != null && header.violations.length > 0) { + const rendered = header.violations + .map((violation) => ` ${violation.name}@${violation.version}: ${violation.reason}`) + .join('\n') + throw new Error(`pnpr server rejected the lockfile under the verification policy:\n${rendered}`) } - // Returns as soon as the lockfile arrives — the stream continues - // in the background, dispatching more file download workers. - // fileDownloads covers ALL workers (past and future). - return new Promise((resolve, reject) => { - let resolved = false - let serverError: Error | undefined - const handleLine = (line: string) => { - if (line.length === 0) return - const tabIdx = line.indexOf('\t') - const type = line.charAt(0) - if (type === 'D') { - // `--lockfile-only` fetches nothing — ignore any file digests an - // older server still streams rather than writing them to the store. - if (opts.lockfileOnly) return - const parts = line.split('\t') - currentBatch.push({ - digest: parts[1], - size: parseInt(parts[2], 10), - executable: parts[3] === '1', - }) - if (currentBatch.length >= FILES_PER_WORKER) { - dispatchBatch() - } - } else if (type === 'L') { - const payload = JSON.parse(line.substring(tabIdx + 1)) as { - lockfile: LockfileFile - stats: ResponseMetadata['stats'] - } - dispatchBatch() - resolved = true - // Resolve immediately — the caller can start headless install - // while the stream continues dispatching remaining D/I lines. - resolve({ - // The server speaks the on-disk lockfile format; convert it to the - // in-memory `LockfileObject` the rest of pnpm consumes. - lockfile: convertToLockfileObject(payload.lockfile), - stats: payload.stats, - fileDownloads: streamComplete.then(() => - Promise.all(workerPromises) - ).then(() => {}), - indexEntries, - }) - } else if (type === 'I') { - // `--lockfile-only` writes no store-index entries — ignore any an - // older server still streams. - if (opts.lockfileOnly) return - // Format: I\t{integrity}\t{pkgId}\t{base64} - // Key is "{integrity}\t{pkgId}" — everything between first and last tab - const rest = line.substring(tabIdx + 1) - const lastTab = rest.lastIndexOf('\t') - const key = rest.substring(0, lastTab) - const buffer = new Uint8Array(Buffer.from(rest.substring(lastTab + 1), 'base64')) - indexEntries.push({ key, buffer }) - } else if (type === 'E') { - // Server emitted a structured error after headers were sent. - // Record it so stream `end` / `catch` can reject with the payload. - let message = 'pnpr server error' - try { - const payload = JSON.parse(line.substring(tabIdx + 1)) as { error?: string } - if (payload?.error) message = payload.error - } catch { - // Fall back to the raw payload if it isn't JSON. - message = line.substring(tabIdx + 1) || message - } - serverError = new Error(message) - } - } + const indexEntries = (header.indexEntries ?? []).map(({ key, b64 }) => ({ + key, + buffer: new Uint8Array(Buffer.from(b64, 'base64')), + })) - const streamComplete = streamNdjsonRequest( - opts.registryUrl, 'v1/install', requestBody, handleLine - ) + // `--lockfile-only` fetches nothing: there are no file frames to write + // (the server sends only the end-of-stream marker), so leave the store + // untouched. + const fileDownloads = opts.lockfileOnly + ? Promise.resolve() + : writeCafsFiles({ + storeDir: opts.storeDir, + payload: body.subarray(4 + headerLength), + }).then(() => {}) - streamComplete.then(() => { - if (serverError) { - reject(serverError) - } else if (!resolved) { - reject(new Error('pnpr server closed the stream without emitting a lockfile')) - } - }, reject) - }) + return { + // The server speaks the on-disk lockfile format; convert it to the + // in-memory `LockfileObject` the rest of pnpm consumes. + lockfile: convertToLockfileObject(header.lockfile), + stats: header.stats, + fileDownloads, + indexEntries, + } } function readStoreIntegrities (storeIndex: StoreIndex): string[] { @@ -220,59 +170,54 @@ function isIntegrityLike (value: string): boolean { value.startsWith('sha1-') } -const FILES_PER_WORKER = 4000 const REQUEST_TIMEOUT = 600_000 // 10 minutes — server-side resolution can be slow on first run /** - * Stream an NDJSON response, calling `onLine` for each complete line as - * it arrives. Chunks are buffered until a newline is seen. + * `POST /v1/install` and return the full response body, decompressed. + * + * `urlPath` resolution normalizes the base to end with "/" so a path + * prefix configured on the pnpr server URL (e.g. https://host/pnpr/) is + * preserved. */ -async function streamNdjsonRequest ( - registryUrl: string, - urlPath: string, - body: string, - onLine: (line: string) => void -): Promise { - // `urlPath` is expected to be relative (e.g. "v1/install"). We normalize - // the base to end with "/" so `new URL(rel, base)` preserves any path - // prefix configured on the pnpr server URL (e.g. https://host/pnpr/). +async function postInstall (registryUrl: string, body: string): Promise { const base = registryUrl.endsWith('/') ? registryUrl : `${registryUrl}/` - const url = new URL(urlPath, base) - const isHttps = url.protocol === 'https:' - const requestFn = isHttps ? https.request : http.request + const url = new URL('v1/install', base) + const requestFn = url.protocol === 'https:' ? https.request : http.request - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { const req = requestFn(url, { method: 'POST', timeout: REQUEST_TIMEOUT, headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body), - 'Accept': 'application/x-ndjson', + 'Accept-Encoding': 'gzip', }, }, (res) => { - if (res.statusCode !== 200) { - const chunks: Buffer[] = [] - res.on('data', (chunk: Buffer) => chunks.push(chunk)) - res.on('end', () => { - reject(new Error(`pnpr server responded with ${res.statusCode}: ${Buffer.concat(chunks).toString('utf-8')}`)) - }) - return - } - - let leftover = '' - res.setEncoding('utf-8') - res.on('data', (chunk: string) => { - const data = leftover + chunk - const lines = data.split('\n') - leftover = lines.pop() ?? '' - for (const line of lines) { - onLine(line) - } - }) + const chunks: Buffer[] = [] + res.on('data', (chunk: Buffer) => chunks.push(chunk)) res.on('end', () => { - if (leftover) onLine(leftover) - resolve() + const raw = Buffer.concat(chunks) + // The server gzips both the install body and its JSON error bodies + // (e.g. a 401/403 access denial), so decompress *before* branching + // on the status code — otherwise an error surfaces as binary + // garbage instead of the server's message. Skip it only when the + // HTTP stack already decompressed (no gzip magic bytes). + const finish = (body: Buffer): void => { + if (res.statusCode !== 200) { + reject(new Error(`pnpr server responded with ${res.statusCode}: ${body.toString('utf-8')}`)) + } else { + resolve(body) + } + } + if (res.headers['content-encoding'] === 'gzip' || (raw[0] === 0x1f && raw[1] === 0x8b)) { + gunzip(raw, (err, decompressed) => { + if (err) reject(err) + else finish(decompressed) + }) + } else { + finish(raw) + } }) res.on('error', reject) }) @@ -301,5 +246,3 @@ export function writeRawIndexEntries ( storeIndex.setRawMany(writes) } } - - diff --git a/pnpr/crates/pnpr/src/install_accelerator.rs b/pnpr/crates/pnpr/src/install_accelerator.rs index d32ef2e4bd..88ddf78a47 100644 --- a/pnpr/crates/pnpr/src/install_accelerator.rs +++ b/pnpr/crates/pnpr/src/install_accelerator.rs @@ -3,23 +3,23 @@ //! alongside pnpr's npm-compatible API. The handshake + endpoints are //! served under one base URL (the `pnprServer`). //! -//! Three routes, built on pacquet's resolver and content-addressable +//! Two routes, built on pacquet's resolver and content-addressable //! store: //! //! * `GET /-/pnpr` — capability handshake; advertises the supported //! protocol versions so a client can negotiate or fail fast. //! * `POST /v1/install` — resolve a project **against the registries //! the client sends** (so the server uses the same source of truth as -//! the client), then return the missing-file digests, pre-packed -//! store-index entries, lockfile, and stats. With `inlineFiles` the -//! client also gets the missing files' *contents* in the same response -//! (a length-prefixed JSON header followed by the `/v1/files` binary -//! frames, gzipped), collapsing the cold path to a single round trip -//! ([pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165)); -//! otherwise the response is the NDJSON stream of `D`/`I`/`L`/`E` -//! lines and the client fetches files separately. -//! * `POST /v1/files` — serve a batch of files by digest as a gzip -//! binary stream the client writes straight into its CAFS. +//! the client), then return, in a single gzipped binary response, the +//! lockfile, stats, pre-packed store-index entries, and the contents of +//! the files the client is missing (a length-prefixed JSON header +//! followed by the binary file frames). One round trip +//! ([pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165)). +//! +//! Files are bound to access: every package whose bytes are served is +//! checked against pnpr's `packages:` policy first +//! ([`deny_unauthorized_packages`]), so a content-addressed digest is +//! never a bearer capability for a package the caller can't read. //! //! The client's `registry`, `namedRegistries`, `overrides`, and the //! verification policy (`minimumReleaseAge`, `trustPolicy`, ...) drive @@ -38,14 +38,20 @@ mod protocol; mod resolve; mod verdict_cache; +#[cfg(test)] +mod tests; + use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, io::Write as _, path::PathBuf, sync::{Arc, Mutex, OnceLock}, }; -use crate::config::Config as RegistryConfig; +use crate::{ + config::Config as RegistryConfig, + policy::{Identity, PackagePolicies}, +}; use axum::{ body::{Body, Bytes}, @@ -64,10 +70,7 @@ use pacquet_resolving_npm_resolver::{InMemoryPackageMetaCache, PackageMetaCache} use pacquet_resolving_resolver_base::ResolutionVerifier; use pacquet_store_dir::{StoreDir, StoreIndex}; -use self::{ - protocol::{FilesRequest, InstallRequest, is_valid_sha512_hex}, - verdict_cache::VerdictCache, -}; +use self::{protocol::InstallRequest, verdict_cache::VerdictCache}; /// Per-server engine backing the pnpr install endpoints: it holds the /// store, cache, and HTTP client used to resolve a client's project and @@ -177,8 +180,16 @@ impl InstallAccelerator { } } -/// Handle `POST /v1/install`. -pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> Response { +/// Handle `POST /v1/install`. `identity` is the resolved caller; the +/// store's possession of a package's bytes is not a capability to read +/// them, so every served package is checked against `policies` — see +/// [`deny_unauthorized_packages`]. +pub(crate) async fn handle_install( + runtime: &InstallAccelerator, + policies: &PackagePolicies, + identity: Identity, + body: Bytes, +) -> Response { let request: InstallRequest = match serde_json::from_slice(&body) { Ok(request) => request, Err(err) => return json_error(StatusCode::BAD_REQUEST, &err.to_string()), @@ -202,7 +213,7 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> { return match failure { VerifyFailure::Internal(response) => response, - VerifyFailure::Violations(violations) => violation_response(&violations, &request), + VerifyFailure::Violations(violations) => violation_response(&violations), }; } @@ -248,38 +259,12 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> } }; + if let Some(denied) = deny_unauthorized_packages(policies, &identity, &result.package_index) { + return denied; + } + let stats_json = stats_json(&result.stats); - - if request.inline_files { - return inline_response(runtime, &lockfile, &stats_json, &result); - } - - let mut ndjson: Vec = Vec::new(); - for file in &result.missing_files { - let _ = - writeln!(ndjson, "D\t{}\t{}\t{}", file.digest, file.size, u8::from(file.executable)); - } - for entry in &result.package_index { - let _ = writeln!( - ndjson, - "I\t{}\t{}\t{}", - entry.integrity, - entry.pkg_id, - BASE64.encode(&entry.raw), - ); - } - - let payload = serde_json::json!({ - "lockfile": serde_json::to_value(&lockfile).unwrap_or(serde_json::Value::Null), - "stats": stats_json, - }); - let _ = writeln!(ndjson, "L\t{}", payload); - - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/x-ndjson") - .body(Body::from(ndjson)) - .expect("static ndjson response is always valid") + inline_response(runtime, &lockfile, &stats_json, &result) } fn stats_json(stats: &diff::Stats) -> serde_json::Value { @@ -294,23 +279,70 @@ fn stats_json(stats: &diff::Stats) -> serde_json::Value { }) } -/// gzip level for the file-bearing responses (`/v1/files` and the -/// `inlineFiles` install body). Level 6 (the gzip default) shrinks the -/// payload ~16% over level 1 — the win that matters once the server is -/// across a latency link, where fewer bytes means fewer TCP slow-start -/// round trips — while level 9 adds under a percent for several times -/// the CPU. +/// Deny the install when the caller may not read a package whose files +/// are about to be served. A content-addressed digest is shared across +/// packages and reveals nothing about access, so possession of a +/// package's bytes in the store is never a capability to receive them: +/// every served package is checked against pnpr's own `packages:` policy +/// — the same decision `serve_packument` / `serve_tarball` make, in +/// process, with no network round trip. Returns the denial response (401 +/// for an anonymous caller who could authenticate, 403 for an +/// authenticated caller outside the allowed set) or `None` when every +/// served package is readable. +/// +/// This authorizes against pnpr's own surface, which is the authority for +/// everything the store can hold today: pnpr fetches anonymously (no +/// credential forwarding yet), so cached content is either pnpr-hosted or +/// publicly fetchable. When credential forwarding lands, packages the +/// client resolved from *external* registries under its own token become +/// reachable, and those carry no pnpr policy — their access must then be +/// re-verified per caller against the owning registry (with a TTL'd +/// verdict cache), which this local check does not cover. +fn deny_unauthorized_packages( + policies: &PackagePolicies, + identity: &Identity, + served: &[diff::PackageIndexEntry], +) -> Option { + let mut checked: HashSet<&str> = HashSet::new(); + for entry in served { + let Some(name) = package_name(&entry.pkg_id) else { continue }; + // Access is decided per package name, so evaluate each name once. + if !checked.insert(name) { + continue; + } + if !policies.for_package(name).access.allows(identity) { + let status = match identity { + Identity::Anonymous => StatusCode::UNAUTHORIZED, + Identity::User { .. } => StatusCode::FORBIDDEN, + }; + return Some(json_error(status, &format!("not authorized to access {name:?}"))); + } + } + None +} + +/// The package name from a `name@version` package id, tolerating a +/// leading scope `@` (`@scope/foo@1.0.0` → `@scope/foo`). +fn package_name(pkg_id: &str) -> Option<&str> { + let at = pkg_id.rfind('@')?; + (at > 0).then_some(&pkg_id[..at]) +} + +/// gzip level for the install response body. Level 6 (the gzip default) +/// shrinks the payload ~16% over level 1 — the win that matters once the +/// server is across a latency link, where fewer bytes means fewer TCP +/// slow-start round trips — while level 9 adds under a percent for several +/// times the CPU. const FILES_GZIP_LEVEL: u32 = 6; -/// Content type of the combined `inlineFiles` install response: a -/// length-prefixed JSON header followed by the [`build_files_payload`] -/// binary frames, gzip-compressed. +/// Content type of the install response: a length-prefixed JSON header +/// followed by the [`build_files_payload`] binary frames, gzip-compressed. const INLINE_CONTENT_TYPE: &str = "application/x-pnpr-install-inline"; -/// Build the combined single-response body for an `inlineFiles` request: -/// the lockfile, stats, and store-index entries in a length-prefixed JSON -/// header, followed by the missing files' contents as `/v1/files` binary -/// frames — so the client materializes everything from one round trip. +/// Build the single-response body: the lockfile, stats, and store-index +/// entries in a length-prefixed JSON header, followed by the contents of +/// the files the client is missing as binary frames — so the client +/// materializes everything from one round trip. fn inline_response( runtime: &InstallAccelerator, lockfile: &Lockfile, @@ -439,27 +471,16 @@ async fn verify_input_lockfile( Err(VerifyFailure::Violations(rendered)) } -/// Render input-lockfile policy violations in the protocol the client -/// asked for: the inline header (`{ "violations": [...] }`, no files) for -/// an `inlineFiles` request, otherwise a 200 NDJSON `E` line. Either way -/// the client rebuilds the identical `VerifyError` and aborts the same -/// way the local gate would. -fn violation_response(violations: &[serde_json::Value], request: &InstallRequest) -> Response { - if request.inline_files { - let header = serde_json::json!({ "violations": violations }); - // No files follow a verification failure: just the end-of-stream - // marker so the client's frame parser terminates cleanly. - let files_payload = empty_files_payload(); - return finish_inline_response(&header, &files_payload); - } - - let payload = serde_json::json!({ "violations": violations }); - let body = format!("E\t{payload}\n"); - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/x-ndjson") - .body(Body::from(body)) - .expect("static ndjson violation response is always valid") +/// Render input-lockfile policy violations into the inline response +/// header (`{ "violations": [...] }`, no files following) so the client +/// rebuilds the identical `VerifyError` and aborts the same way the local +/// gate would. +fn violation_response(violations: &[serde_json::Value]) -> Response { + let header = serde_json::json!({ "violations": violations }); + // No files follow a verification failure: just the end-of-stream + // marker so the client's frame parser terminates cleanly. + let files_payload = empty_files_payload(); + finish_inline_response(&header, &files_payload) } /// Merge every active verifier's policy snapshot into one bag, the key @@ -479,52 +500,11 @@ fn merge_policies( merged } -/// Handle `POST /v1/files`. -pub(crate) async fn handle_files(runtime: &InstallAccelerator, body: Bytes) -> Response { - let request: FilesRequest = match serde_json::from_slice(&body) { - Ok(request) => request, - Err(err) => return json_error(StatusCode::BAD_REQUEST, &err.to_string()), - }; - - for (index, file) in request.digests.iter().enumerate() { - if !is_valid_sha512_hex(&file.digest) { - return json_error( - StatusCode::BAD_REQUEST, - &format!("digests[{index}].digest must be a valid sha512 hex string"), - ); - } - } - - // Build the binary payload up front so a missing file surfaces as a - // clean 500 before any bytes are committed to the response. - let files = request.digests.iter().map(|file| (file.digest.as_str(), file.executable)); - let payload = match build_files_payload(&runtime.store_dir, files) { - Ok(payload) => payload, - Err((status, message)) => return json_error(status, &message), - }; - - let mut encoder = GzEncoder::new(Vec::new(), Compression::new(FILES_GZIP_LEVEL)); - if encoder.write_all(&payload).is_err() { - return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"); - } - let gzipped = match encoder.finish() { - Ok(gzipped) => gzipped, - Err(_) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"), - }; - - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/x-pnpm-install") - .header(header::CONTENT_ENCODING, "gzip") - .body(Body::from(gzipped)) - .expect("binary response is always valid") -} - -/// The binary frames `/v1/files` serves and the `inlineFiles` install -/// response embeds: a 2-byte `{}` JSON header (length-prefixed) followed -/// by one `[64-byte digest][u32 size][1-byte exec][content]` frame per -/// file, terminated by 64 zero bytes. Reads each file's content from the -/// store by digest; an `Err` is a ready-made error response. +/// The binary file frames the install response embeds: a 2-byte `{}` JSON +/// header (length-prefixed) followed by one +/// `[64-byte digest][u32 size][1-byte exec][content]` frame per file, +/// terminated by 64 zero bytes. Reads each file's content from the store +/// by digest; an `Err` is a ready-made error response. fn build_files_payload<'a>( store_dir: &StoreDir, files: impl Iterator, diff --git a/pnpr/crates/pnpr/src/install_accelerator/diff.rs b/pnpr/crates/pnpr/src/install_accelerator/diff.rs index 380376148d..d45fc3b8ac 100644 --- a/pnpr/crates/pnpr/src/install_accelerator/diff.rs +++ b/pnpr/crates/pnpr/src/install_accelerator/diff.rs @@ -27,7 +27,6 @@ pub struct ResolvedPackage { pub struct MissingFile { /// Lowercase sha512 hex digest (no `sha512-` prefix). pub digest: String, - pub size: u64, pub executable: bool, } @@ -141,11 +140,7 @@ pub fn compute_diff( if client_digests.insert(key) { stats.files_to_download += 1; stats.download_bytes += file.size; - missing_files.push(MissingFile { - digest: file.digest.clone(), - size: file.size, - executable, - }); + missing_files.push(MissingFile { digest: file.digest.clone(), executable }); } else { stats.files_already_in_cafs += 1; } diff --git a/pnpr/crates/pnpr/src/install_accelerator/protocol.rs b/pnpr/crates/pnpr/src/install_accelerator/protocol.rs index 7ef0aba360..5c04ea471a 100644 --- a/pnpr/crates/pnpr/src/install_accelerator/protocol.rs +++ b/pnpr/crates/pnpr/src/install_accelerator/protocol.rs @@ -115,17 +115,6 @@ pub struct InstallRequest { /// check. #[serde(default)] pub trust_policy_ignore_after: Option, - /// When `true`, the client wants the file contents it's missing - /// streamed inline in this response rather than fetched in a second - /// `POST /v1/files` round trip. The server answers with a single - /// gzipped binary body (a length-prefixed header carrying the - /// lockfile, stats, and store-index entries, followed by the - /// `/v1/files` binary frames) instead of the NDJSON stream. Cuts the - /// cold-path round trips from three (handshake + install + files) to - /// one. See - /// [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165). - #[serde(default)] - pub inline_files: bool, } /// One project's importer dir and its dependency maps, normalized @@ -162,24 +151,3 @@ impl InstallRequest { }] } } - -#[derive(Debug, Deserialize)] -pub struct FilesRequest { - pub digests: Vec, -} - -#[derive(Debug, Deserialize)] -pub struct FileDigest { - pub digest: String, - #[serde(default)] - pub executable: bool, -} - -/// A valid sha512 digest is 128 lowercase hex chars. The all-zero -/// digest is rejected because it collides with the 64-byte end-of- -/// stream marker in the `/v1/files` binary framing. -pub fn is_valid_sha512_hex(digest: &str) -> bool { - digest.len() == 128 - && digest.bytes().all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte)) - && digest.bytes().any(|byte| byte != b'0') -} diff --git a/pnpr/crates/pnpr/src/install_accelerator/tests.rs b/pnpr/crates/pnpr/src/install_accelerator/tests.rs new file mode 100644 index 0000000000..08cca8bafc --- /dev/null +++ b/pnpr/crates/pnpr/src/install_accelerator/tests.rs @@ -0,0 +1,73 @@ +//! Tests for the per-caller access gate the install accelerator applies +//! before serving a package's files: a digest in the store is not a +//! bearer capability, so [`deny_unauthorized_packages`] checks every +//! served package against pnpr's own `packages:` policy. + +use axum::http::StatusCode; + +use super::{deny_unauthorized_packages, diff::PackageIndexEntry}; +use crate::policy::{AccessList, Identity, PackagePolicies, PackagePolicy}; + +fn served(name: &str) -> Vec { + vec![PackageIndexEntry { + integrity: "sha512-deadbeef".to_string(), + pkg_id: format!("{name}@1.0.0"), + raw: Vec::new(), + }] +} + +fn anonymous() -> Identity { + Identity::Anonymous +} + +fn user() -> Identity { + Identity::User { username: "alice".to_string() } +} + +/// `registry_mock_defaults` gates `@private/*` to `$authenticated`. +fn policies() -> PackagePolicies { + PackagePolicies::registry_mock_defaults() +} + +/// `@team/*` is restricted to the single user `alice`, so an authenticated +/// caller who isn't `alice` is forbidden rather than merely unauthenticated. +fn team_owned_by_alice() -> PackagePolicies { + let team = + PackagePolicy::new("@team/*", AccessList::parse("alice"), AccessList::parse("alice")) + .expect("pattern compiles"); + let rest = + PackagePolicy::new("**", AccessList::parse("$all"), AccessList::parse("$authenticated")) + .expect("pattern compiles"); + PackagePolicies::new(vec![team, rest]) +} + +#[test] +fn anonymous_caller_is_denied_a_private_package() { + let denied = deny_unauthorized_packages(&policies(), &anonymous(), &served("@private/foo")); + assert_eq!(denied.map(|response| response.status()), Some(StatusCode::UNAUTHORIZED)); +} + +#[test] +fn authenticated_caller_is_allowed_a_private_package() { + let denied = deny_unauthorized_packages(&policies(), &user(), &served("@private/foo")); + assert!(denied.is_none()); +} + +#[test] +fn anonymous_caller_is_allowed_a_public_package() { + let denied = deny_unauthorized_packages(&policies(), &anonymous(), &served("is-positive")); + assert!(denied.is_none()); +} + +#[test] +fn authenticated_caller_outside_the_allowed_set_is_forbidden() { + let bob = Identity::User { username: "bob".to_string() }; + let denied = deny_unauthorized_packages(&team_owned_by_alice(), &bob, &served("@team/foo")); + assert_eq!(denied.map(|response| response.status()), Some(StatusCode::FORBIDDEN)); +} + +#[test] +fn authenticated_caller_in_the_allowed_set_is_allowed() { + let denied = deny_unauthorized_packages(&team_owned_by_alice(), &user(), &served("@team/foo")); + assert!(denied.is_none()); +} diff --git a/pnpr/crates/pnpr/src/server.rs b/pnpr/crates/pnpr/src/server.rs index 42bb008fcf..be7c70a530 100644 --- a/pnpr/crates/pnpr/src/server.rs +++ b/pnpr/crates/pnpr/src/server.rs @@ -64,9 +64,8 @@ struct AppInner { upstreams: IndexMap, config: Config, auth: AuthState, - /// Lazily-built engine backing the `/v1/install` and `/v1/files` - /// endpoints. Built on first such request so servers that never - /// receive one pay nothing. + /// Lazily-built engine backing the `/v1/install` endpoint. Built on + /// first such request so servers that never receive one pay nothing. install_accelerator: std::sync::OnceLock, } @@ -111,7 +110,6 @@ pub fn router_with_auth(config: Config, auth: AuthState) -> Router { // is the capability handshake (404 on a plain registry). .route("/-/pnpr", get(serve_pnpr_handshake)) .route("/v1/install", post(serve_install)) - .route("/v1/files", post(serve_files)) .route("/{name}", get(get_packument_unscoped).put(put_one_segment)) .route("/{first}/{second}", get(get_two_segments).put(put_two_segments)) .route( @@ -133,17 +131,15 @@ pub fn router_with_auth(config: Config, auth: AuthState) -> Router { // front, so the application is the only layer that can compress. // Scoped to JSON: the binary endpoints are excluded so we never // re-gzip an already-compressed payload — tarballs - // (`application/octet-stream`, already `.tgz`), the install - // accelerator's file stream (`application/x-pnpm-install`, already - // gzipped), and its NDJSON resolve response - // (`application/x-ndjson`). Already-`Content-Encoding` responses - // are skipped by the layer regardless. + // (`application/octet-stream`, already `.tgz`) and the install + // accelerator response (`application/x-pnpr-install-inline`, + // already gzipped). Already-`Content-Encoding` responses are + // skipped by the layer regardless. .layer( CompressionLayer::new().compress_when( DefaultPredicate::new() .and(NotForContentType::const_new("application/octet-stream")) - .and(NotForContentType::const_new("application/x-pnpm-install")) - .and(NotForContentType::const_new("application/x-ndjson")), + .and(NotForContentType::const_new("application/x-pnpr-install-inline")), ), ) // One structured access record per HTTP request: a span @@ -1538,18 +1534,28 @@ async fn serve_pnpr_handshake() -> Response { (StatusCode::OK, axum::Json(serde_json::json!({ "pnpr": { "versions": [1] } }))).into_response() } -async fn serve_install(State(state): State, body: axum::body::Bytes) -> Response { +async fn serve_install( + State(state): State, + headers: HeaderMap, + body: axum::body::Bytes, +) -> Response { let runtime = crate::install_accelerator::InstallAccelerator::get_or_init( &state.inner.install_accelerator, &state.inner.config, ); - crate::install_accelerator::handle_install(runtime, body).await -} - -async fn serve_files(State(state): State, body: axum::body::Bytes) -> Response { - let runtime = crate::install_accelerator::InstallAccelerator::get_or_init( - &state.inner.install_accelerator, - &state.inner.config, - ); - crate::install_accelerator::handle_files(runtime, body).await + let identity = match identify( + headers.get(header::AUTHORIZATION).and_then(|value| value.to_str().ok()), + &state.inner.auth.users, + &state.inner.auth.tokens, + ) { + Some(username) => Identity::User { username }, + None => Identity::Anonymous, + }; + crate::install_accelerator::handle_install( + runtime, + &state.inner.config.policies, + identity, + body, + ) + .await } diff --git a/pnpr/crates/pnpr/tests/install_accelerator.rs b/pnpr/crates/pnpr/tests/install_accelerator.rs deleted file mode 100644 index 4d508960c4..0000000000 --- a/pnpr/crates/pnpr/tests/install_accelerator.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! Integration tests for the pnpr install-accelerator endpoints. -//! -//! `/v1/install` resolves against an upstream registry and is covered -//! by the broader install suite; these tests exercise the network-free -//! `/v1/files` binary framing end to end through the axum router. - -use std::{ - io::Read as _, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, -}; - -use axum::{ - body::{Body, to_bytes}, - http::{Request, StatusCode}, -}; -use flate2::read::GzDecoder; -use pacquet_store_dir::StoreDir; -use pnpr::{Config, router}; -use serde_json::json; -use tempfile::TempDir; -use tower::ServiceExt; - -fn config_for(storage: std::path::PathBuf) -> Config { - let listen = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 4873)); - Config::proxy(listen, storage) -} - -/// Decode the `/v1/files` payload into `(digest_bytes, mode, content)` -/// frames, validating the JSON header and the end-of-stream marker. -fn parse_files_payload(payload: &[u8]) -> Vec<([u8; 64], u8, Vec)> { - let json_len = u32::from_be_bytes(payload[0..4].try_into().unwrap()) as usize; - assert_eq!(&payload[4..4 + json_len], b"{}"); - - let mut offset = 4 + json_len; - let mut frames = Vec::new(); - loop { - let mut digest = [0u8; 64]; - digest.copy_from_slice(&payload[offset..offset + 64]); - if digest == [0u8; 64] { - break; // end-of-stream marker - } - let size = - u32::from_be_bytes(payload[offset + 64..offset + 68].try_into().unwrap()) as usize; - let mode = payload[offset + 68]; - let content_start = offset + 69; - let content = payload[content_start..content_start + size].to_vec(); - frames.push((digest, mode, content)); - offset = content_start + size; - } - frames -} - -#[tokio::test] -async fn files_endpoint_serves_a_cafs_file_by_digest() { - let tmp = TempDir::new().unwrap(); - let config = config_for(tmp.path().to_path_buf()); - - // Seed a file into the same content-addressable store the pnpr - // runtime reads from (`/pnpr-store`). - let store = StoreDir::new(tmp.path().join("pnpr-store")); - let content = b"console.log('hello from pnpr')\n"; - let (_path, hash) = store.write_cas_file(content, false).expect("write cas file"); - let digest = format!("{hash:x}"); - - let app = router(config); - let request_body = json!({ "digests": [{ "digest": digest, "executable": false }] }); - let response = app - .oneshot( - Request::post("/v1/files") - .header("content-type", "application/json") - .body(Body::from(request_body.to_string())) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response.headers().get("content-encoding").map(|value| value.to_str().unwrap()), - Some("gzip"), - ); - - let gzipped = to_bytes(response.into_body(), usize::MAX).await.unwrap(); - let mut decoder = GzDecoder::new(&gzipped[..]); - let mut payload = Vec::new(); - decoder.read_to_end(&mut payload).unwrap(); - - let frames = parse_files_payload(&payload); - assert_eq!(frames.len(), 1); - let (digest_bytes, mode, served) = &frames[0]; - assert_eq!(&served[..], content); - assert_eq!(*mode, 0); - assert_eq!(format!("{:x}", hash), hex(digest_bytes)); -} - -#[tokio::test] -async fn files_endpoint_rejects_an_invalid_digest() { - let tmp = TempDir::new().unwrap(); - let app = router(config_for(tmp.path().to_path_buf())); - - let request_body = json!({ "digests": [{ "digest": "not-a-sha512", "executable": false }] }); - let response = app - .oneshot( - Request::post("/v1/files") - .header("content-type", "application/json") - .body(Body::from(request_body.to_string())) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::BAD_REQUEST); -} - -fn hex(bytes: &[u8]) -> String { - bytes.iter().map(|byte| format!("{byte:02x}")).collect() -} diff --git a/worker/src/index.ts b/worker/src/index.ts index 0d50f982d4..bf5910fa91 100644 --- a/worker/src/index.ts +++ b/worker/src/index.ts @@ -14,11 +14,11 @@ import pLimit from 'p-limit' import type { AddDirToStoreMessage, - FetchAndWriteCafsMessage, HardLinkDirMessage, LinkPkgMessage, SymlinkAllModulesMessage, TarballExtractMessage, + WriteCafsFilesMessage, } from './types.js' let workerPool: WorkerPool | undefined @@ -201,10 +201,9 @@ export async function addFilesFromTarball (opts: AddFilesFromTarballOptions): Pr } -export async function fetchAndWriteCafsFiles (opts: { - registryUrl: string +export async function writeCafsFiles (opts: { storeDir: string - digests: FetchAndWriteCafsMessage['digests'] + payload: Uint8Array }): Promise { if (!workerPool) { workerPool = createTarballWorkerPool() @@ -214,17 +213,16 @@ export async function fetchAndWriteCafsFiles (opts: { localWorker.once('message', ({ status, error, filesWritten }) => { workerPool!.checkinWorker(localWorker) if (status === 'error') { - reject(new PnpmError('CAFS_FETCH_WRITE', error.message)) + reject(new PnpmError('CAFS_WRITE', error.message)) return } resolve(filesWritten) }) localWorker.postMessage({ - type: 'fetch-and-write-cafs', - registryUrl: opts.registryUrl, + type: 'write-cafs-files', storeDir: opts.storeDir, - digests: opts.digests, - } satisfies FetchAndWriteCafsMessage) + payload: opts.payload, + } satisfies WriteCafsFilesMessage) }) } diff --git a/worker/src/start.ts b/worker/src/start.ts index e093904ca0..be1e58459f 100644 --- a/worker/src/start.ts +++ b/worker/src/start.ts @@ -28,13 +28,13 @@ import type { BundledManifest, DependencyManifest } from '@pnpm/types' import { equalOrSemverEqual } from './equalOrSemverEqual.js' import type { AddDirToStoreMessage, - FetchAndWriteCafsMessage, HardLinkDirMessage, InitStoreMessage, LinkPkgMessage, ReadPkgFromCafsMessage, SymlinkAllModulesMessage, TarballExtractMessage, + WriteCafsFilesMessage, } from './types.js' export function startWorker (): void { @@ -65,7 +65,7 @@ async function handleMessage ( | SymlinkAllModulesMessage | HardLinkDirMessage | InitStoreMessage - | FetchAndWriteCafsMessage + | WriteCafsFilesMessage | false ): Promise { if (message === false) { @@ -169,8 +169,8 @@ async function handleMessage ( parentPort!.postMessage({ status: 'success' }) break } - case 'fetch-and-write-cafs': { - parentPort!.postMessage(await fetchAndWriteCafs(message)) + case 'write-cafs-files': { + parentPort!.postMessage(await writeCafsFiles(message)) break } } @@ -501,190 +501,73 @@ function symlinkAllModules (opts: SymlinkAllModulesMessage): { status: 'success' return { status: 'success' } } -async function fetchAndWriteCafs (message: FetchAndWriteCafsMessage): Promise<{ status: string, filesWritten: number }> { - const http = await import('node:http') - const https = await import('node:https') - const { URL } = await import('node:url') - const { createGunzip } = await import('node:zlib') +async function writeCafsFiles (message: WriteCafsFilesMessage): Promise<{ status: string, filesWritten: number }> { const { contentPathFromHex } = await import('@pnpm/store.cafs') - // Preserve any path prefix on the pnpr server URL (e.g. https://host/pnpr/) - // by normalizing the base and using a relative URL. - const base = message.registryUrl.endsWith('/') ? message.registryUrl : `${message.registryUrl}/` - const url = new URL('v1/files', base) - const requestFn = url.protocol === 'https:' ? https.request : http.request - const body = JSON.stringify({ digests: message.digests }) + // `message.payload` is the already-decompressed file portion of a + // `/v1/install` response: a length-prefixed JSON header, then one + // `[64-byte digest][u32 size][1-byte exec][content]` frame per file, + // terminated by 64 zero bytes. + const payload = Buffer.from(message.payload.buffer, message.payload.byteOffset, message.payload.byteLength) + const END_MARKER = Buffer.alloc(64, 0) const createdDirs = new Set() - // Build a set of digests we actually requested, so we can reject a - // misbehaving pnpr server that streams unrelated entries and tries to write - // unbounded files into our CAFS. The set is keyed by `${digest}:${exec}` - // because the same digest may appear with different modes. - const requestedDigests = new Set() - for (const d of message.digests) { - requestedDigests.add(`${d.digest}:${d.executable ? 'x' : ''}`) + if (payload.length < 4) { + throw new Error('pnpr server /v1/install file payload is truncated') + } + // Skip the length-prefixed JSON header that precedes the frames. + const jsonLen = payload.readUInt32BE(0) + let offset = 4 + jsonLen + let filesWritten = 0 + let endMarkerSeen = false + + while (offset + 64 <= payload.length) { + if (payload.subarray(offset, offset + 64).equals(END_MARKER)) { + endMarkerSeen = true + offset += 64 + break + } + if (offset + 69 > payload.length) break // 64 digest + 4 size + 1 mode + const size = payload.readUInt32BE(offset + 64) + const entryLen = 69 + size + if (offset + entryLen > payload.length) break // incomplete entry + + const digest = payload.subarray(offset, offset + 64).toString('hex') + const executable = (payload[offset + 68] & 0x01) !== 0 + const content = payload.subarray(offset + 69, offset + entryLen) + + const relPath = contentPathFromHex(executable ? 'exec' : 'nonexec', digest) + const fullPath = path.join(message.storeDir, relPath) + const dir = path.dirname(fullPath) + if (!createdDirs.has(dir)) { + fs.mkdirSync(dir, { recursive: true }) + createdDirs.add(dir) + } + try { + fs.writeFileSync(fullPath, content, { flag: 'wx', mode: executable ? 0o755 : 0o644 }) + } catch (err: unknown) { + if (!(err instanceof Error && 'code' in err && (err as NodeJS.ErrnoException).code === 'EEXIST')) { + throw err + } + // EEXIST means the same digest is already at this CAFS path. CAFS is + // content-addressed, so a complete file is by definition correct. But a + // previous process could have crashed mid-write and left a truncated + // file — the pnpr path skips integrity verification, so we'd silently + // install garbage. Detect truncation by size and overwrite atomically. + const onDiskSize = fs.statSync(fullPath).size + if (onDiskSize !== content.length) { + const tmpPath = `${fullPath}.tmp-${process.pid}-${Date.now()}` + fs.writeFileSync(tmpPath, content, { mode: executable ? 0o755 : 0o644 }) + fs.renameSync(tmpPath, fullPath) + } + } + filesWritten++ + offset += entryLen } - // Stream: HTTP response → gunzip → parse entries → write to CAFS. - // No buffering — files are written as data arrives. - return new Promise<{ status: string, filesWritten: number }>((resolve, reject) => { - let filesWritten = 0 - let buf = Buffer.alloc(0) - let headerSkipped = false - let endMarkerSeen = false - const END_MARKER = Buffer.alloc(64, 0) - - const processBuffer = () => { - // Skip JSON header on first chunk - if (!headerSkipped && buf.length >= 4) { - const jsonLen = buf.readUInt32BE(0) - if (buf.length >= 4 + jsonLen) { - buf = buf.subarray(4 + jsonLen) - headerSkipped = true - } else { - return - } - } - - // Parse complete file entries from the buffer - while (headerSkipped && !endMarkerSeen) { - if (buf.length < 64) break - if (buf.subarray(0, 64).equals(END_MARKER)) { - buf = buf.subarray(64) - endMarkerSeen = true - break - } - if (buf.length < 69) break // 64 digest + 4 size + 1 mode - - const size = buf.readUInt32BE(64) - const entryLen = 69 + size - if (buf.length < entryLen) break // incomplete entry - - const digest = buf.subarray(0, 64).toString('hex') - const executable = (buf[68] & 0x01) !== 0 - - const requestKey = `${digest}:${executable ? 'x' : ''}` - if (!requestedDigests.has(requestKey)) { - throw new Error(`pnpr server /v1/files returned an entry that was not requested: digest=${digest} executable=${String(executable)}`) - } - // Consume the request so duplicates past the requested count also fail. - requestedDigests.delete(requestKey) - - const content = buf.subarray(69, entryLen) - - const relPath = contentPathFromHex(executable ? 'exec' : 'nonexec', digest) - const fullPath = path.join(message.storeDir, relPath) - const dir = path.dirname(fullPath) - if (!createdDirs.has(dir)) { - fs.mkdirSync(dir, { recursive: true }) - createdDirs.add(dir) - } - try { - fs.writeFileSync(fullPath, content, { flag: 'wx', mode: executable ? 0o755 : 0o644 }) - } catch (err: unknown) { - if (!(err instanceof Error && 'code' in err && (err as NodeJS.ErrnoException).code === 'EEXIST')) { - throw err - } - // EEXIST means the same digest is already at this CAFS path. CAFS - // is content-addressed, so a complete file is by definition correct. - // But a previous process could have crashed mid-write and left a - // truncated file — the pnpr server path skips integrity verification, so - // we'd silently install garbage. Detect truncation by size and - // overwrite atomically if the on-disk file is the wrong length. - const onDiskSize = fs.statSync(fullPath).size - if (onDiskSize !== content.length) { - const tmpPath = `${fullPath}.tmp-${process.pid}-${Date.now()}` - fs.writeFileSync(tmpPath, content, { mode: executable ? 0o755 : 0o644 }) - fs.renameSync(tmpPath, fullPath) - } - } - filesWritten++ - buf = buf.subarray(entryLen) - } - } - - // processBuffer is called from stream event handlers where a thrown - // exception would become `uncaughtException` and crash the worker. - // Surface errors via the Promise rejection instead. - const safeProcessBuffer = (): boolean => { - try { - processBuffer() - return true - } catch (err) { - reject(err) - req.destroy() - return false - } - } - - // Match the NDJSON client timeout — a stalled connection would otherwise - // hang the install indefinitely waiting on `fileDownloads`. - const FILES_REQUEST_TIMEOUT_MS = 600_000 - - const req = requestFn(url, { - method: 'POST', - timeout: FILES_REQUEST_TIMEOUT_MS, - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(body), - 'Accept-Encoding': 'gzip', - }, - }, (res: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any - // Non-2xx responses are JSON error bodies from the pnpr server; read - // and reject so we never try to gunzip an error payload as a file stream. - if (typeof res.statusCode === 'number' && (res.statusCode < 200 || res.statusCode >= 300)) { - const chunks: Buffer[] = [] - res.on('data', (chunk: Buffer) => chunks.push(chunk)) - res.on('end', () => { - reject(new Error(`pnpr server /v1/files responded with ${res.statusCode}: ${Buffer.concat(chunks).toString('utf-8')}`)) - }) - res.on('error', reject) - return - } - - let stream: NodeJS.ReadableStream = res - if (res.headers['content-encoding'] === 'gzip') { - const gunzip = createGunzip() - res.pipe(gunzip) - stream = gunzip - } - - stream.on('data', (chunk: Buffer) => { - buf = Buffer.concat([buf, chunk]) - safeProcessBuffer() - }) - stream.on('end', () => { - if (!safeProcessBuffer()) return - // Guard against a truncated response: the server must terminate the - // stream with the 64-byte end marker. If it didn't, or if there's a - // partial entry still in `buf`, fail — otherwise we'd silently leave - // the CAFS missing files. - if (!endMarkerSeen) { - reject(new Error('pnpr server /v1/files stream ended without the end marker')) - return - } - if (buf.length > 0) { - reject(new Error(`pnpr server /v1/files stream left ${buf.length} unparsed bytes after end marker`)) - return - } - // Every received entry was drained from `requestedDigests` as it was - // parsed; anything still in the set means the server ended cleanly - // but omitted files, which would silently leave the CAFS incomplete. - if (requestedDigests.size > 0) { - const sample = [...requestedDigests].slice(0, 3).join(', ') - reject(new Error(`pnpr server /v1/files omitted ${requestedDigests.size} requested entries (e.g. ${sample})`)) - return - } - resolve({ status: 'success', filesWritten }) - }) - stream.on('error', reject) - }) - req.on('timeout', () => { - req.destroy(new Error(`pnpr server /v1/files request timed out after ${FILES_REQUEST_TIMEOUT_MS / 1000}s`)) - }) - req.on('error', reject) - req.write(body) - req.end() - }) + if (!endMarkerSeen) { + throw new Error('pnpr server /v1/install file payload ended without the end marker') + } + return { status: 'success', filesWritten } } diff --git a/worker/src/types.ts b/worker/src/types.ts index 226e14a5f3..a3a8fd9f51 100644 --- a/worker/src/types.ts +++ b/worker/src/types.ts @@ -80,9 +80,14 @@ export interface HardLinkDirMessage { destDirs: string[] } -export interface FetchAndWriteCafsMessage { - type: 'fetch-and-write-cafs' - registryUrl: string +export interface WriteCafsFilesMessage { + type: 'write-cafs-files' storeDir: string - digests: Array<{ digest: string, size: number, executable: boolean }> + /** + * The binary file frames from a `/v1/install` response, already + * decompressed: a length-prefixed JSON header followed by one + * `[64-byte digest][u32 size][1-byte exec][content]` frame per file, + * terminated by 64 zero bytes. + */ + payload: Uint8Array }