perf(pnpr): collapse the install-accelerator cold path to one round trip (#12178)

Reduce the cost of a pnpr install-accelerator install against a remote
  server, where latency — not bandwidth — dominates.

  The cold path was three sequential round trips: a GET /-/pnpr handshake,
  a POST /v1/install that returned the lockfile + missing-file digests as
  NDJSON, and a POST /v1/files that fetched the file contents. At 100ms RTT
  most of an install is spent waiting on trips, not transferring data.

  - A new `inlineFiles` request flag makes POST /v1/install return a single
    gzipped body: a length-prefixed JSON header (lockfile, stats,
    store-index entries, or verification violations) followed by the
    missing files' contents as the same binary frames /v1/files serves.
    The pacquet client drops the handshake, sends the flag, and writes the
    inlined files straight to its CAFS — no follow-up fetch. The legacy
    NDJSON + /v1/files path is unchanged for clients that don't set it.
  - File-bearing responses now compress at gzip level 6 (was level 1),
    ~16% smaller payload at negligible CPU.

  Measured at 50ms one-way latency (warm server, 5-run average): a
  single-round-trip install drops from ~381ms to ~135ms (≈2.8x).

  Two further experiments from the issue were evaluated and not adopted:
  server-side fetch/transfer streaming (the public npm registry is
  CDN-backed and latency-bound, not bandwidth-throttled, so there is
  nothing to overlap — it measured slower under an artificial bandwidth
  cap) and bloom-filter integrity uploads (real but niche, and false
  positives would require a correctness-critical re-fetch fallback).

  Closes #12165
This commit is contained in:
Zoltan Kochan
2026-06-03 23:36:28 +02:00
committed by GitHub
parent 69cfcb7417
commit f06ab5e201
4 changed files with 355 additions and 237 deletions

View File

@@ -1,21 +1,22 @@
//! Client for pnpr's server-accelerated installs.
//!
//! Port of the TypeScript `@pnpm/pnpr.client` (`fetchFromPnpmRegistry`)
//! plus the `fetch-and-write-cafs` worker. Given a set of dependencies
//! and the client's content-addressable store, it:
//! Given a set of dependencies and the client's content-addressable
//! store, it:
//!
//! 1. reads the integrities already in the local store index,
//! 2. `POST`s them with the dependencies to `/v1/install` and parses the
//! NDJSON response (`D` missing-file digests, `I` store-index entries,
//! `L` lockfile + stats, `E` error),
//! 3. downloads the missing files from `/v1/files` and writes them
//! straight into the local CAFS *by digest* — no re-hashing,
//! 4. writes the forwarded store-index entries, and
//! 2. `POST`s them with the dependencies to `/v1/install`, asking the
//! server to inline the file contents it's missing (`inlineFiles`),
//! 3. parses the single combined response — a length-prefixed JSON header
//! (lockfile, stats, store-index entries, or verification violations)
//! followed by the missing files' bytes,
//! 4. writes those bytes straight into the local CAFS *by digest* (no
//! re-hashing) and writes the forwarded store-index entries, and
//! 5. returns the resolved lockfile for a headless install.
//!
//! The response is buffered rather than streamed, and `/v1/files` is
//! requested in a single batch; both mirror the current pnpr server and
//! are tracked follow-ups.
//! The whole exchange is one round trip — no handshake, no follow-up
//! `/v1/files` fetch. See
//! [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165). The
//! response is buffered rather than truly streamed, a tracked follow-up.
use std::{
collections::{BTreeMap, HashSet},
@@ -96,13 +97,13 @@ pub struct InstallOutcome {
/// The resolved lockfile, ready for a headless install.
pub lockfile: Lockfile,
pub stats: Stats,
/// Number of file entries `/v1/files` served into the local CAFS.
/// Number of inlined file entries written into the local CAFS.
pub files_written: usize,
/// Number of store-index entries written to the local index.
pub index_entries_written: usize,
}
/// Resolution statistics reported on the `L` line. Field names mirror
/// Resolution statistics from the response header. Field names mirror
/// the server's camelCase JSON.
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase", default)]
@@ -189,12 +190,16 @@ impl PnprClient {
/// Resolve a single project against the server and materialize the
/// missing files + store-index entries into the local store.
///
/// One round trip: the request asks the server to inline the file
/// contents (`inlineFiles`), so the response carries the lockfile,
/// stats, store-index entries, and the missing files' bytes in a
/// single body — no handshake and no follow-up `/v1/files` fetch.
/// See [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165).
pub async fn install(
&self,
opts: InstallOptions<'_>,
) -> Result<InstallOutcome, PnprClientError> {
self.handshake().await?;
let store_keys = read_store_keys(opts.store_dir);
let store_integrities = integrities_from_keys(&store_keys);
let present: HashSet<&str> = store_keys.iter().map(String::as_str).collect();
@@ -221,6 +226,7 @@ impl PnprClient {
"trustPolicy": opts.trust_policy,
"trustPolicyExclude": opts.trust_policy_exclude,
"trustPolicyIgnoreAfter": opts.trust_policy_ignore_after,
"inlineFiles": true,
});
let response =
@@ -230,25 +236,14 @@ impl PnprClient {
let body = response.text().await.unwrap_or_default();
return Err(PnprClientError::Server(format!("/v1/install returned {status}: {body}")));
}
let ndjson = response.text().await?;
let parsed = parse_install_response(&ndjson)?;
// `--lockfile-only`: pnpm fetches nothing and links nothing. A
// resolve-only server sends no `D`/`I` lines, but stay correct
// even against one that doesn't understand the flag by never
// touching the local store.
if opts.lockfile_only {
return Ok(InstallOutcome {
lockfile: parsed.lockfile,
stats: parsed.stats,
files_written: 0,
index_entries_written: 0,
});
}
let files_written = self.download_files(opts.store_dir, &parsed.missing_files).await?;
let raw = response.bytes().await?;
let parsed = parse_inline_response(&decompress(&raw)?)?;
// The server inlines only the files the client is missing; a
// `--lockfile-only` resolve and a verification pass both carry an
// empty file payload, so this writes nothing in those cases.
let files_written = write_files_payload(opts.store_dir, &parsed.files_payload)?;
let index_entries_written =
write_index_entries(opts.store_dir, parsed.index_entries, &present).await;
@@ -259,136 +254,79 @@ impl PnprClient {
index_entries_written,
})
}
}
async fn download_files(
&self,
store_dir: &StoreDir,
digests: &[MissingFile],
) -> Result<usize, PnprClientError> {
if digests.is_empty() {
return Ok(0);
}
let request = serde_json::json!({
"digests": digests
.iter()
.map(|file| serde_json::json!({
"digest": file.digest,
"executable": file.executable,
}))
.collect::<Vec<_>>(),
});
let response =
self.http.post(format!("{}v1/files", self.base_url)).json(&request).send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(PnprClientError::Server(format!("/v1/files returned {status}: {body}")));
}
let raw = response.bytes().await?;
// The server sets `Content-Encoding: gzip`. Decompress unless the
// HTTP stack already did (detected via the gzip magic bytes), so
// the client works whether or not reqwest's `gzip` feature is on.
// Borrow `raw` directly when it's already decompressed.
let decompressed: Vec<u8>;
let payload: &[u8] = if raw.starts_with(&[0x1f, 0x8b]) {
let mut decoder = GzDecoder::new(&raw[..]);
let mut out = Vec::new();
decoder.read_to_end(&mut out)?;
decompressed = out;
&decompressed
} else {
&raw
};
// Guard against a server that streams entries we never asked for,
// which would otherwise write unbounded files into our CAFS.
let mut requested: HashSet<(String, bool)> =
digests.iter().map(|file| (file.digest.clone(), file.executable)).collect();
write_files_payload(store_dir, payload, &mut requested)
/// Decompress a `Content-Encoding: gzip` body unless the HTTP stack
/// already did (detected via the gzip magic bytes), so the client works
/// whether or not reqwest's `gzip` feature is on. Returns the bytes as-is
/// when they're already decompressed.
fn decompress(raw: &[u8]) -> Result<Vec<u8>, PnprClientError> {
if raw.starts_with(&[0x1f, 0x8b]) {
let mut decoder = GzDecoder::new(raw);
let mut out = Vec::new();
decoder.read_to_end(&mut out)?;
Ok(out)
} else {
Ok(raw.to_vec())
}
}
struct ParsedInstall {
lockfile: Lockfile,
stats: Stats,
missing_files: Vec<MissingFile>,
/// The `/v1/files`-shaped binary frames the server inlined after the
/// header — written into the CAFS by [`write_files_payload`].
files_payload: Vec<u8>,
index_entries: Vec<(String, Vec<u8>)>,
}
struct MissingFile {
digest: String,
executable: bool,
}
/// Decode the combined `inlineFiles` install response: a 4-byte
/// big-endian header length, that many bytes of JSON header (lockfile,
/// stats, store-index entries, or verification violations), then the
/// file frames.
fn parse_inline_response(payload: &[u8]) -> Result<ParsedInstall, PnprClientError> {
if payload.len() < 4 {
return Err(PnprClientError::Protocol("install response too short".to_string()));
}
let header_len = u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize;
let header_end = 4 + header_len;
if header_end > payload.len() {
return Err(PnprClientError::Protocol("install header truncated".to_string()));
}
let header: InlineHeader = serde_json::from_slice(&payload[4..header_end])
.map_err(|err| PnprClientError::Protocol(err.to_string()))?;
fn parse_install_response(ndjson: &str) -> Result<ParsedInstall, PnprClientError> {
let mut missing_files = Vec::new();
let mut index_entries = Vec::new();
let mut final_line: Option<(Lockfile, Stats)> = None;
for line in ndjson.lines() {
let Some((tag, rest)) = line.split_once('\t') else { continue };
match tag {
"D" => {
// `digest \t size \t executable`
let mut parts = rest.split('\t');
let digest = parts.next().unwrap_or_default().to_string();
let _size = parts.next();
let executable = parts.next() == Some("1");
missing_files.push(MissingFile { digest, executable });
}
"I" => {
// `integrity \t pkgId \t base64`; the index key is
// `integrity \t pkgId` (everything before the last tab).
let Some((key, encoded)) = rest.rsplit_once('\t') else {
return Err(PnprClientError::Protocol("malformed I line".to_string()));
};
let raw = BASE64
.decode(encoded)
.map_err(|err| PnprClientError::Protocol(err.to_string()))?;
index_entries.push((key.to_string(), raw));
}
"L" => {
let payload: LPayload = serde_json::from_str(rest)
.map_err(|err| PnprClientError::Protocol(err.to_string()))?;
final_line = Some((payload.lockfile, payload.stats));
}
"E" => {
if let Ok(payload) = serde_json::from_str::<EPayload>(rest) {
if let Some(violations) = payload.violations.filter(|list| !list.is_empty()) {
return Err(PnprClientError::Verification(build_verify_error(violations)));
}
if !payload.error.is_empty() {
return Err(PnprClientError::Server(payload.error));
}
}
return Err(PnprClientError::Server(rest.to_string()));
}
_ => {}
}
if let Some(violations) = header.violations.filter(|list| !list.is_empty()) {
return Err(PnprClientError::Verification(build_verify_error(violations)));
}
let (lockfile, stats) = final_line.ok_or_else(|| {
PnprClientError::Protocol("response had no lockfile (L line)".to_string())
})?;
let lockfile = header
.lockfile
.ok_or_else(|| PnprClientError::Protocol("install response had no lockfile".to_string()))?;
Ok(ParsedInstall { lockfile, stats, missing_files, index_entries })
let mut index_entries = Vec::with_capacity(header.index_entries.len());
for entry in header.index_entries {
let raw =
BASE64.decode(&entry.b64).map_err(|err| PnprClientError::Protocol(err.to_string()))?;
index_entries.push((entry.key, raw));
}
Ok(ParsedInstall {
lockfile,
stats: header.stats,
files_payload: payload[header_end..].to_vec(),
index_entries,
})
}
#[derive(Deserialize)]
struct LPayload {
lockfile: Lockfile,
#[serde(rename_all = "camelCase")]
struct InlineHeader {
lockfile: Option<Lockfile>,
#[serde(default)]
stats: Stats,
}
#[derive(Deserialize)]
struct EPayload {
#[serde(default)]
error: String,
index_entries: Vec<InlineIndexEntry>,
/// Present when the server rejected the input lockfile under the
/// client's verification policy. Each entry mirrors the local
/// runner's rendered violation so the client can rebuild the
@@ -397,6 +335,14 @@ struct EPayload {
violations: Option<Vec<WireViolation>>,
}
#[derive(Deserialize)]
struct InlineIndexEntry {
/// The store-index key, `{integrity}\t{pkgId}`.
key: String,
/// The base64-encoded msgpackr-records buffer.
b64: String,
}
#[derive(Deserialize)]
struct WireViolation {
name: String,
@@ -440,13 +386,13 @@ fn intern_violation_code(code: &str) -> &'static str {
}
}
/// Decode the `/v1/files` binary payload and write each entry to the
/// CAFS by digest. Returns the number of entries served.
fn write_files_payload(
store_dir: &StoreDir,
payload: &[u8],
requested: &mut HashSet<(String, bool)>,
) -> Result<usize, PnprClientError> {
/// Decode the inlined binary file payload and write each entry to the
/// CAFS by digest. Returns the number of entries written. An empty
/// payload (no frames before the end-of-stream marker) writes nothing.
fn write_files_payload(store_dir: &StoreDir, payload: &[u8]) -> Result<usize, PnprClientError> {
if payload.is_empty() {
return Ok(0);
}
if payload.len() < 4 {
return Err(PnprClientError::Protocol("files payload too short".to_string()));
}
@@ -480,12 +426,6 @@ fn write_files_payload(
let content = &payload[content_start..content_end];
let digest = hex_encode(digest_bytes);
if !requested.remove(&(digest.clone(), executable)) {
return Err(PnprClientError::Server(format!(
"/v1/files returned an entry that was not requested: {digest}",
)));
}
write_cas_file(store_dir, &digest, executable, content)?;
written += 1;
offset = content_end;

View File

@@ -1,12 +1,28 @@
use super::{PnprClientError, VerifyError, parse_install_response};
use super::{PnprClientError, VerifyError, parse_inline_response};
/// A structured `E` line (the server's verification rejection) is
/// rebuilt into the same `VerifyError` the local gate raises, so the CLI
/// aborts with an identical diagnostic code + breakdown.
/// Frame a JSON header into a complete inline install payload with an
/// empty file section (the `{}` prefix plus the end-of-stream marker),
/// matching what the server sends when there are no files to inline.
fn inline_payload(header_json: &str) -> Vec<u8> {
let header = header_json.as_bytes();
let mut payload = Vec::new();
payload.extend_from_slice(&(header.len() as u32).to_be_bytes());
payload.extend_from_slice(header);
payload.extend_from_slice(&2u32.to_be_bytes());
payload.extend_from_slice(b"{}");
payload.extend_from_slice(&[0u8; 64]);
payload
}
/// A header carrying verification violations is rebuilt into the same
/// `VerifyError` the local gate raises, so the CLI aborts with an
/// identical diagnostic code + breakdown.
#[test]
fn e_line_with_violations_rebuilds_a_verify_error() {
let ndjson = "E\t{\"violations\":[{\"name\":\"@foo/no-deps\",\"version\":\"1.0.0\",\"code\":\"MINIMUM_RELEASE_AGE_VIOLATION\",\"reason\":\"was published yesterday\"}]}\n";
let Err(PnprClientError::Verification(verify_err)) = parse_install_response(ndjson) else {
fn header_with_violations_rebuilds_a_verify_error() {
let payload = inline_payload(
r#"{"violations":[{"name":"@foo/no-deps","version":"1.0.0","code":"MINIMUM_RELEASE_AGE_VIOLATION","reason":"was published yesterday"}]}"#,
);
let Err(PnprClientError::Verification(verify_err)) = parse_inline_response(&payload) else {
panic!("expected a Verification error");
};
assert!(
@@ -21,8 +37,10 @@ fn e_line_with_violations_rebuilds_a_verify_error() {
/// variant.
#[test]
fn tarball_mismatch_maps_to_the_generic_envelope() {
let ndjson = "E\t{\"violations\":[{\"name\":\"acme\",\"version\":\"1.0.0\",\"code\":\"TARBALL_URL_MISMATCH\",\"reason\":\"url mismatch\"}]}\n";
let Err(PnprClientError::Verification(verify_err)) = parse_install_response(ndjson) else {
let payload = inline_payload(
r#"{"violations":[{"name":"acme","version":"1.0.0","code":"TARBALL_URL_MISMATCH","reason":"url mismatch"}]}"#,
);
let Err(PnprClientError::Verification(verify_err)) = parse_inline_response(&payload) else {
panic!("expected a Verification error");
};
assert!(
@@ -31,13 +49,12 @@ fn tarball_mismatch_maps_to_the_generic_envelope() {
);
}
/// A plain `E` error line (no `violations`) stays a generic server error
/// rather than a verification error.
/// A header with no lockfile and no violations is a malformed response,
/// not a silent success.
#[test]
fn e_line_with_plain_error_is_a_server_error() {
let ndjson = "E\t{\"error\":\"resolution failed\"}\n";
let Err(PnprClientError::Server(message)) = parse_install_response(ndjson) else {
panic!("expected a Server error");
fn header_without_a_lockfile_is_a_protocol_error() {
let payload = inline_payload("{}");
let Err(PnprClientError::Protocol(_)) = parse_inline_response(&payload) else {
panic!("expected a Protocol error");
};
assert_eq!(message, "resolution failed");
}

View File

@@ -10,10 +10,14 @@
//! protocol versions so a client can negotiate or fail fast.
//! * `POST /v1/install` — resolve a project **against the registries
//! the client sends** (so the server uses the same source of truth as
//! the client), then stream an NDJSON response: `D` lines (file
//! digests the client is missing), `I` lines (pre-packed store-index
//! entries), a final `L` line with the lockfile and stats, or an `E`
//! line on a mid-stream error.
//! the client), then return the missing-file digests, pre-packed
//! store-index entries, lockfile, and stats. With `inlineFiles` the
//! client also gets the missing files' *contents* in the same response
//! (a length-prefixed JSON header followed by the `/v1/files` binary
//! frames, gzipped), collapsing the cold path to a single round trip
//! ([pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165));
//! otherwise the response is the NDJSON stream of `D`/`I`/`L`/`E`
//! lines and the client fetches files separately.
//! * `POST /v1/files` — serve a batch of files by digest as a gzip
//! binary stream the client writes straight into its CAFS.
//!
@@ -194,9 +198,12 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) ->
// pick-time gate (the policy is wired into `config`).
if !request.trust_lockfile
&& let Some(input_lockfile) = request.lockfile.as_ref()
&& let Err(response) = verify_input_lockfile(runtime, config, input_lockfile).await
&& let Err(failure) = verify_input_lockfile(runtime, config, input_lockfile).await
{
return response;
return match failure {
VerifyFailure::Internal(response) => response,
VerifyFailure::Violations(violations) => violation_response(&violations, &request),
};
}
let lockfile = match resolve::resolve(config, &runtime.client, &request).await {
@@ -241,6 +248,12 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) ->
}
};
let stats_json = stats_json(&result.stats);
if request.inline_files {
return inline_response(runtime, &lockfile, &stats_json, &result);
}
let mut ndjson: Vec<u8> = Vec::new();
for file in &result.missing_files {
let _ =
@@ -256,18 +269,9 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) ->
);
}
let stats = &result.stats;
let payload = serde_json::json!({
"lockfile": serde_json::to_value(&lockfile).unwrap_or(serde_json::Value::Null),
"stats": {
"totalPackages": stats.total_packages,
"alreadyInStore": stats.already_in_store,
"packagesToFetch": stats.packages_to_fetch,
"filesInNewPackages": stats.files_in_new_packages,
"filesAlreadyInCafs": stats.files_already_in_cafs,
"filesToDownload": stats.files_to_download,
"downloadBytes": stats.download_bytes,
},
"stats": stats_json,
});
let _ = writeln!(ndjson, "L\t{}", payload);
@@ -278,17 +282,115 @@ pub(crate) async fn handle_install(runtime: &InstallAccelerator, body: Bytes) ->
.expect("static ndjson response is always valid")
}
fn stats_json(stats: &diff::Stats) -> serde_json::Value {
serde_json::json!({
"totalPackages": stats.total_packages,
"alreadyInStore": stats.already_in_store,
"packagesToFetch": stats.packages_to_fetch,
"filesInNewPackages": stats.files_in_new_packages,
"filesAlreadyInCafs": stats.files_already_in_cafs,
"filesToDownload": stats.files_to_download,
"downloadBytes": stats.download_bytes,
})
}
/// gzip level for the file-bearing responses (`/v1/files` and the
/// `inlineFiles` install body). Level 6 (the gzip default) shrinks the
/// payload ~16% over level 1 — the win that matters once the server is
/// across a latency link, where fewer bytes means fewer TCP slow-start
/// round trips — while level 9 adds under a percent for several times
/// the CPU.
const FILES_GZIP_LEVEL: u32 = 6;
/// Content type of the combined `inlineFiles` install response: a
/// length-prefixed JSON header followed by the [`build_files_payload`]
/// binary frames, gzip-compressed.
const INLINE_CONTENT_TYPE: &str = "application/x-pnpr-install-inline";
/// Build the combined single-response body for an `inlineFiles` request:
/// the lockfile, stats, and store-index entries in a length-prefixed JSON
/// header, followed by the missing files' contents as `/v1/files` binary
/// frames — so the client materializes everything from one round trip.
fn inline_response(
runtime: &InstallAccelerator,
lockfile: &Lockfile,
stats_json: &serde_json::Value,
result: &diff::DiffResult,
) -> Response {
let index_entries: Vec<serde_json::Value> = result
.package_index
.iter()
.map(|entry| {
serde_json::json!({
"key": format!("{}\t{}", entry.integrity, entry.pkg_id),
"b64": BASE64.encode(&entry.raw),
})
})
.collect();
let header = serde_json::json!({
"lockfile": serde_json::to_value(lockfile).unwrap_or(serde_json::Value::Null),
"stats": stats_json,
"indexEntries": index_entries,
});
let files = result.missing_files.iter().map(|file| (file.digest.as_str(), file.executable));
let files_payload = match build_files_payload(&runtime.store_dir, files) {
Ok(payload) => payload,
Err((status, message)) => return json_error(status, &message),
};
finish_inline_response(&header, &files_payload)
}
/// Frame a JSON `header` and an already-built [`build_files_payload`]
/// byte buffer into one length-prefixed, gzip-compressed body.
fn finish_inline_response(header: &serde_json::Value, files_payload: &[u8]) -> Response {
let header_bytes = serde_json::to_vec(header).unwrap_or_else(|_| b"{}".to_vec());
let Ok(header_len) = u32::try_from(header_bytes.len()) else {
return json_error(StatusCode::INTERNAL_SERVER_ERROR, "install header too large");
};
let mut body = Vec::with_capacity(4 + header_bytes.len() + files_payload.len());
body.extend_from_slice(&header_len.to_be_bytes());
body.extend_from_slice(&header_bytes);
body.extend_from_slice(files_payload);
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(FILES_GZIP_LEVEL));
if encoder.write_all(&body).is_err() {
return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed");
}
let gzipped = match encoder.finish() {
Ok(gzipped) => gzipped,
Err(_) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed"),
};
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, INLINE_CONTENT_TYPE)
.header(header::CONTENT_ENCODING, "gzip")
.body(Body::from(gzipped))
.expect("binary response is always valid")
}
/// Why [`verify_input_lockfile`] failed: either the lockfile violated
/// the client's policy (carry the rendered violations so the caller can
/// shape them for the client's protocol) or the verifiers couldn't be
/// built at all (a ready-made error response).
enum VerifyFailure {
Violations(Vec<serde_json::Value>),
Internal(Response),
}
/// Verify the client's input lockfile under the client's policy. On a
/// clean pass returns `Ok(())`; on a policy violation returns `Err` with
/// a 200 NDJSON response carrying a single `E` line of rendered
/// violations, so the client rebuilds the identical `VerifyError` and
/// aborts the same way the local gate would. A build-verifiers failure
/// (e.g. an invalid exclude pattern) returns a 500.
/// clean pass returns `Ok(())`; on a policy violation returns the
/// rendered violations so the caller can deliver them in whichever
/// protocol the client asked for (NDJSON `E` line or inline header). A
/// build-verifiers failure (e.g. an invalid exclude pattern) returns a
/// ready-made 500.
async fn verify_input_lockfile(
runtime: &InstallAccelerator,
config: &'static PacquetConfig,
lockfile: &Lockfile,
) -> Result<(), Response> {
) -> Result<(), VerifyFailure> {
// A fresh per-request packument cache shared with the verifier; the
// on-disk metadata mirror under `<cache_dir>/v11/metadata-full` is
// warm across requests and is the real verification cache.
@@ -298,7 +400,9 @@ async fn verify_input_lockfile(
Arc::clone(&runtime.client),
Some(meta_cache as Arc<dyn PackageMetaCache>),
)
.map_err(|err| json_error(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string()))?;
.map_err(|err| {
VerifyFailure::Internal(json_error(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string()))
})?;
// Whole-lockfile verdict cache: an O(1) hit when this exact lockfile
// already passed under a policy we still trust skips the whole fan-out
@@ -332,13 +436,30 @@ async fn verify_input_lockfile(
})
})
.collect();
let payload = serde_json::json!({ "violations": rendered });
Err(VerifyFailure::Violations(rendered))
}
/// Render input-lockfile policy violations in the protocol the client
/// asked for: the inline header (`{ "violations": [...] }`, no files) for
/// an `inlineFiles` request, otherwise a 200 NDJSON `E` line. Either way
/// the client rebuilds the identical `VerifyError` and aborts the same
/// way the local gate would.
fn violation_response(violations: &[serde_json::Value], request: &InstallRequest) -> Response {
if request.inline_files {
let header = serde_json::json!({ "violations": violations });
// No files follow a verification failure: just the end-of-stream
// marker so the client's frame parser terminates cleanly.
let files_payload = empty_files_payload();
return finish_inline_response(&header, &files_payload);
}
let payload = serde_json::json!({ "violations": violations });
let body = format!("E\t{payload}\n");
Err(Response::builder()
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/x-ndjson")
.body(Body::from(body))
.expect("static ndjson violation response is always valid"))
.expect("static ndjson violation response is always valid")
}
/// Merge every active verifier's policy snapshot into one bag, the key
@@ -374,48 +495,15 @@ pub(crate) async fn handle_files(runtime: &InstallAccelerator, body: Bytes) -> R
}
}
let store_dir = &runtime.store_dir;
// Build the binary payload up front so a missing file surfaces as a
// clean 500 before any bytes are committed to the response.
let mut payload: Vec<u8> = Vec::new();
payload.extend_from_slice(&2u32.to_be_bytes());
payload.extend_from_slice(b"{}");
let files = request.digests.iter().map(|file| (file.digest.as_str(), file.executable));
let payload = match build_files_payload(&runtime.store_dir, files) {
Ok(payload) => payload,
Err((status, message)) => return json_error(status, &message),
};
for file in &request.digests {
let mode = if file.executable { 0o755 } else { 0o644 };
let Some(path) = store_dir.cas_file_path_by_mode(&file.digest, mode) else {
return json_error(StatusCode::INTERNAL_SERVER_ERROR, "could not resolve file path");
};
let content = match std::fs::read(&path) {
Ok(content) => content,
Err(err) => {
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("{}: {err}", file.digest),
);
}
};
let Some(digest_bytes) = hex_to_bytes(&file.digest) else {
return json_error(StatusCode::BAD_REQUEST, "invalid digest");
};
// The wire framing encodes the size as a u32; a >4 GiB file would
// truncate. npm files never approach this, but fail cleanly rather
// than corrupt the stream.
let Ok(content_len) = u32::try_from(content.len()) else {
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("{}: file too large for the protocol", file.digest),
);
};
payload.extend_from_slice(&digest_bytes);
payload.extend_from_slice(&content_len.to_be_bytes());
payload.push(u8::from(file.executable));
payload.extend_from_slice(&content);
}
payload.extend_from_slice(&[0u8; 64]);
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(1));
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(FILES_GZIP_LEVEL));
if encoder.write_all(&payload).is_err() {
return json_error(StatusCode::INTERNAL_SERVER_ERROR, "gzip failed");
}
@@ -432,6 +520,68 @@ pub(crate) async fn handle_files(runtime: &InstallAccelerator, body: Bytes) -> R
.expect("binary response is always valid")
}
/// The binary frames `/v1/files` serves and the `inlineFiles` install
/// response embeds: a 2-byte `{}` JSON header (length-prefixed) followed
/// by one `[64-byte digest][u32 size][1-byte exec][content]` frame per
/// file, terminated by 64 zero bytes. Reads each file's content from the
/// store by digest; an `Err` is a ready-made error response.
fn build_files_payload<'a>(
store_dir: &StoreDir,
files: impl Iterator<Item = (&'a str, bool)>,
) -> Result<Vec<u8>, (StatusCode, String)> {
let mut payload = empty_files_payload_prefix();
for (digest, executable) in files {
let mode = if executable { 0o755 } else { 0o644 };
let Some(path) = store_dir.cas_file_path_by_mode(digest, mode) else {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"could not resolve file path".to_string(),
));
};
let content = match std::fs::read(&path) {
Ok(content) => content,
Err(err) => {
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{digest}: {err}")));
}
};
let Some(digest_bytes) = hex_to_bytes(digest) else {
return Err((StatusCode::BAD_REQUEST, "invalid digest".to_string()));
};
// The wire framing encodes the size as a u32; a >4 GiB file would
// truncate. npm files never approach this, but fail cleanly rather
// than corrupt the stream.
let Ok(content_len) = u32::try_from(content.len()) else {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("{digest}: file too large for the protocol"),
));
};
payload.extend_from_slice(&digest_bytes);
payload.extend_from_slice(&content_len.to_be_bytes());
payload.push(u8::from(executable));
payload.extend_from_slice(&content);
}
payload.extend_from_slice(&[0u8; 64]);
Ok(payload)
}
/// The leading 2-byte `{}` JSON header every files payload starts with.
fn empty_files_payload_prefix() -> Vec<u8> {
let mut prefix = Vec::new();
prefix.extend_from_slice(&2u32.to_be_bytes());
prefix.extend_from_slice(b"{}");
prefix
}
/// A files payload carrying no files — the header prefix plus the
/// end-of-stream marker. Used when an `inlineFiles` response has only
/// metadata (a `--lockfile-only` resolve or a verification failure).
fn empty_files_payload() -> Vec<u8> {
let mut payload = empty_files_payload_prefix();
payload.extend_from_slice(&[0u8; 64]);
payload
}
fn json_error(status: StatusCode, message: &str) -> Response {
let body = serde_json::json!({ "error": message }).to_string();
Response::builder()

View File

@@ -115,6 +115,17 @@ pub struct InstallRequest {
/// check.
#[serde(default)]
pub trust_policy_ignore_after: Option<u64>,
/// When `true`, the client wants the file contents it's missing
/// streamed inline in this response rather than fetched in a second
/// `POST /v1/files` round trip. The server answers with a single
/// gzipped binary body (a length-prefixed header carrying the
/// lockfile, stats, and store-index entries, followed by the
/// `/v1/files` binary frames) instead of the NDJSON stream. Cuts the
/// cold-path round trips from three (handshake + install + files) to
/// one. See
/// [pnpm/pnpm#12165](https://github.com/pnpm/pnpm/issues/12165).
#[serde(default)]
pub inline_files: bool,
}
/// One project's importer dir and its dependency maps, normalized