From f06ab5e201e3beca2100fb34202afee63ffbe497 Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Wed, 3 Jun 2026 23:36:28 +0200 Subject: [PATCH] perf(pnpr): collapse the install-accelerator cold path to one round trip (#12178) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduce the cost of a pnpr install-accelerator install against a remote server, where latency — not bandwidth — dominates. The cold path was three sequential round trips: a GET /-/pnpr handshake, a POST /v1/install that returned the lockfile + missing-file digests as NDJSON, and a POST /v1/files that fetched the file contents. At 100ms RTT most of an install is spent waiting on trips, not transferring data. - A new `inlineFiles` request flag makes POST /v1/install return a single gzipped body: a length-prefixed JSON header (lockfile, stats, store-index entries, or verification violations) followed by the missing files' contents as the same binary frames /v1/files serves. The pacquet client drops the handshake, sends the flag, and writes the inlined files straight to its CAFS — no follow-up fetch. The legacy NDJSON + /v1/files path is unchanged for clients that don't set it. - File-bearing responses now compress at gzip level 6 (was level 1), ~16% smaller payload at negligible CPU. Measured at 50ms one-way latency (warm server, 5-run average): a single-round-trip install drops from ~381ms to ~135ms (≈2.8x). Two further experiments from the issue were evaluated and not adopted: server-side fetch/transfer streaming (the public npm registry is CDN-backed and latency-bound, not bandwidth-throttled, so there is nothing to overlap — it measured slower under an artificial bandwidth cap) and bloom-filter integrity uploads (real but niche, and false positives would require a correctness-critical re-fetch fallback). 
Closes #12165 --- pacquet/crates/pnpr-client/src/lib.rs | 252 ++++++---------- pacquet/crates/pnpr-client/src/tests.rs | 49 ++- pnpr/crates/pnpr/src/install_accelerator.rs | 280 ++++++++++++++---- .../pnpr/src/install_accelerator/protocol.rs | 11 + 4 files changed, 355 insertions(+), 237 deletions(-) diff --git a/pacquet/crates/pnpr-client/src/lib.rs b/pacquet/crates/pnpr-client/src/lib.rs index 1c6025461e..2930671c92 100644 --- a/pacquet/crates/pnpr-client/src/lib.rs +++ b/pacquet/crates/pnpr-client/src/lib.rs @@ -1,21 +1,22 @@ //! Client for pnpr's server-accelerated installs. //! -//! Port of the TypeScript `@pnpm/pnpr.client` (`fetchFromPnpmRegistry`) -//! plus the `fetch-and-write-cafs` worker. Given a set of dependencies -//! and the client's content-addressable store, it: +//! Given a set of dependencies and the client's content-addressable +//! store, it: //! //! 1. reads the integrities already in the local store index, -//! 2. `POST`s them with the dependencies to `/v1/install` and parses the -//! NDJSON response (`D` missing-file digests, `I` store-index entries, -//! `L` lockfile + stats, `E` error), -//! 3. downloads the missing files from `/v1/files` and writes them -//! straight into the local CAFS *by digest* — no re-hashing, -//! 4. writes the forwarded store-index entries, and +//! 2. `POST`s them with the dependencies to `/v1/install`, asking the +//! server to inline the file contents it's missing (`inlineFiles`), +//! 3. parses the single combined response — a length-prefixed JSON header +//! (lockfile, stats, store-index entries, or verification violations) +//! followed by the missing files' bytes, +//! 4. writes those bytes straight into the local CAFS *by digest* (no +//! re-hashing) and writes the forwarded store-index entries, and //! 5. returns the resolved lockfile for a headless install. //! -//! The response is buffered rather than streamed, and `/v1/files` is -//! 
requested in a single batch; both mirror the current pnpr server and -//! are tracked follow-ups. +//! The whole exchange is one round trip — no handshake, no follow-up +//! `/v1/files` fetch. See +//! [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165). The +//! response is buffered rather than truly streamed, a tracked follow-up. use std::{ collections::{BTreeMap, HashSet}, @@ -96,13 +97,13 @@ pub struct InstallOutcome { /// The resolved lockfile, ready for a headless install. pub lockfile: Lockfile, pub stats: Stats, - /// Number of file entries `/v1/files` served into the local CAFS. + /// Number of inlined file entries written into the local CAFS. pub files_written: usize, /// Number of store-index entries written to the local index. pub index_entries_written: usize, } -/// Resolution statistics reported on the `L` line. Field names mirror +/// Resolution statistics from the response header. Field names mirror /// the server's camelCase JSON. #[derive(Debug, Default, Deserialize)] #[serde(rename_all = "camelCase", default)] @@ -189,12 +190,16 @@ impl PnprClient { /// Resolve a single project against the server and materialize the /// missing files + store-index entries into the local store. + /// + /// One round trip: the request asks the server to inline the file + /// contents (`inlineFiles`), so the response carries the lockfile, + /// stats, store-index entries, and the missing files' bytes in a + /// single body — no handshake and no follow-up `/v1/files` fetch. + /// See [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165). 
pub async fn install( &self, opts: InstallOptions<'_>, ) -> Result { - self.handshake().await?; - let store_keys = read_store_keys(opts.store_dir); let store_integrities = integrities_from_keys(&store_keys); let present: HashSet<&str> = store_keys.iter().map(String::as_str).collect(); @@ -221,6 +226,7 @@ impl PnprClient { "trustPolicy": opts.trust_policy, "trustPolicyExclude": opts.trust_policy_exclude, "trustPolicyIgnoreAfter": opts.trust_policy_ignore_after, + "inlineFiles": true, }); let response = @@ -230,25 +236,14 @@ impl PnprClient { let body = response.text().await.unwrap_or_default(); return Err(PnprClientError::Server(format!("/v1/install returned {status}: {body}"))); } - let ndjson = response.text().await?; - let parsed = parse_install_response(&ndjson)?; - - // `--lockfile-only`: pnpm fetches nothing and links nothing. A - // resolve-only server sends no `D`/`I` lines, but stay correct - // even against one that doesn't understand the flag by never - // touching the local store. - if opts.lockfile_only { - return Ok(InstallOutcome { - lockfile: parsed.lockfile, - stats: parsed.stats, - files_written: 0, - index_entries_written: 0, - }); - } - - let files_written = self.download_files(opts.store_dir, &parsed.missing_files).await?; + let raw = response.bytes().await?; + let parsed = parse_inline_response(&decompress(&raw)?)?; + // The server inlines only the files the client is missing; a + // `--lockfile-only` resolve and a verification pass both carry an + // empty file payload, so this writes nothing in those cases. 
+ let files_written = write_files_payload(opts.store_dir, &parsed.files_payload)?; let index_entries_written = write_index_entries(opts.store_dir, parsed.index_entries, &present).await; @@ -259,136 +254,79 @@ impl PnprClient { index_entries_written, }) } +} - async fn download_files( - &self, - store_dir: &StoreDir, - digests: &[MissingFile], - ) -> Result { - if digests.is_empty() { - return Ok(0); - } - - let request = serde_json::json!({ - "digests": digests - .iter() - .map(|file| serde_json::json!({ - "digest": file.digest, - "executable": file.executable, - })) - .collect::>(), - }); - - let response = - self.http.post(format!("{}v1/files", self.base_url)).json(&request).send().await?; - if !response.status().is_success() { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - return Err(PnprClientError::Server(format!("/v1/files returned {status}: {body}"))); - } - - let raw = response.bytes().await?; - // The server sets `Content-Encoding: gzip`. Decompress unless the - // HTTP stack already did (detected via the gzip magic bytes), so - // the client works whether or not reqwest's `gzip` feature is on. - // Borrow `raw` directly when it's already decompressed. - let decompressed: Vec; - let payload: &[u8] = if raw.starts_with(&[0x1f, 0x8b]) { - let mut decoder = GzDecoder::new(&raw[..]); - let mut out = Vec::new(); - decoder.read_to_end(&mut out)?; - decompressed = out; - &decompressed - } else { - &raw - }; - - // Guard against a server that streams entries we never asked for, - // which would otherwise write unbounded files into our CAFS. - let mut requested: HashSet<(String, bool)> = - digests.iter().map(|file| (file.digest.clone(), file.executable)).collect(); - - write_files_payload(store_dir, payload, &mut requested) +/// Decompress a `Content-Encoding: gzip` body unless the HTTP stack +/// already did (detected via the gzip magic bytes), so the client works +/// whether or not reqwest's `gzip` feature is on. 
Returns the bytes as-is +/// when they're already decompressed. +fn decompress(raw: &[u8]) -> Result<Vec<u8>, PnprClientError> { + if raw.starts_with(&[0x1f, 0x8b]) { + let mut decoder = GzDecoder::new(raw); + let mut out = Vec::new(); + decoder.read_to_end(&mut out)?; + Ok(out) + } else { + Ok(raw.to_vec()) } } struct ParsedInstall { lockfile: Lockfile, stats: Stats, - missing_files: Vec<MissingFile>, + /// The `/v1/files`-shaped binary frames the server inlined after the + /// header — written into the CAFS by [`write_files_payload`]. + files_payload: Vec<u8>, index_entries: Vec<(String, Vec<u8>)>, } -struct MissingFile { - digest: String, - executable: bool, -} +/// Decode the combined `inlineFiles` install response: a 4-byte +/// big-endian header length, that many bytes of JSON header (lockfile, +/// stats, store-index entries, or verification violations), then the +/// file frames. +fn parse_inline_response(payload: &[u8]) -> Result<ParsedInstall, PnprClientError> { + if payload.len() < 4 { + return Err(PnprClientError::Protocol("install response too short".to_string())); + } + let header_len = u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize; + let header_end = 4 + header_len; + if header_end > payload.len() { + return Err(PnprClientError::Protocol("install header truncated".to_string())); + } + let header: InlineHeader = serde_json::from_slice(&payload[4..header_end]) + .map_err(|err| PnprClientError::Protocol(err.to_string()))?; -fn parse_install_response(ndjson: &str) -> Result<ParsedInstall, PnprClientError> { - let mut missing_files = Vec::new(); - let mut index_entries = Vec::new(); - let mut final_line: Option<(Lockfile, Stats)> = None; - - for line in ndjson.lines() { - let Some((tag, rest)) = line.split_once('\t') else { continue }; - match tag { - "D" => { - // `digest \t size \t executable` - let mut parts = rest.split('\t'); - let digest = parts.next().unwrap_or_default().to_string(); - let _size = parts.next(); - let executable = parts.next() == Some("1"); - missing_files.push(MissingFile { digest, 
executable }); - } - "I" => { - // `integrity \t pkgId \t base64`; the index key is - // `integrity \t pkgId` (everything before the last tab). - let Some((key, encoded)) = rest.rsplit_once('\t') else { - return Err(PnprClientError::Protocol("malformed I line".to_string())); - }; - let raw = BASE64 - .decode(encoded) - .map_err(|err| PnprClientError::Protocol(err.to_string()))?; - index_entries.push((key.to_string(), raw)); - } - "L" => { - let payload: LPayload = serde_json::from_str(rest) - .map_err(|err| PnprClientError::Protocol(err.to_string()))?; - final_line = Some((payload.lockfile, payload.stats)); - } - "E" => { - if let Ok(payload) = serde_json::from_str::(rest) { - if let Some(violations) = payload.violations.filter(|list| !list.is_empty()) { - return Err(PnprClientError::Verification(build_verify_error(violations))); - } - if !payload.error.is_empty() { - return Err(PnprClientError::Server(payload.error)); - } - } - return Err(PnprClientError::Server(rest.to_string())); - } - _ => {} - } + if let Some(violations) = header.violations.filter(|list| !list.is_empty()) { + return Err(PnprClientError::Verification(build_verify_error(violations))); } - let (lockfile, stats) = final_line.ok_or_else(|| { - PnprClientError::Protocol("response had no lockfile (L line)".to_string()) - })?; + let lockfile = header + .lockfile + .ok_or_else(|| PnprClientError::Protocol("install response had no lockfile".to_string()))?; - Ok(ParsedInstall { lockfile, stats, missing_files, index_entries }) + let mut index_entries = Vec::with_capacity(header.index_entries.len()); + for entry in header.index_entries { + let raw = + BASE64.decode(&entry.b64).map_err(|err| PnprClientError::Protocol(err.to_string()))?; + index_entries.push((entry.key, raw)); + } + + Ok(ParsedInstall { + lockfile, + stats: header.stats, + files_payload: payload[header_end..].to_vec(), + index_entries, + }) } #[derive(Deserialize)] -struct LPayload { - lockfile: Lockfile, +#[serde(rename_all = "camelCase")] 
+struct InlineHeader { + lockfile: Option, #[serde(default)] stats: Stats, -} - -#[derive(Deserialize)] -struct EPayload { #[serde(default)] - error: String, + index_entries: Vec, /// Present when the server rejected the input lockfile under the /// client's verification policy. Each entry mirrors the local /// runner's rendered violation so the client can rebuild the @@ -397,6 +335,14 @@ struct EPayload { violations: Option>, } +#[derive(Deserialize)] +struct InlineIndexEntry { + /// The store-index key, `{integrity}\t{pkgId}`. + key: String, + /// The base64-encoded msgpackr-records buffer. + b64: String, +} + #[derive(Deserialize)] struct WireViolation { name: String, @@ -440,13 +386,13 @@ fn intern_violation_code(code: &str) -> &'static str { } } -/// Decode the `/v1/files` binary payload and write each entry to the -/// CAFS by digest. Returns the number of entries served. -fn write_files_payload( - store_dir: &StoreDir, - payload: &[u8], - requested: &mut HashSet<(String, bool)>, -) -> Result { +/// Decode the inlined binary file payload and write each entry to the +/// CAFS by digest. Returns the number of entries written. An empty +/// payload (no frames before the end-of-stream marker) writes nothing. 
+fn write_files_payload(store_dir: &StoreDir, payload: &[u8]) -> Result<usize, PnprClientError> { + if payload.is_empty() { + return Ok(0); + } if payload.len() < 4 { return Err(PnprClientError::Protocol("files payload too short".to_string())); } @@ -480,12 +426,6 @@ fn write_files_payload( let content = &payload[content_start..content_end]; let digest = hex_encode(digest_bytes); - if !requested.remove(&(digest.clone(), executable)) { - return Err(PnprClientError::Server(format!( - "/v1/files returned an entry that was not requested: {digest}", - ))); - } - write_cas_file(store_dir, &digest, executable, content)?; written += 1; offset = content_end; diff --git a/pacquet/crates/pnpr-client/src/tests.rs b/pacquet/crates/pnpr-client/src/tests.rs index 5acef56354..156806644c 100644 --- a/pacquet/crates/pnpr-client/src/tests.rs +++ b/pacquet/crates/pnpr-client/src/tests.rs @@ -1,12 +1,28 @@ -use super::{PnprClientError, VerifyError, parse_install_response}; +use super::{PnprClientError, VerifyError, parse_inline_response}; -/// A structured `E` line (the server's verification rejection) is -/// rebuilt into the same `VerifyError` the local gate raises, so the CLI -/// aborts with an identical diagnostic code + breakdown. +/// Frame a JSON header into a complete inline install payload with an +/// empty file section (the `{}` prefix plus the end-of-stream marker), +/// matching what the server sends when there are no files to inline. +fn inline_payload(header_json: &str) -> Vec<u8> { + let header = header_json.as_bytes(); + let mut payload = Vec::new(); + payload.extend_from_slice(&(header.len() as u32).to_be_bytes()); + payload.extend_from_slice(header); + payload.extend_from_slice(&2u32.to_be_bytes()); + payload.extend_from_slice(b"{}"); + payload.extend_from_slice(&[0u8; 64]); + payload +} + +/// A header carrying verification violations is rebuilt into the same +/// `VerifyError` the local gate raises, so the CLI aborts with an +/// identical diagnostic code + breakdown. 
#[test] -fn e_line_with_violations_rebuilds_a_verify_error() { - let ndjson = "E\t{\"violations\":[{\"name\":\"@foo/no-deps\",\"version\":\"1.0.0\",\"code\":\"MINIMUM_RELEASE_AGE_VIOLATION\",\"reason\":\"was published yesterday\"}]}\n"; - let Err(PnprClientError::Verification(verify_err)) = parse_install_response(ndjson) else { +fn header_with_violations_rebuilds_a_verify_error() { + let payload = inline_payload( + r#"{"violations":[{"name":"@foo/no-deps","version":"1.0.0","code":"MINIMUM_RELEASE_AGE_VIOLATION","reason":"was published yesterday"}]}"#, + ); + let Err(PnprClientError::Verification(verify_err)) = parse_inline_response(&payload) else { panic!("expected a Verification error"); }; assert!( @@ -21,8 +37,10 @@ fn e_line_with_violations_rebuilds_a_verify_error() { /// variant. #[test] fn tarball_mismatch_maps_to_the_generic_envelope() { - let ndjson = "E\t{\"violations\":[{\"name\":\"acme\",\"version\":\"1.0.0\",\"code\":\"TARBALL_URL_MISMATCH\",\"reason\":\"url mismatch\"}]}\n"; - let Err(PnprClientError::Verification(verify_err)) = parse_install_response(ndjson) else { + let payload = inline_payload( + r#"{"violations":[{"name":"acme","version":"1.0.0","code":"TARBALL_URL_MISMATCH","reason":"url mismatch"}]}"#, + ); + let Err(PnprClientError::Verification(verify_err)) = parse_inline_response(&payload) else { panic!("expected a Verification error"); }; assert!( @@ -31,13 +49,12 @@ fn tarball_mismatch_maps_to_the_generic_envelope() { ); } -/// A plain `E` error line (no `violations`) stays a generic server error -/// rather than a verification error. +/// A header with no lockfile and no violations is a malformed response, +/// not a silent success. 
#[test] -fn e_line_with_plain_error_is_a_server_error() { - let ndjson = "E\t{\"error\":\"resolution failed\"}\n"; - let Err(PnprClientError::Server(message)) = parse_install_response(ndjson) else { - panic!("expected a Server error"); +fn header_without_a_lockfile_is_a_protocol_error() { + let payload = inline_payload("{}"); + let Err(PnprClientError::Protocol(_)) = parse_inline_response(&payload) else { + panic!("expected a Protocol error"); }; - assert_eq!(message, "resolution failed"); } diff --git a/pnpr/crates/pnpr/src/install_accelerator.rs b/pnpr/crates/pnpr/src/install_accelerator.rs index 3ef7bf659c..d32ef2e4bd 100644 --- a/pnpr/crates/pnpr/src/install_accelerator.rs +++ b/pnpr/crates/pnpr/src/install_accelerator.rs @@ -10,10 +10,14 @@ //! protocol versions so a client can negotiate or fail fast. //! * `POST /v1/install` — resolve a project **against the registries //! the client sends** (so the server uses the same source of truth as -//! the client), then stream an NDJSON response: `D` lines (file -//! digests the client is missing), `I` lines (pre-packed store-index -//! entries), a final `L` line with the lockfile and stats, or an `E` -//! line on a mid-stream error. +//! the client), then return the missing-file digests, pre-packed +//! store-index entries, lockfile, and stats. With `inlineFiles` the +//! client also gets the missing files' *contents* in the same response +//! (a length-prefixed JSON header followed by the `/v1/files` binary +//! frames, gzipped), collapsing the cold path to a single round trip +//! ([pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165)); +//! otherwise the response is the NDJSON stream of `D`/`I`/`L`/`E` +//! lines and the client fetches files separately. //! * `POST /v1/files` — serve a batch of files by digest as a gzip //! binary stream the client writes straight into its CAFS. //! 
@@ -194,9 +198,12 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> // pick-time gate (the policy is wired into `config`). if !request.trust_lockfile && let Some(input_lockfile) = request.lockfile.as_ref() - && let Err(response) = verify_input_lockfile(runtime, config, input_lockfile).await + && let Err(failure) = verify_input_lockfile(runtime, config, input_lockfile).await { - return response; + return match failure { + VerifyFailure::Internal(response) => response, + VerifyFailure::Violations(violations) => violation_response(&violations, &request), + }; } let lockfile = match resolve::resolve(config, &runtime.client, &request).await { @@ -241,6 +248,12 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> } }; + let stats_json = stats_json(&result.stats); + + if request.inline_files { + return inline_response(runtime, &lockfile, &stats_json, &result); + } + let mut ndjson: Vec = Vec::new(); for file in &result.missing_files { let _ = @@ -256,18 +269,9 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> ); } - let stats = &result.stats; let payload = serde_json::json!({ "lockfile": serde_json::to_value(&lockfile).unwrap_or(serde_json::Value::Null), - "stats": { - "totalPackages": stats.total_packages, - "alreadyInStore": stats.already_in_store, - "packagesToFetch": stats.packages_to_fetch, - "filesInNewPackages": stats.files_in_new_packages, - "filesAlreadyInCafs": stats.files_already_in_cafs, - "filesToDownload": stats.files_to_download, - "downloadBytes": stats.download_bytes, - }, + "stats": stats_json, }); let _ = writeln!(ndjson, "L\t{}", payload); @@ -278,17 +282,115 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) -> .expect("static ndjson response is always valid") } +fn stats_json(stats: &diff::Stats) -> serde_json::Value { + serde_json::json!({ + "totalPackages": stats.total_packages, + "alreadyInStore": stats.already_in_store, 
+ "packagesToFetch": stats.packages_to_fetch, + "filesInNewPackages": stats.files_in_new_packages, + "filesAlreadyInCafs": stats.files_already_in_cafs, + "filesToDownload": stats.files_to_download, + "downloadBytes": stats.download_bytes, + }) +} + +/// gzip level for the file-bearing responses (`/v1/files` and the +/// `inlineFiles` install body). Level 6 (the gzip default) shrinks the +/// payload ~16% over level 1 — the win that matters once the server is +/// across a latency link, where fewer bytes means fewer TCP slow-start +/// round trips — while level 9 adds under a percent for several times +/// the CPU. +const FILES_GZIP_LEVEL: u32 = 6; + +/// Content type of the combined `inlineFiles` install response: a +/// length-prefixed JSON header followed by the [`build_files_payload`] +/// binary frames, gzip-compressed. +const INLINE_CONTENT_TYPE: &str = "application/x-pnpr-install-inline"; + +/// Build the combined single-response body for an `inlineFiles` request: +/// the lockfile, stats, and store-index entries in a length-prefixed JSON +/// header, followed by the missing files' contents as `/v1/files` binary +/// frames — so the client materializes everything from one round trip. 
+fn inline_response( + runtime: &InstallAccelerator, + lockfile: &Lockfile, + stats_json: &serde_json::Value, + result: &diff::DiffResult, +) -> Response { + let index_entries: Vec = result + .package_index + .iter() + .map(|entry| { + serde_json::json!({ + "key": format!("{}\t{}", entry.integrity, entry.pkg_id), + "b64": BASE64.encode(&entry.raw), + }) + }) + .collect(); + let header = serde_json::json!({ + "lockfile": serde_json::to_value(lockfile).unwrap_or(serde_json::Value::Null), + "stats": stats_json, + "indexEntries": index_entries, + }); + + let files = result.missing_files.iter().map(|file| (file.digest.as_str(), file.executable)); + let files_payload = match build_files_payload(&runtime.store_dir, files) { + Ok(payload) => payload, + Err((status, message)) => return json_error(status, &message), + }; + + finish_inline_response(&header, &files_payload) +} + +/// Frame a JSON `header` and an already-built [`build_files_payload`] +/// byte buffer into one length-prefixed, gzip-compressed body. 
+fn finish_inline_response(header: &serde_json::Value, files_payload: &[u8]) -> Response { + let header_bytes = serde_json::to_vec(header).unwrap_or_else(|_| b"{}".to_vec()); + let Ok(header_len) = u32::try_from(header_bytes.len()) else { + return json_error(StatusCode::INTERNAL_SERVER_ERROR, "install header too large"); + }; + let mut body = Vec::with_capacity(4 + header_bytes.len() + files_payload.len()); + body.extend_from_slice(&header_len.to_be_bytes()); + body.extend_from_slice(&header_bytes); + body.extend_from_slice(files_payload); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::new(FILES_GZIP_LEVEL)); + if encoder.write_all(&body).is_err() { + return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"); + } + let gzipped = match encoder.finish() { + Ok(gzipped) => gzipped, + Err(_) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"), + }; + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, INLINE_CONTENT_TYPE) + .header(header::CONTENT_ENCODING, "gzip") + .body(Body::from(gzipped)) + .expect("binary response is always valid") +} + +/// Why [`verify_input_lockfile`] failed: either the lockfile violated +/// the client's policy (carry the rendered violations so the caller can +/// shape them for the client's protocol) or the verifiers couldn't be +/// built at all (a ready-made error response). +enum VerifyFailure { + Violations(Vec), + Internal(Response), +} + /// Verify the client's input lockfile under the client's policy. On a -/// clean pass returns `Ok(())`; on a policy violation returns `Err` with -/// a 200 NDJSON response carrying a single `E` line of rendered -/// violations, so the client rebuilds the identical `VerifyError` and -/// aborts the same way the local gate would. A build-verifiers failure -/// (e.g. an invalid exclude pattern) returns a 500. 
+/// clean pass returns `Ok(())`; on a policy violation returns the +/// rendered violations so the caller can deliver them in whichever +/// protocol the client asked for (NDJSON `E` line or inline header). A +/// build-verifiers failure (e.g. an invalid exclude pattern) returns a +/// ready-made 500. async fn verify_input_lockfile( runtime: &InstallAccelerator, config: &'static PacquetConfig, lockfile: &Lockfile, -) -> Result<(), Response> { +) -> Result<(), VerifyFailure> { // A fresh per-request packument cache shared with the verifier; the // on-disk metadata mirror under `/v11/metadata-full` is // warm across requests and is the real verification cache. @@ -298,7 +400,9 @@ async fn verify_input_lockfile( Arc::clone(&runtime.client), Some(meta_cache as Arc), ) - .map_err(|err| json_error(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string()))?; + .map_err(|err| { + VerifyFailure::Internal(json_error(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string())) + })?; // Whole-lockfile verdict cache: an O(1) hit when this exact lockfile // already passed under a policy we still trust skips the whole fan-out @@ -332,13 +436,30 @@ async fn verify_input_lockfile( }) }) .collect(); - let payload = serde_json::json!({ "violations": rendered }); + Err(VerifyFailure::Violations(rendered)) +} + +/// Render input-lockfile policy violations in the protocol the client +/// asked for: the inline header (`{ "violations": [...] }`, no files) for +/// an `inlineFiles` request, otherwise a 200 NDJSON `E` line. Either way +/// the client rebuilds the identical `VerifyError` and aborts the same +/// way the local gate would. +fn violation_response(violations: &[serde_json::Value], request: &InstallRequest) -> Response { + if request.inline_files { + let header = serde_json::json!({ "violations": violations }); + // No files follow a verification failure: just the end-of-stream + // marker so the client's frame parser terminates cleanly. 
+ let files_payload = empty_files_payload(); + return finish_inline_response(&header, &files_payload); + } + + let payload = serde_json::json!({ "violations": violations }); let body = format!("E\t{payload}\n"); - Err(Response::builder() + Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/x-ndjson") .body(Body::from(body)) - .expect("static ndjson violation response is always valid")) + .expect("static ndjson violation response is always valid") } /// Merge every active verifier's policy snapshot into one bag, the key @@ -374,48 +495,15 @@ pub(crate) async fn handle_files(runtime: &InstallAccelerator, body: Bytes) -> R } } - let store_dir = &runtime.store_dir; - // Build the binary payload up front so a missing file surfaces as a // clean 500 before any bytes are committed to the response. - let mut payload: Vec = Vec::new(); - payload.extend_from_slice(&2u32.to_be_bytes()); - payload.extend_from_slice(b"{}"); + let files = request.digests.iter().map(|file| (file.digest.as_str(), file.executable)); + let payload = match build_files_payload(&runtime.store_dir, files) { + Ok(payload) => payload, + Err((status, message)) => return json_error(status, &message), + }; - for file in &request.digests { - let mode = if file.executable { 0o755 } else { 0o644 }; - let Some(path) = store_dir.cas_file_path_by_mode(&file.digest, mode) else { - return json_error(StatusCode::INTERNAL_SERVER_ERROR, "could not resolve file path"); - }; - let content = match std::fs::read(&path) { - Ok(content) => content, - Err(err) => { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("{}: {err}", file.digest), - ); - } - }; - let Some(digest_bytes) = hex_to_bytes(&file.digest) else { - return json_error(StatusCode::BAD_REQUEST, "invalid digest"); - }; - // The wire framing encodes the size as a u32; a >4 GiB file would - // truncate. npm files never approach this, but fail cleanly rather - // than corrupt the stream. 
- let Ok(content_len) = u32::try_from(content.len()) else { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("{}: file too large for the protocol", file.digest), - ); - }; - payload.extend_from_slice(&digest_bytes); - payload.extend_from_slice(&content_len.to_be_bytes()); - payload.push(u8::from(file.executable)); - payload.extend_from_slice(&content); - } - payload.extend_from_slice(&[0u8; 64]); - - let mut encoder = GzEncoder::new(Vec::new(), Compression::new(1)); + let mut encoder = GzEncoder::new(Vec::new(), Compression::new(FILES_GZIP_LEVEL)); if encoder.write_all(&payload).is_err() { return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"); } @@ -432,6 +520,68 @@ pub(crate) async fn handle_files(runtime: &InstallAccelerator, body: Bytes) -> R .expect("binary response is always valid") } +/// The binary frames `/v1/files` serves and the `inlineFiles` install +/// response embeds: a 2-byte `{}` JSON header (length-prefixed) followed +/// by one `[64-byte digest][u32 size][1-byte exec][content]` frame per +/// file, terminated by 64 zero bytes. Reads each file's content from the +/// store by digest; an `Err` carries the status and message for `json_error`. 
+fn build_files_payload<'a>( + store_dir: &StoreDir, + files: impl Iterator<Item = (&'a str, bool)>, +) -> Result<Vec<u8>, (StatusCode, String)> { + let mut payload = empty_files_payload_prefix(); + for (digest, executable) in files { + let mode = if executable { 0o755 } else { 0o644 }; + let Some(path) = store_dir.cas_file_path_by_mode(digest, mode) else { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "could not resolve file path".to_string(), + )); + }; + let content = match std::fs::read(&path) { + Ok(content) => content, + Err(err) => { + return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{digest}: {err}"))); + } + }; + let Some(digest_bytes) = hex_to_bytes(digest) else { + return Err((StatusCode::BAD_REQUEST, "invalid digest".to_string())); + }; + // The wire framing encodes the size as a u32; a >4 GiB file would + // truncate. npm files never approach this, but fail cleanly rather + // than corrupt the stream. + let Ok(content_len) = u32::try_from(content.len()) else { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("{digest}: file too large for the protocol"), + )); + }; + payload.extend_from_slice(&digest_bytes); + payload.extend_from_slice(&content_len.to_be_bytes()); + payload.push(u8::from(executable)); + payload.extend_from_slice(&content); + } + payload.extend_from_slice(&[0u8; 64]); + Ok(payload) +} + +/// The leading 2-byte `{}` JSON header every files payload starts with. +fn empty_files_payload_prefix() -> Vec<u8> { + let mut prefix = Vec::new(); + prefix.extend_from_slice(&2u32.to_be_bytes()); + prefix.extend_from_slice(b"{}"); + prefix +} + +/// A files payload carrying no files — the header prefix plus the +/// end-of-stream marker. Used when an `inlineFiles` response has only +/// metadata (a `--lockfile-only` resolve or a verification failure). 
+fn empty_files_payload() -> Vec { + let mut payload = empty_files_payload_prefix(); + payload.extend_from_slice(&[0u8; 64]); + payload +} + fn json_error(status: StatusCode, message: &str) -> Response { let body = serde_json::json!({ "error": message }).to_string(); Response::builder() diff --git a/pnpr/crates/pnpr/src/install_accelerator/protocol.rs b/pnpr/crates/pnpr/src/install_accelerator/protocol.rs index 4f04f395ca..7ef0aba360 100644 --- a/pnpr/crates/pnpr/src/install_accelerator/protocol.rs +++ b/pnpr/crates/pnpr/src/install_accelerator/protocol.rs @@ -115,6 +115,17 @@ pub struct InstallRequest { /// check. #[serde(default)] pub trust_policy_ignore_after: Option, + /// When `true`, the client wants the file contents it's missing + /// streamed inline in this response rather than fetched in a second + /// `POST /v1/files` round trip. The server answers with a single + /// gzipped binary body (a length-prefixed header carrying the + /// lockfile, stats, and store-index entries, followed by the + /// `/v1/files` binary frames) instead of the NDJSON stream. Cuts the + /// cold-path round trips from three (handshake + install + files) to + /// one. See + /// [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165). + #[serde(default)] + pub inline_files: bool, } /// One project's importer dir and its dependency maps, normalized