diff --git a/.changeset/pnpr-inline-only-access.md b/.changeset/pnpr-inline-only-access.md new file mode 100644 index 0000000000..c1426d0f38 --- /dev/null +++ b/.changeset/pnpr-inline-only-access.md @@ -0,0 +1,7 @@ +--- +"@pnpm/pnpr.client": patch +"@pnpm/worker": patch +"pnpm": patch +--- + +The pnpr install accelerator now serves resolved files only in the single gzipped `POST /v1/install` response and authorizes every package whose bytes it serves against the server's access policy. The separate unauthenticated `POST /v1/files` endpoint has been removed: the client materializes the inlined files straight into its content-addressable store, and a content-addressed digest is no longer a bearer capability for a package the caller cannot read. diff --git a/pnpm/test/install/pnpmRegistry.ts b/pnpm/test/install/pnpmRegistry.ts index c45b86d3d4..0a0393f271 100644 --- a/pnpm/test/install/pnpmRegistry.ts +++ b/pnpm/test/install/pnpmRegistry.ts @@ -11,8 +11,8 @@ import { writeYamlFileSync } from 'write-yaml-file' import { execPnpm } from '../utils/index.js' // The pnpr server started by the test harness (see the with-registry jest -// preset) serves the install-accelerator endpoints (/v1/install, /v1/files) -// on the registry-mock port, so it doubles as the pnpr server under test. +// preset) serves the install-accelerator endpoint (/v1/install) on the +// registry-mock port, so it doubles as the pnpr server under test. const PNPR = `http://localhost:${REGISTRY_MOCK_PORT}` let server: http.Server diff --git a/pnpr/client/src/fetchFromPnpmRegistry.ts b/pnpr/client/src/fetchFromPnpmRegistry.ts index bf8b34bf7a..02723926c1 100644 --- a/pnpr/client/src/fetchFromPnpmRegistry.ts +++ b/pnpr/client/src/fetchFromPnpmRegistry.ts @@ -1,11 +1,12 @@ import http from 'node:http' import https from 'node:https' import { URL } from 'node:url' +import { gunzip } from 'node:zlib' import { convertToLockfileObject } from '@pnpm/lockfile.fs' import type { LockfileFile, LockfileObject } from '@pnpm/lockfile.types' import { StoreIndex } from '@pnpm/store.index' -import { fetchAndWriteCafsFiles } from '@pnpm/worker' +import { writeCafsFiles } from '@pnpm/worker' import type { ResponseMetadata } from './protocol.js' @@ -47,10 +48,9 @@ export interface FetchFromPnpmRegistryOptions { /** * `--lockfile-only`: resolve and return only the lockfile — fetch no * files into the local store. Forwarded to the server (which skips the - * file diff when it understands the flag); the client also ignores any - * `D`/`I` lines so the store stays untouched even against an older - * server. Mirrors pnpm's resolve + write, fetch nothing, link nothing. - * See https://github.com/pnpm/pnpm/issues/12146. + * file diff); the client ignores the (empty) file payload so the store + * stays untouched. Mirrors pnpm's resolve + write, fetch nothing, link + * nothing. See https://github.com/pnpm/pnpm/issues/12146. */ lockfileOnly?: boolean } @@ -64,16 +64,21 @@ export interface FetchFromPnpmRegistryResult { indexEntries: Array<{ key: string, buffer: Uint8Array }> } +interface InstallResponseHeader { + lockfile: LockfileFile + stats: ResponseMetadata['stats'] + indexEntries?: Array<{ key: string, b64: string }> + violations?: Array<{ name: string, version: string, code: string, reason: string }> +} + /** - * Fetch resolved dependencies from a pnpr server. + * Fetch resolved dependencies from a pnpr server in a single round trip. * - * The response is a streaming NDJSON where each line is one message: - * - `D\t{digest}\t{size}\t{executable}\n` — file digest (streamed as packages resolve) - * - `L\t{json}\n` — final lockfile (on-disk format) + stats (after resolution) - * - `I\t{key}\t{base64}\n` — pre-packed msgpack index entry - * - * As digest lines arrive, we batch them and dispatch workers to /v1/files. - * File downloads happen IN PARALLEL with server-side resolution. + * `POST /v1/install` (with `inlineFiles`) answers with one gzipped binary + * body: a length-prefixed JSON header (lockfile, stats, store-index + * entries, or verification violations) followed by the missing files' + * contents as binary frames. We parse the header here and hand the file + * frames to a worker that writes them straight into the CAFS. */ export async function fetchFromPnpmRegistry ( opts: FetchFromPnpmRegistryOptions @@ -100,103 +105,48 @@ export async function fetchFromPnpmRegistry ( lockfile: opts.lockfile, lockfileOnly: opts.lockfileOnly, storeIntegrities, + inlineFiles: true, }) - const indexEntries: Array<{ key: string, buffer: Uint8Array }> = [] - const workerPromises: Array> = [] - let currentBatch: Array<{ digest: string, size: number, executable: boolean }> = [] + const body = await postInstall(opts.registryUrl, requestBody) - const dispatchBatch = () => { - if (currentBatch.length === 0) return - const digests = currentBatch - currentBatch = [] - workerPromises.push(fetchAndWriteCafsFiles({ - registryUrl: opts.registryUrl, - storeDir: opts.storeDir, - digests, - })) + // The combined response is `[u32 header length][header JSON][file frames]`. + if (body.length < 4) { + throw new Error('pnpr server returned a truncated /v1/install response') + } + const headerLength = body.readUInt32BE(0) + const header = JSON.parse(body.subarray(4, 4 + headerLength).toString('utf-8')) as InstallResponseHeader + + if (header.violations != null && header.violations.length > 0) { + const rendered = header.violations + .map((violation) => ` ${violation.name}@${violation.version}: ${violation.reason}`) + .join('\n') + throw new Error(`pnpr server rejected the lockfile under the verification policy:\n${rendered}`) } - // Returns as soon as the lockfile arrives — the stream continues - // in the background, dispatching more file download workers. - // fileDownloads covers ALL workers (past and future). - return new Promise((resolve, reject) => { - let resolved = false - let serverError: Error | undefined - const handleLine = (line: string) => { - if (line.length === 0) return - const tabIdx = line.indexOf('\t') - const type = line.charAt(0) - if (type === 'D') { - // `--lockfile-only` fetches nothing — ignore any file digests an - // older server still streams rather than writing them to the store. - if (opts.lockfileOnly) return - const parts = line.split('\t') - currentBatch.push({ - digest: parts[1], - size: parseInt(parts[2], 10), - executable: parts[3] === '1', - }) - if (currentBatch.length >= FILES_PER_WORKER) { - dispatchBatch() - } - } else if (type === 'L') { - const payload = JSON.parse(line.substring(tabIdx + 1)) as { - lockfile: LockfileFile - stats: ResponseMetadata['stats'] - } - dispatchBatch() - resolved = true - // Resolve immediately — the caller can start headless install - // while the stream continues dispatching remaining D/I lines. - resolve({ - // The server speaks the on-disk lockfile format; convert it to the - // in-memory `LockfileObject` the rest of pnpm consumes. - lockfile: convertToLockfileObject(payload.lockfile), - stats: payload.stats, - fileDownloads: streamComplete.then(() => - Promise.all(workerPromises) - ).then(() => {}), - indexEntries, - }) - } else if (type === 'I') { - // `--lockfile-only` writes no store-index entries — ignore any an - // older server still streams. - if (opts.lockfileOnly) return - // Format: I\t{integrity}\t{pkgId}\t{base64} - // Key is "{integrity}\t{pkgId}" — everything between first and last tab - const rest = line.substring(tabIdx + 1) - const lastTab = rest.lastIndexOf('\t') - const key = rest.substring(0, lastTab) - const buffer = new Uint8Array(Buffer.from(rest.substring(lastTab + 1), 'base64')) - indexEntries.push({ key, buffer }) - } else if (type === 'E') { - // Server emitted a structured error after headers were sent. - // Record it so stream `end` / `catch` can reject with the payload. - let message = 'pnpr server error' - try { - const payload = JSON.parse(line.substring(tabIdx + 1)) as { error?: string } - if (payload?.error) message = payload.error - } catch { - // Fall back to the raw payload if it isn't JSON. - message = line.substring(tabIdx + 1) || message - } - serverError = new Error(message) - } - } + const indexEntries = (header.indexEntries ?? []).map(({ key, b64 }) => ({ + key, + buffer: new Uint8Array(Buffer.from(b64, 'base64')), + })) - const streamComplete = streamNdjsonRequest( - opts.registryUrl, 'v1/install', requestBody, handleLine - ) + // `--lockfile-only` fetches nothing: there are no file frames to write + // (the server sends only the end-of-stream marker), so leave the store + // untouched. + const fileDownloads = opts.lockfileOnly + ? Promise.resolve() + : writeCafsFiles({ + storeDir: opts.storeDir, + payload: body.subarray(4 + headerLength), + }).then(() => {}) - streamComplete.then(() => { - if (serverError) { - reject(serverError) - } else if (!resolved) { - reject(new Error('pnpr server closed the stream without emitting a lockfile')) - } - }, reject) - }) + return { + // The server speaks the on-disk lockfile format; convert it to the + // in-memory `LockfileObject` the rest of pnpm consumes. + lockfile: convertToLockfileObject(header.lockfile), + stats: header.stats, + fileDownloads, + indexEntries, + } } function readStoreIntegrities (storeIndex: StoreIndex): string[] { @@ -220,59 +170,54 @@ function isIntegrityLike (value: string): boolean { value.startsWith('sha1-') } -const FILES_PER_WORKER = 4000 const REQUEST_TIMEOUT = 600_000 // 10 minutes — server-side resolution can be slow on first run /** - * Stream an NDJSON response, calling `onLine` for each complete line as - * it arrives. Chunks are buffered until a newline is seen. + * `POST /v1/install` and return the full response body, decompressed. + * + * `urlPath` resolution normalizes the base to end with "/" so a path + * prefix configured on the pnpr server URL (e.g. https://host/pnpr/) is + * preserved. */ -async function streamNdjsonRequest ( - registryUrl: string, - urlPath: string, - body: string, - onLine: (line: string) => void -): Promise { - // `urlPath` is expected to be relative (e.g. "v1/install"). We normalize - // the base to end with "/" so `new URL(rel, base)` preserves any path - // prefix configured on the pnpr server URL (e.g. https://host/pnpr/). +async function postInstall (registryUrl: string, body: string): Promise { const base = registryUrl.endsWith('/') ? registryUrl : `${registryUrl}/` - const url = new URL(urlPath, base) - const isHttps = url.protocol === 'https:' - const requestFn = isHttps ? https.request : http.request + const url = new URL('v1/install', base) + const requestFn = url.protocol === 'https:' ? https.request : http.request - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { const req = requestFn(url, { method: 'POST', timeout: REQUEST_TIMEOUT, headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body), - 'Accept': 'application/x-ndjson', + 'Accept-Encoding': 'gzip', }, }, (res) => { - if (res.statusCode !== 200) { - const chunks: Buffer[] = [] - res.on('data', (chunk: Buffer) => chunks.push(chunk)) - res.on('end', () => { - reject(new Error(`pnpr server responded with ${res.statusCode}: ${Buffer.concat(chunks).toString('utf-8')}`)) - }) - return - } - - let leftover = '' - res.setEncoding('utf-8') - res.on('data', (chunk: string) => { - const data = leftover + chunk - const lines = data.split('\n') - leftover = lines.pop() ?? '' - for (const line of lines) { - onLine(line) - } - }) + const chunks: Buffer[] = [] + res.on('data', (chunk: Buffer) => chunks.push(chunk)) res.on('end', () => { - if (leftover) onLine(leftover) - resolve() + const raw = Buffer.concat(chunks) + // The server gzips both the install body and its JSON error bodies + // (e.g. a 401/403 access denial), so decompress *before* branching + // on the status code — otherwise an error surfaces as binary + // garbage instead of the server's message. Skip it only when the + // HTTP stack already decompressed (no gzip magic bytes). + const finish = (body: Buffer): void => { + if (res.statusCode !== 200) { + reject(new Error(`pnpr server responded with ${res.statusCode}: ${body.toString('utf-8')}`)) + } else { + resolve(body) + } + } + if (res.headers['content-encoding'] === 'gzip' || (raw[0] === 0x1f && raw[1] === 0x8b)) { + gunzip(raw, (err, decompressed) => { + if (err) reject(err) + else finish(decompressed) + }) + } else { + finish(raw) + } }) res.on('error', reject) }) @@ -301,5 +246,3 @@ export function writeRawIndexEntries ( storeIndex.setRawMany(writes) } } - - diff --git a/pnpr/crates/pnpr/src/install_accelerator.rs b/pnpr/crates/pnpr/src/install_accelerator.rs index d32ef2e4bd..88ddf78a47 100644 --- a/pnpr/crates/pnpr/src/install_accelerator.rs +++ b/pnpr/crates/pnpr/src/install_accelerator.rs @@ -3,23 +3,23 @@ //! alongside pnpr's npm-compatible API. The handshake + endpoints are //! served under one base URL (the `pnprServer`). //! -//! Three routes, built on pacquet's resolver and content-addressable +//! Two routes, built on pacquet's resolver and content-addressable //! store: //! //! * `GET /-/pnpr` — capability handshake; advertises the supported //! protocol versions so a client can negotiate or fail fast. //! * `POST /v1/install` — resolve a project **against the registries //! the client sends** (so the server uses the same source of truth as -//! the client), then return the missing-file digests, pre-packed -//! store-index entries, lockfile, and stats. With `inlineFiles` the -//! client also gets the missing files' *contents* in the same response -//! (a length-prefixed JSON header followed by the `/v1/files` binary -//! frames, gzipped), collapsing the cold path to a single round trip -//! ([pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165)); -//! otherwise the response is the NDJSON stream of `D`/`I`/`L`/`E` -//! lines and the client fetches files separately. -//! * `POST /v1/files` — serve a batch of files by digest as a gzip -//! binary stream the client writes straight into its CAFS. +//! the client), then return, in a single gzipped binary response, the +//! lockfile, stats, pre-packed store-index entries, and the contents of +//! the files the client is missing (a length-prefixed JSON header +//! followed by the binary file frames). One round trip +//! ([pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165)). +//! +//! Files are bound to access: every package whose bytes are served is +//! checked against pnpr's `packages:` policy first +//! ([`deny_unauthorized_packages`]), so a content-addressed digest is +//! never a bearer capability for a package the caller can't read. //! //! The client's `registry`, `namedRegistries`, `overrides`, and the //! verification policy (`minimumReleaseAge`, `trustPolicy`, ...) drive @@ -38,14 +38,20 @@ mod protocol; mod resolve; mod verdict_cache; +#[cfg(test)] +mod tests; + use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, io::Write as _, path::PathBuf, sync::{Arc, Mutex, OnceLock}, }; -use crate::config::Config as RegistryConfig; +use crate::{ + config::Config as RegistryConfig, + policy::{Identity, PackagePolicies}, +}; use axum::{ body::{Body, Bytes}, @@ -64,10 +70,7 @@ use pacquet_resolving_npm_resolver::{InMemoryPackageMetaCache, PackageMetaCache} use pacquet_resolving_resolver_base::ResolutionVerifier; use pacquet_store_dir::{StoreDir, StoreIndex}; -use self::{ - protocol::{FilesRequest, InstallRequest, is_valid_sha512_hex}, - verdict_cache::VerdictCache, -}; +use self::{protocol::InstallRequest, verdict_cache::VerdictCache}; /// Per-server engine backing the pnpr install endpoints: it holds the /// store, cache, and HTTP client used to resolve a client's project and @@ -177,8 +180,16 @@ impl InstallAccelerator { } } -/// Handle `POST /v1/install`. -pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> Response { +/// Handle `POST /v1/install`. `identity` is the resolved caller; the +/// store's possession of a package's bytes is not a capability to read +/// them, so every served package is checked against `policies` — see +/// [`deny_unauthorized_packages`]. +pub(crate) async fn handle_install( + runtime: &InstallAccelerator, + policies: &PackagePolicies, + identity: Identity, + body: Bytes, +) -> Response { let request: InstallRequest = match serde_json::from_slice(&body) { Ok(request) => request, Err(err) => return json_error(StatusCode::BAD_REQUEST, &err.to_string()), @@ -202,7 +213,7 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> { return match failure { VerifyFailure::Internal(response) => response, - VerifyFailure::Violations(violations) => violation_response(&violations, &request), + VerifyFailure::Violations(violations) => violation_response(&violations), }; } @@ -248,38 +259,12 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> } }; + if let Some(denied) = deny_unauthorized_packages(policies, &identity, &result.package_index) { + return denied; + } + let stats_json = stats_json(&result.stats); - - if request.inline_files { - return inline_response(runtime, &lockfile, &stats_json, &result); - } - - let mut ndjson: Vec = Vec::new(); - for file in &result.missing_files { - let _ = - writeln!(ndjson, "D\t{}\t{}\t{}", file.digest, file.size, u8::from(file.executable)); - } - for entry in &result.package_index { - let _ = writeln!( - ndjson, - "I\t{}\t{}\t{}", - entry.integrity, - entry.pkg_id, - BASE64.encode(&entry.raw), - ); - } - - let payload = serde_json::json!({ - "lockfile": serde_json::to_value(&lockfile).unwrap_or(serde_json::Value::Null), - "stats": stats_json, - }); - let _ = writeln!(ndjson, "L\t{}", payload); - - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/x-ndjson") - .body(Body::from(ndjson)) - .expect("static ndjson response is always valid") + inline_response(runtime, &lockfile, &stats_json, &result) } fn stats_json(stats: &diff::Stats) -> serde_json::Value { @@ -294,23 +279,70 @@ fn stats_json(stats: &diff::Stats) -> serde_json::Value { }) } -/// gzip level for the file-bearing responses (`/v1/files` and the -/// `inlineFiles` install body). Level 6 (the gzip default) shrinks the -/// payload ~16% over level 1 — the win that matters once the server is -/// across a latency link, where fewer bytes means fewer TCP slow-start -/// round trips — while level 9 adds under a percent for several times -/// the CPU. +/// Deny the install when the caller may not read a package whose files +/// are about to be served. A content-addressed digest is shared across +/// packages and reveals nothing about access, so possession of a +/// package's bytes in the store is never a capability to receive them: +/// every served package is checked against pnpr's own `packages:` policy +/// — the same decision `serve_packument` / `serve_tarball` make, in +/// process, with no network round trip. Returns the denial response (401 +/// for an anonymous caller who could authenticate, 403 for an +/// authenticated caller outside the allowed set) or `None` when every +/// served package is readable. +/// +/// This authorizes against pnpr's own surface, which is the authority for +/// everything the store can hold today: pnpr fetches anonymously (no +/// credential forwarding yet), so cached content is either pnpr-hosted or +/// publicly fetchable. When credential forwarding lands, packages the +/// client resolved from *external* registries under its own token become +/// reachable, and those carry no pnpr policy — their access must then be +/// re-verified per caller against the owning registry (with a TTL'd +/// verdict cache), which this local check does not cover. +fn deny_unauthorized_packages( + policies: &PackagePolicies, + identity: &Identity, + served: &[diff::PackageIndexEntry], +) -> Option { + let mut checked: HashSet<&str> = HashSet::new(); + for entry in served { + let Some(name) = package_name(&entry.pkg_id) else { continue }; + // Access is decided per package name, so evaluate each name once. + if !checked.insert(name) { + continue; + } + if !policies.for_package(name).access.allows(identity) { + let status = match identity { + Identity::Anonymous => StatusCode::UNAUTHORIZED, + Identity::User { .. } => StatusCode::FORBIDDEN, + }; + return Some(json_error(status, &format!("not authorized to access {name:?}"))); + } + } + None +} + +/// The package name from a `name@version` package id, tolerating a +/// leading scope `@` (`@scope/foo@1.0.0` → `@scope/foo`). +fn package_name(pkg_id: &str) -> Option<&str> { + let at = pkg_id.rfind('@')?; + (at > 0).then_some(&pkg_id[..at]) +} + +/// gzip level for the install response body. Level 6 (the gzip default) +/// shrinks the payload ~16% over level 1 — the win that matters once the +/// server is across a latency link, where fewer bytes means fewer TCP +/// slow-start round trips — while level 9 adds under a percent for several +/// times the CPU. const FILES_GZIP_LEVEL: u32 = 6; -/// Content type of the combined `inlineFiles` install response: a -/// length-prefixed JSON header followed by the [`build_files_payload`] -/// binary frames, gzip-compressed. +/// Content type of the install response: a length-prefixed JSON header +/// followed by the [`build_files_payload`] binary frames, gzip-compressed. const INLINE_CONTENT_TYPE: &str = "application/x-pnpr-install-inline"; -/// Build the combined single-response body for an `inlineFiles` request: -/// the lockfile, stats, and store-index entries in a length-prefixed JSON -/// header, followed by the missing files' contents as `/v1/files` binary -/// frames — so the client materializes everything from one round trip. +/// Build the single-response body: the lockfile, stats, and store-index +/// entries in a length-prefixed JSON header, followed by the contents of +/// the files the client is missing as binary frames — so the client +/// materializes everything from one round trip. fn inline_response( runtime: &InstallAccelerator, lockfile: &Lockfile, @@ -439,27 +471,16 @@ async fn verify_input_lockfile( Err(VerifyFailure::Violations(rendered)) } -/// Render input-lockfile policy violations in the protocol the client -/// asked for: the inline header (`{ "violations": [...] }`, no files) for -/// an `inlineFiles` request, otherwise a 200 NDJSON `E` line. Either way -/// the client rebuilds the identical `VerifyError` and aborts the same -/// way the local gate would. -fn violation_response(violations: &[serde_json::Value], request: &InstallRequest) -> Response { - if request.inline_files { - let header = serde_json::json!({ "violations": violations }); - // No files follow a verification failure: just the end-of-stream - // marker so the client's frame parser terminates cleanly. - let files_payload = empty_files_payload(); - return finish_inline_response(&header, &files_payload); - } - - let payload = serde_json::json!({ "violations": violations }); - let body = format!("E\t{payload}\n"); - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/x-ndjson") - .body(Body::from(body)) - .expect("static ndjson violation response is always valid") +/// Render input-lockfile policy violations into the inline response +/// header (`{ "violations": [...] }`, no files following) so the client +/// rebuilds the identical `VerifyError` and aborts the same way the local +/// gate would. +fn violation_response(violations: &[serde_json::Value]) -> Response { + let header = serde_json::json!({ "violations": violations }); + // No files follow a verification failure: just the end-of-stream + // marker so the client's frame parser terminates cleanly. + let files_payload = empty_files_payload(); + finish_inline_response(&header, &files_payload) } /// Merge every active verifier's policy snapshot into one bag, the key @@ -479,52 +500,11 @@ fn merge_policies( merged } -/// Handle `POST /v1/files`. -pub(crate) async fn handle_files(runtime: &InstallAccelerator, body: Bytes) -> Response { - let request: FilesRequest = match serde_json::from_slice(&body) { - Ok(request) => request, - Err(err) => return json_error(StatusCode::BAD_REQUEST, &err.to_string()), - }; - - for (index, file) in request.digests.iter().enumerate() { - if !is_valid_sha512_hex(&file.digest) { - return json_error( - StatusCode::BAD_REQUEST, - &format!("digests[{index}].digest must be a valid sha512 hex string"), - ); - } - } - - // Build the binary payload up front so a missing file surfaces as a - // clean 500 before any bytes are committed to the response. - let files = request.digests.iter().map(|file| (file.digest.as_str(), file.executable)); - let payload = match build_files_payload(&runtime.store_dir, files) { - Ok(payload) => payload, - Err((status, message)) => return json_error(status, &message), - }; - - let mut encoder = GzEncoder::new(Vec::new(), Compression::new(FILES_GZIP_LEVEL)); - if encoder.write_all(&payload).is_err() { - return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"); - } - let gzipped = match encoder.finish() { - Ok(gzipped) => gzipped, - Err(_) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"), - }; - - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/x-pnpm-install") - .header(header::CONTENT_ENCODING, "gzip") - .body(Body::from(gzipped)) - .expect("binary response is always valid") -} - -/// The binary frames `/v1/files` serves and the `inlineFiles` install -/// response embeds: a 2-byte `{}` JSON header (length-prefixed) followed -/// by one `[64-byte digest][u32 size][1-byte exec][content]` frame per -/// file, terminated by 64 zero bytes. Reads each file's content from the -/// store by digest; an `Err` is a ready-made error response. +/// The binary file frames the install response embeds: a 2-byte `{}` JSON +/// header (length-prefixed) followed by one +/// `[64-byte digest][u32 size][1-byte exec][content]` frame per file, +/// terminated by 64 zero bytes. Reads each file's content from the store +/// by digest; an `Err` is a ready-made error response. fn build_files_payload<'a>( store_dir: &StoreDir, files: impl Iterator, diff --git a/pnpr/crates/pnpr/src/install_accelerator/diff.rs b/pnpr/crates/pnpr/src/install_accelerator/diff.rs index 380376148d..d45fc3b8ac 100644 --- a/pnpr/crates/pnpr/src/install_accelerator/diff.rs +++ b/pnpr/crates/pnpr/src/install_accelerator/diff.rs @@ -27,7 +27,6 @@ pub struct ResolvedPackage { pub struct MissingFile { /// Lowercase sha512 hex digest (no `sha512-` prefix). pub digest: String, - pub size: u64, pub executable: bool, } @@ -141,11 +140,7 @@ pub fn compute_diff( if client_digests.insert(key) { stats.files_to_download += 1; stats.download_bytes += file.size; - missing_files.push(MissingFile { - digest: file.digest.clone(), - size: file.size, - executable, - }); + missing_files.push(MissingFile { digest: file.digest.clone(), executable }); } else { stats.files_already_in_cafs += 1; } diff --git a/pnpr/crates/pnpr/src/install_accelerator/protocol.rs b/pnpr/crates/pnpr/src/install_accelerator/protocol.rs index 7ef0aba360..5c04ea471a 100644 --- a/pnpr/crates/pnpr/src/install_accelerator/protocol.rs +++ b/pnpr/crates/pnpr/src/install_accelerator/protocol.rs @@ -115,17 +115,6 @@ pub struct InstallRequest { /// check. #[serde(default)] pub trust_policy_ignore_after: Option, - /// When `true`, the client wants the file contents it's missing - /// streamed inline in this response rather than fetched in a second - /// `POST /v1/files` round trip. The server answers with a single - /// gzipped binary body (a length-prefixed header carrying the - /// lockfile, stats, and store-index entries, followed by the - /// `/v1/files` binary frames) instead of the NDJSON stream. Cuts the - /// cold-path round trips from three (handshake + install + files) to - /// one. See - /// [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165). - #[serde(default)] - pub inline_files: bool, } /// One project's importer dir and its dependency maps, normalized @@ -162,24 +151,3 @@ impl InstallRequest { }] } } - -#[derive(Debug, Deserialize)] -pub struct FilesRequest { - pub digests: Vec, -} - -#[derive(Debug, Deserialize)] -pub struct FileDigest { - pub digest: String, - #[serde(default)] - pub executable: bool, -} - -/// A valid sha512 digest is 128 lowercase hex chars. The all-zero -/// digest is rejected because it collides with the 64-byte end-of- -/// stream marker in the `/v1/files` binary framing. -pub fn is_valid_sha512_hex(digest: &str) -> bool { - digest.len() == 128 - && digest.bytes().all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte)) - && digest.bytes().any(|byte| byte != b'0') -} diff --git a/pnpr/crates/pnpr/src/install_accelerator/tests.rs b/pnpr/crates/pnpr/src/install_accelerator/tests.rs new file mode 100644 index 0000000000..08cca8bafc --- /dev/null +++ b/pnpr/crates/pnpr/src/install_accelerator/tests.rs @@ -0,0 +1,73 @@ +//! Tests for the per-caller access gate the install accelerator applies +//! before serving a package's files: a digest in the store is not a +//! bearer capability, so [`deny_unauthorized_packages`] checks every +//! served package against pnpr's own `packages:` policy. + +use axum::http::StatusCode; + +use super::{deny_unauthorized_packages, diff::PackageIndexEntry}; +use crate::policy::{AccessList, Identity, PackagePolicies, PackagePolicy}; + +fn served(name: &str) -> Vec { + vec![PackageIndexEntry { + integrity: "sha512-deadbeef".to_string(), + pkg_id: format!("{name}@1.0.0"), + raw: Vec::new(), + }] +} + +fn anonymous() -> Identity { + Identity::Anonymous +} + +fn user() -> Identity { + Identity::User { username: "alice".to_string() } +} + +/// `registry_mock_defaults` gates `@private/*` to `$authenticated`. +fn policies() -> PackagePolicies { + PackagePolicies::registry_mock_defaults() +} + +/// `@team/*` is restricted to the single user `alice`, so an authenticated +/// caller who isn't `alice` is forbidden rather than merely unauthenticated. +fn team_owned_by_alice() -> PackagePolicies { + let team = + PackagePolicy::new("@team/*", AccessList::parse("alice"), AccessList::parse("alice")) + .expect("pattern compiles"); + let rest = + PackagePolicy::new("**", AccessList::parse("$all"), AccessList::parse("$authenticated")) + .expect("pattern compiles"); + PackagePolicies::new(vec![team, rest]) +} + +#[test] +fn anonymous_caller_is_denied_a_private_package() { + let denied = deny_unauthorized_packages(&policies(), &anonymous(), &served("@private/foo")); + assert_eq!(denied.map(|response| response.status()), Some(StatusCode::UNAUTHORIZED)); +} + +#[test] +fn authenticated_caller_is_allowed_a_private_package() { + let denied = deny_unauthorized_packages(&policies(), &user(), &served("@private/foo")); + assert!(denied.is_none()); +} + +#[test] +fn anonymous_caller_is_allowed_a_public_package() { + let denied = deny_unauthorized_packages(&policies(), &anonymous(), &served("is-positive")); + assert!(denied.is_none()); +} + +#[test] +fn authenticated_caller_outside_the_allowed_set_is_forbidden() { + let bob = Identity::User { username: "bob".to_string() }; + let denied = deny_unauthorized_packages(&team_owned_by_alice(), &bob, &served("@team/foo")); + assert_eq!(denied.map(|response| response.status()), Some(StatusCode::FORBIDDEN)); +} + +#[test] +fn authenticated_caller_in_the_allowed_set_is_allowed() { + let denied = deny_unauthorized_packages(&team_owned_by_alice(), &user(), &served("@team/foo")); + assert!(denied.is_none()); +} diff --git a/pnpr/crates/pnpr/src/server.rs b/pnpr/crates/pnpr/src/server.rs index 42bb008fcf..be7c70a530 100644 --- a/pnpr/crates/pnpr/src/server.rs +++ b/pnpr/crates/pnpr/src/server.rs @@ -64,9 +64,8 @@ struct AppInner { upstreams: IndexMap, config: Config, auth: AuthState, - /// Lazily-built engine backing the `/v1/install` and `/v1/files` - /// endpoints. Built on first such request so servers that never - /// receive one pay nothing. + /// Lazily-built engine backing the `/v1/install` endpoint. Built on + /// first such request so servers that never receive one pay nothing. install_accelerator: std::sync::OnceLock, } @@ -111,7 +110,6 @@ pub fn router_with_auth(config: Config, auth: AuthState) -> Router { // is the capability handshake (404 on a plain registry). .route("/-/pnpr", get(serve_pnpr_handshake)) .route("/v1/install", post(serve_install)) - .route("/v1/files", post(serve_files)) .route("/{name}", get(get_packument_unscoped).put(put_one_segment)) .route("/{first}/{second}", get(get_two_segments).put(put_two_segments)) .route( @@ -133,17 +131,15 @@ pub fn router_with_auth(config: Config, auth: AuthState) -> Router { // front, so the application is the only layer that can compress. // Scoped to JSON: the binary endpoints are excluded so we never // re-gzip an already-compressed payload — tarballs - // (`application/octet-stream`, already `.tgz`), the install - // accelerator's file stream (`application/x-pnpm-install`, already - // gzipped), and its NDJSON resolve response - // (`application/x-ndjson`). Already-`Content-Encoding` responses - // are skipped by the layer regardless. + // (`application/octet-stream`, already `.tgz`) and the install + // accelerator response (`application/x-pnpr-install-inline`, + // already gzipped). Already-`Content-Encoding` responses are + // skipped by the layer regardless. .layer( CompressionLayer::new().compress_when( DefaultPredicate::new() .and(NotForContentType::const_new("application/octet-stream")) - .and(NotForContentType::const_new("application/x-pnpm-install")) - .and(NotForContentType::const_new("application/x-ndjson")), + .and(NotForContentType::const_new("application/x-pnpr-install-inline")), ), ) // One structured access record per HTTP request: a span @@ -1538,18 +1534,28 @@ async fn serve_pnpr_handshake() -> Response { (StatusCode::OK, axum::Json(serde_json::json!({ "pnpr": { "versions": [1] } }))).into_response() } -async fn serve_install(State(state): State, body: axum::body::Bytes) -> Response { +async fn serve_install( + State(state): State, + headers: HeaderMap, + body: axum::body::Bytes, +) -> Response { let runtime = crate::install_accelerator::InstallAccelerator::get_or_init( &state.inner.install_accelerator, &state.inner.config, ); - crate::install_accelerator::handle_install(runtime, body).await -} - -async fn serve_files(State(state): State, body: axum::body::Bytes) -> Response { - let runtime = crate::install_accelerator::InstallAccelerator::get_or_init( - &state.inner.install_accelerator, - &state.inner.config, - ); - crate::install_accelerator::handle_files(runtime, body).await + let identity = match identify( + headers.get(header::AUTHORIZATION).and_then(|value| value.to_str().ok()), + &state.inner.auth.users, + &state.inner.auth.tokens, + ) { + Some(username) => Identity::User { username }, + None => Identity::Anonymous, + }; + crate::install_accelerator::handle_install( + runtime, + &state.inner.config.policies, + identity, + body, + ) + .await } diff --git a/pnpr/crates/pnpr/tests/install_accelerator.rs b/pnpr/crates/pnpr/tests/install_accelerator.rs deleted file mode 100644 index 4d508960c4..0000000000 --- a/pnpr/crates/pnpr/tests/install_accelerator.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! Integration tests for the pnpr install-accelerator endpoints. -//! -//! `/v1/install` resolves against an upstream registry and is covered -//! by the broader install suite; these tests exercise the network-free -//! `/v1/files` binary framing end to end through the axum router. - -use std::{ - io::Read as _, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, -}; - -use axum::{ - body::{Body, to_bytes}, - http::{Request, StatusCode}, -}; -use flate2::read::GzDecoder; -use pacquet_store_dir::StoreDir; -use pnpr::{Config, router}; -use serde_json::json; -use tempfile::TempDir; -use tower::ServiceExt; - -fn config_for(storage: std::path::PathBuf) -> Config { - let listen = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 4873)); - Config::proxy(listen, storage) -} - -/// Decode the `/v1/files` payload into `(digest_bytes, mode, content)` -/// frames, validating the JSON header and the end-of-stream marker. -fn parse_files_payload(payload: &[u8]) -> Vec<([u8; 64], u8, Vec)> { - let json_len = u32::from_be_bytes(payload[0..4].try_into().unwrap()) as usize; - assert_eq!(&payload[4..4 + json_len], b"{}"); - - let mut offset = 4 + json_len; - let mut frames = Vec::new(); - loop { - let mut digest = [0u8; 64]; - digest.copy_from_slice(&payload[offset..offset + 64]); - if digest == [0u8; 64] { - break; // end-of-stream marker - } - let size = - u32::from_be_bytes(payload[offset + 64..offset + 68].try_into().unwrap()) as usize; - let mode = payload[offset + 68]; - let content_start = offset + 69; - let content = payload[content_start..content_start + size].to_vec(); - frames.push((digest, mode, content)); - offset = content_start + size; - } - frames -} - -#[tokio::test] -async fn files_endpoint_serves_a_cafs_file_by_digest() { - let tmp = TempDir::new().unwrap(); - let config = config_for(tmp.path().to_path_buf()); - - // Seed a file into the same content-addressable store the pnpr - // runtime reads from (`/pnpr-store`). - let store = StoreDir::new(tmp.path().join("pnpr-store")); - let content = b"console.log('hello from pnpr')\n"; - let (_path, hash) = store.write_cas_file(content, false).expect("write cas file"); - let digest = format!("{hash:x}"); - - let app = router(config); - let request_body = json!({ "digests": [{ "digest": digest, "executable": false }] }); - let response = app - .oneshot( - Request::post("/v1/files") - .header("content-type", "application/json") - .body(Body::from(request_body.to_string())) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response.headers().get("content-encoding").map(|value| value.to_str().unwrap()), - Some("gzip"), - ); - - let gzipped = to_bytes(response.into_body(), usize::MAX).await.unwrap(); - let mut decoder = GzDecoder::new(&gzipped[..]); - let mut payload = Vec::new(); - decoder.read_to_end(&mut payload).unwrap(); - - let frames = parse_files_payload(&payload); - assert_eq!(frames.len(), 1); - let (digest_bytes, mode, served) = &frames[0]; - assert_eq!(&served[..], content); - assert_eq!(*mode, 0); - assert_eq!(format!("{:x}", hash), hex(digest_bytes)); -} - -#[tokio::test] -async fn files_endpoint_rejects_an_invalid_digest() { - let tmp = TempDir::new().unwrap(); - let app = router(config_for(tmp.path().to_path_buf())); - - let request_body = json!({ "digests": [{ "digest": "not-a-sha512", "executable": false }] }); - let response = app - .oneshot( - Request::post("/v1/files") - .header("content-type", "application/json") - .body(Body::from(request_body.to_string())) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::BAD_REQUEST); -} - -fn hex(bytes: &[u8]) -> String { - bytes.iter().map(|byte| format!("{byte:02x}")).collect() -} diff --git a/worker/src/index.ts b/worker/src/index.ts index 0d50f982d4..bf5910fa91 100644 --- a/worker/src/index.ts +++ b/worker/src/index.ts @@ -14,11 +14,11 @@ import pLimit from 'p-limit' import type { AddDirToStoreMessage, - FetchAndWriteCafsMessage, HardLinkDirMessage, LinkPkgMessage, SymlinkAllModulesMessage, TarballExtractMessage, + WriteCafsFilesMessage, } from './types.js' let workerPool: WorkerPool | undefined @@ -201,10 +201,9 @@ export async function addFilesFromTarball (opts: AddFilesFromTarballOptions): Pr } -export async function fetchAndWriteCafsFiles (opts: { - registryUrl: string +export async function writeCafsFiles (opts: { storeDir: string - digests: FetchAndWriteCafsMessage['digests'] + payload: Uint8Array }): Promise { if (!workerPool) { workerPool = createTarballWorkerPool() @@ -214,17 +213,16 @@ export async function fetchAndWriteCafsFiles (opts: { localWorker.once('message', ({ status, error, filesWritten }) => { workerPool!.checkinWorker(localWorker) if (status === 'error') { - reject(new PnpmError('CAFS_FETCH_WRITE', error.message)) + reject(new PnpmError('CAFS_WRITE', error.message)) return } resolve(filesWritten) }) localWorker.postMessage({ - type: 'fetch-and-write-cafs', - registryUrl: opts.registryUrl, + type: 'write-cafs-files', storeDir: opts.storeDir, - digests: opts.digests, - } satisfies FetchAndWriteCafsMessage) + payload: opts.payload, + } satisfies WriteCafsFilesMessage) }) } diff --git a/worker/src/start.ts b/worker/src/start.ts index e093904ca0..be1e58459f 100644 --- a/worker/src/start.ts +++ b/worker/src/start.ts @@ -28,13 +28,13 @@ import type { BundledManifest, DependencyManifest } from '@pnpm/types' import { equalOrSemverEqual } from './equalOrSemverEqual.js' import type { AddDirToStoreMessage, - FetchAndWriteCafsMessage, HardLinkDirMessage, InitStoreMessage, LinkPkgMessage, ReadPkgFromCafsMessage, SymlinkAllModulesMessage, TarballExtractMessage, + WriteCafsFilesMessage, } from './types.js' export function startWorker (): void { @@ -65,7 +65,7 @@ async function handleMessage ( | SymlinkAllModulesMessage | HardLinkDirMessage | InitStoreMessage - | FetchAndWriteCafsMessage + | WriteCafsFilesMessage | false ): Promise { if (message === false) { @@ -169,8 +169,8 @@ async function handleMessage ( parentPort!.postMessage({ status: 'success' }) break } - case 'fetch-and-write-cafs': { - parentPort!.postMessage(await fetchAndWriteCafs(message)) + case 'write-cafs-files': { + parentPort!.postMessage(await writeCafsFiles(message)) break } } @@ -501,190 +501,73 @@ function symlinkAllModules (opts: SymlinkAllModulesMessage): { status: 'success' return { status: 'success' } } -async function fetchAndWriteCafs (message: FetchAndWriteCafsMessage): Promise<{ status: string, filesWritten: number }> { - const http = await import('node:http') - const https = await import('node:https') - const { URL } = await import('node:url') - const { createGunzip } = await import('node:zlib') +async function writeCafsFiles (message: WriteCafsFilesMessage): Promise<{ status: string, filesWritten: number }> { const { contentPathFromHex } = await import('@pnpm/store.cafs') - // Preserve any path prefix on the pnpr server URL (e.g. https://host/pnpr/) - // by normalizing the base and using a relative URL. - const base = message.registryUrl.endsWith('/') ? message.registryUrl : `${message.registryUrl}/` - const url = new URL('v1/files', base) - const requestFn = url.protocol === 'https:' ? https.request : http.request - const body = JSON.stringify({ digests: message.digests }) + // `message.payload` is the already-decompressed file portion of a + // `/v1/install` response: a length-prefixed JSON header, then one + // `[64-byte digest][u32 size][1-byte exec][content]` frame per file, + // terminated by 64 zero bytes. + const payload = Buffer.from(message.payload.buffer, message.payload.byteOffset, message.payload.byteLength) + const END_MARKER = Buffer.alloc(64, 0) const createdDirs = new Set() - // Build a set of digests we actually requested, so we can reject a - // misbehaving pnpr server that streams unrelated entries and tries to write - // unbounded files into our CAFS. The set is keyed by `${digest}:${exec}` - // because the same digest may appear with different modes. - const requestedDigests = new Set() - for (const d of message.digests) { - requestedDigests.add(`${d.digest}:${d.executable ? 'x' : ''}`) + if (payload.length < 4) { + throw new Error('pnpr server /v1/install file payload is truncated') + } + // Skip the length-prefixed JSON header that precedes the frames. + const jsonLen = payload.readUInt32BE(0) + let offset = 4 + jsonLen + let filesWritten = 0 + let endMarkerSeen = false + + while (offset + 64 <= payload.length) { + if (payload.subarray(offset, offset + 64).equals(END_MARKER)) { + endMarkerSeen = true + offset += 64 + break + } + if (offset + 69 > payload.length) break // 64 digest + 4 size + 1 mode + const size = payload.readUInt32BE(offset + 64) + const entryLen = 69 + size + if (offset + entryLen > payload.length) break // incomplete entry + + const digest = payload.subarray(offset, offset + 64).toString('hex') + const executable = (payload[offset + 68] & 0x01) !== 0 + const content = payload.subarray(offset + 69, offset + entryLen) + + const relPath = contentPathFromHex(executable ? 'exec' : 'nonexec', digest) + const fullPath = path.join(message.storeDir, relPath) + const dir = path.dirname(fullPath) + if (!createdDirs.has(dir)) { + fs.mkdirSync(dir, { recursive: true }) + createdDirs.add(dir) + } + try { + fs.writeFileSync(fullPath, content, { flag: 'wx', mode: executable ? 0o755 : 0o644 }) + } catch (err: unknown) { + if (!(err instanceof Error && 'code' in err && (err as NodeJS.ErrnoException).code === 'EEXIST')) { + throw err + } + // EEXIST means the same digest is already at this CAFS path. CAFS is + // content-addressed, so a complete file is by definition correct. But a + // previous process could have crashed mid-write and left a truncated + // file — the pnpr path skips integrity verification, so we'd silently + // install garbage. Detect truncation by size and overwrite atomically. + const onDiskSize = fs.statSync(fullPath).size + if (onDiskSize !== content.length) { + const tmpPath = `${fullPath}.tmp-${process.pid}-${Date.now()}` + fs.writeFileSync(tmpPath, content, { mode: executable ? 0o755 : 0o644 }) + fs.renameSync(tmpPath, fullPath) + } + } + filesWritten++ + offset += entryLen } - // Stream: HTTP response → gunzip → parse entries → write to CAFS. - // No buffering — files are written as data arrives. - return new Promise<{ status: string, filesWritten: number }>((resolve, reject) => { - let filesWritten = 0 - let buf = Buffer.alloc(0) - let headerSkipped = false - let endMarkerSeen = false - const END_MARKER = Buffer.alloc(64, 0) - - const processBuffer = () => { - // Skip JSON header on first chunk - if (!headerSkipped && buf.length >= 4) { - const jsonLen = buf.readUInt32BE(0) - if (buf.length >= 4 + jsonLen) { - buf = buf.subarray(4 + jsonLen) - headerSkipped = true - } else { - return - } - } - - // Parse complete file entries from the buffer - while (headerSkipped && !endMarkerSeen) { - if (buf.length < 64) break - if (buf.subarray(0, 64).equals(END_MARKER)) { - buf = buf.subarray(64) - endMarkerSeen = true - break - } - if (buf.length < 69) break // 64 digest + 4 size + 1 mode - - const size = buf.readUInt32BE(64) - const entryLen = 69 + size - if (buf.length < entryLen) break // incomplete entry - - const digest = buf.subarray(0, 64).toString('hex') - const executable = (buf[68] & 0x01) !== 0 - - const requestKey = `${digest}:${executable ? 'x' : ''}` - if (!requestedDigests.has(requestKey)) { - throw new Error(`pnpr server /v1/files returned an entry that was not requested: digest=${digest} executable=${String(executable)}`) - } - // Consume the request so duplicates past the requested count also fail. - requestedDigests.delete(requestKey) - - const content = buf.subarray(69, entryLen) - - const relPath = contentPathFromHex(executable ? 'exec' : 'nonexec', digest) - const fullPath = path.join(message.storeDir, relPath) - const dir = path.dirname(fullPath) - if (!createdDirs.has(dir)) { - fs.mkdirSync(dir, { recursive: true }) - createdDirs.add(dir) - } - try { - fs.writeFileSync(fullPath, content, { flag: 'wx', mode: executable ? 0o755 : 0o644 }) - } catch (err: unknown) { - if (!(err instanceof Error && 'code' in err && (err as NodeJS.ErrnoException).code === 'EEXIST')) { - throw err - } - // EEXIST means the same digest is already at this CAFS path. CAFS - // is content-addressed, so a complete file is by definition correct. - // But a previous process could have crashed mid-write and left a - // truncated file — the pnpr server path skips integrity verification, so - // we'd silently install garbage. Detect truncation by size and - // overwrite atomically if the on-disk file is the wrong length. - const onDiskSize = fs.statSync(fullPath).size - if (onDiskSize !== content.length) { - const tmpPath = `${fullPath}.tmp-${process.pid}-${Date.now()}` - fs.writeFileSync(tmpPath, content, { mode: executable ? 0o755 : 0o644 }) - fs.renameSync(tmpPath, fullPath) - } - } - filesWritten++ - buf = buf.subarray(entryLen) - } - } - - // processBuffer is called from stream event handlers where a thrown - // exception would become `uncaughtException` and crash the worker. - // Surface errors via the Promise rejection instead. - const safeProcessBuffer = (): boolean => { - try { - processBuffer() - return true - } catch (err) { - reject(err) - req.destroy() - return false - } - } - - // Match the NDJSON client timeout — a stalled connection would otherwise - // hang the install indefinitely waiting on `fileDownloads`. - const FILES_REQUEST_TIMEOUT_MS = 600_000 - - const req = requestFn(url, { - method: 'POST', - timeout: FILES_REQUEST_TIMEOUT_MS, - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(body), - 'Accept-Encoding': 'gzip', - }, - }, (res: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any - // Non-2xx responses are JSON error bodies from the pnpr server; read - // and reject so we never try to gunzip an error payload as a file stream. - if (typeof res.statusCode === 'number' && (res.statusCode < 200 || res.statusCode >= 300)) { - const chunks: Buffer[] = [] - res.on('data', (chunk: Buffer) => chunks.push(chunk)) - res.on('end', () => { - reject(new Error(`pnpr server /v1/files responded with ${res.statusCode}: ${Buffer.concat(chunks).toString('utf-8')}`)) - }) - res.on('error', reject) - return - } - - let stream: NodeJS.ReadableStream = res - if (res.headers['content-encoding'] === 'gzip') { - const gunzip = createGunzip() - res.pipe(gunzip) - stream = gunzip - } - - stream.on('data', (chunk: Buffer) => { - buf = Buffer.concat([buf, chunk]) - safeProcessBuffer() - }) - stream.on('end', () => { - if (!safeProcessBuffer()) return - // Guard against a truncated response: the server must terminate the - // stream with the 64-byte end marker. If it didn't, or if there's a - // partial entry still in `buf`, fail — otherwise we'd silently leave - // the CAFS missing files. - if (!endMarkerSeen) { - reject(new Error('pnpr server /v1/files stream ended without the end marker')) - return - } - if (buf.length > 0) { - reject(new Error(`pnpr server /v1/files stream left ${buf.length} unparsed bytes after end marker`)) - return - } - // Every received entry was drained from `requestedDigests` as it was - // parsed; anything still in the set means the server ended cleanly - // but omitted files, which would silently leave the CAFS incomplete. - if (requestedDigests.size > 0) { - const sample = [...requestedDigests].slice(0, 3).join(', ') - reject(new Error(`pnpr server /v1/files omitted ${requestedDigests.size} requested entries (e.g. ${sample})`)) - return - } - resolve({ status: 'success', filesWritten }) - }) - stream.on('error', reject) - }) - req.on('timeout', () => { - req.destroy(new Error(`pnpr server /v1/files request timed out after ${FILES_REQUEST_TIMEOUT_MS / 1000}s`)) - }) - req.on('error', reject) - req.write(body) - req.end() - }) + if (!endMarkerSeen) { + throw new Error('pnpr server /v1/install file payload ended without the end marker') + } + return { status: 'success', filesWritten } } diff --git a/worker/src/types.ts b/worker/src/types.ts index 226e14a5f3..a3a8fd9f51 100644 --- a/worker/src/types.ts +++ b/worker/src/types.ts @@ -80,9 +80,14 @@ export interface HardLinkDirMessage { destDirs: string[] } -export interface FetchAndWriteCafsMessage { - type: 'fetch-and-write-cafs' - registryUrl: string +export interface WriteCafsFilesMessage { + type: 'write-cafs-files' storeDir: string - digests: Array<{ digest: string, size: number, executable: boolean }> + /** + * The binary file frames from a `/v1/install` response, already + * decompressed: a length-prefixed JSON header followed by one + * `[64-byte digest][u32 size][1-byte exec][content]` frame per file, + * terminated by 64 zero bytes. + */ + payload: Uint8Array }